From 3379f005a585ebe52b92895ecec1d2293e80e5a0 Mon Sep 17 00:00:00 2001 From: Remco Verhoef Date: Fri, 5 May 2017 18:49:26 -0700 Subject: [PATCH] Initial implementation of Google Cloud Storage --- cmd/gateway-gcs-anonymous.go | 109 ++++++++++++++++++++++++++++++++--- cmd/gateway-gcs-layer.go | 57 +++++++++--------- 2 files changed, 130 insertions(+), 36 deletions(-) diff --git a/cmd/gateway-gcs-anonymous.go b/cmd/gateway-gcs-anonymous.go index 8868b4027..674293c05 100644 --- a/cmd/gateway-gcs-anonymous.go +++ b/cmd/gateway-gcs-anonymous.go @@ -16,24 +16,117 @@ package cmd -import "io" +import ( + "fmt" + "io" + "net/http" + "strconv" + "time" +) + +func toGCSPublicURL(bucket, object string) string { + return fmt.Sprintf("https://storage.googleapis.com/%s/%s", bucket, object) +} + +// AnonPutObject creates a new object anonymously with the incoming data, +func (l *gcsGateway) AnonPutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (ObjectInfo, error) { + + return ObjectInfo{}, NotImplemented{} +} // AnonGetObject - Get object anonymously -func (l *gcsGateway) AnonGetObject(bucket string, key string, startOffset int64, length int64, writer io.Writer) error { - return NotImplemented{} +func (l *gcsGateway) AnonGetObject(bucket string, object string, startOffset int64, length int64, writer io.Writer) error { + req, err := http.NewRequest("GET", toGCSPublicURL(bucket, object), nil) + if err != nil { + return gcsToObjectError(traceError(err), bucket, object) + } + + if length > 0 && startOffset > 0 { + req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", startOffset, startOffset+length-1)) + } else if startOffset > 0 { + req.Header.Add("Range", fmt.Sprintf("bytes=%d-", startOffset)) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return gcsToObjectError(traceError(err), bucket, object) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusPartialContent && resp.StatusCode != http.StatusOK { + return gcsToObjectError(traceError(anonErrToObjectErr(resp.StatusCode, bucket, object)), bucket, object) + } + + _, err = io.Copy(writer, resp.Body) + return gcsToObjectError(traceError(err), bucket, object) } // AnonGetObjectInfo - Get object info anonymously -func (l *gcsGateway) AnonGetObjectInfo(bucket string, object string) (ObjectInfo, error) { - return ObjectInfo{}, NotImplemented{} +func (l *gcsGateway) AnonGetObjectInfo(bucket string, object string) (objInfo ObjectInfo, err error) { + resp, err := http.Head(toGCSPublicURL(bucket, object)) + if err != nil { + return objInfo, gcsToObjectError(traceError(err), bucket, object) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + fmt.Println(resp.StatusCode) + return objInfo, gcsToObjectError(traceError(anonErrToObjectErr(resp.StatusCode, bucket, object)), bucket, object) + } + + var contentLength int64 + contentLengthStr := resp.Header.Get("Content-Length") + if contentLengthStr != "" { + contentLength, err = strconv.ParseInt(contentLengthStr, 0, 64) + if err != nil { + return objInfo, gcsToObjectError(traceError(errUnexpected), bucket, object) + } + } + + t, err := time.Parse(time.RFC1123, resp.Header.Get("Last-Modified")) + if err != nil { + return objInfo, traceError(err) + } + + objInfo.ModTime = t + objInfo.Bucket = bucket + objInfo.UserDefined = make(map[string]string) + if resp.Header.Get("Content-Encoding") != "" { + objInfo.UserDefined["Content-Encoding"] = resp.Header.Get("Content-Encoding") + } + objInfo.UserDefined["Content-Type"] = resp.Header.Get("Content-Type") + objInfo.MD5Sum = resp.Header.Get("Etag") + objInfo.ModTime = t + objInfo.Name = object + objInfo.Size = contentLength + return } // AnonListObjects - List objects anonymously func (l *gcsGateway) AnonListObjects(bucket string, prefix string, marker string, delimiter string, maxKeys int) (ListObjectsInfo, error) { - return ListObjectsInfo{}, NotImplemented{} + result, err := l.anonClient.ListObjects(bucket, prefix, marker, delimiter, maxKeys) + if err != nil { + return ListObjectsInfo{}, s3ToObjectError(traceError(err), bucket) + } + + return fromMinioClientListBucketResult(bucket, result), nil } // AnonGetBucketInfo - Get bucket metadata anonymously. -func (l *gcsGateway) AnonGetBucketInfo(bucket string) (BucketInfo, error) { - return BucketInfo{}, NotImplemented{} +func (l *gcsGateway) AnonGetBucketInfo(bucket string) (bucketInfo BucketInfo, err error) { + resp, err := http.Head(toGCSPublicURL(bucket, "")) + if err != nil { + return bucketInfo, gcsToObjectError(traceError(err)) + } + + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return bucketInfo, gcsToObjectError(traceError(anonErrToObjectErr(resp.StatusCode, bucket)), bucket) + } + + // Last-Modified date being returned by GCS + return BucketInfo{ + Name: bucket, + }, nil } diff --git a/cmd/gateway-gcs-layer.go b/cmd/gateway-gcs-layer.go index 7b7e8ac68..1e8a26473 100644 --- a/cmd/gateway-gcs-layer.go +++ b/cmd/gateway-gcs-layer.go @@ -37,6 +37,17 @@ import ( "github.com/minio/minio-go/pkg/policy" ) +var ( + // ErrNotValidMultipartIdentifier the multipart identifier is not in the correct form + ErrNotValidMultipartIdentifier = errors.New("Not a valid multipart identifier") +) + +const ( + // ZZZZMinioPrefix is used for metadata and multiparts. The prefix is being filtered out, + // hence the naming of ZZZZ (last prefix) + ZZZZMinioPrefix = "ZZZZ-Minio" +) + // Convert Minio errors to minio object layer errors. func gcsToObjectError(err error, params ...string) error { if err == nil { @@ -62,8 +73,6 @@ func gcsToObjectError(err error, params ...string) error { object = params[1] } - fmt.Printf("%+v\n ok=%v code=%v, message=%v body=%v reason=%v bucket=%v object=%v\n", err.Error(), ok, "", "", "", "", bucket, object) - // in some cases just a plain error is being returned switch err.Error() { case "storage: bucket doesn't exist": @@ -81,17 +90,15 @@ func gcsToObjectError(err error, params ...string) error { return e } - googleApiErr, ok := err.(*googleapi.Error) + googleAPIErr, ok := err.(*googleapi.Error) if !ok { // We don't interpret non Minio errors. As minio errors will // have StatusCode to help to convert to object errors. return e } - fmt.Printf("%+v\n ok=%v code=%v, message=%v body=%v reason=%v bucket=%v object=%v\n", googleApiErr, ok, googleApiErr.Code, googleApiErr.Message, googleApiErr.Body, googleApiErr.Errors[0].Reason, bucket, object) - - reason := googleApiErr.Errors[0].Reason - message := googleApiErr.Errors[0].Message + reason := googleAPIErr.Errors[0].Reason + message := googleAPIErr.Errors[0].Message switch reason { case "forbidden": @@ -104,7 +111,6 @@ func gcsToObjectError(err error, params ...string) error { Bucket: bucket, } case "notFound": - fmt.Println(object, bucket) if object != "" { err = ObjectNotFound{ Bucket: bucket, @@ -141,7 +147,6 @@ func gcsToObjectError(err error, params ...string) error { // gcsGateway - Implements gateway for Minio and GCS compatible object storage servers. type gcsGateway struct { client *storage.Client - Client *minio.Core anonClient *minio.Core projectID string ctx context.Context @@ -149,8 +154,6 @@ type gcsGateway struct { // newGCSGateway returns gcs gatewaylayer func newGCSGateway(endpoint string, projectID, secretKey string, secure bool) (GatewayLayer, error) { - fmt.Println(secretKey) - ctx := context.Background() // Creates a client. @@ -159,12 +162,16 @@ func newGCSGateway(endpoint string, projectID, secretKey string, secure bool) (G return nil, err } + anonClient, err := minio.NewCore("storage.googleapis.com", "", "", secure) + if err != nil { + return nil, err + } + return &gcsGateway{ client: client, projectID: "minio-166400", ctx: ctx, - Client: nil, - anonClient: nil, + anonClient: anonClient, }, nil } @@ -248,12 +255,6 @@ func (l *gcsGateway) DeleteBucket(bucket string) error { return nil } -const ( - // ZZZZ_MINIO is used for metadata and multiparts. The prefix is being filtered out, - // hence the naming of ZZZZ (last prefix) - ZZZZ_MINIO_PREFIX = "ZZZZ-Minio" -) - // ListObjects - lists all blobs in GCS bucket filtered by prefix func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, delimiter string, maxKeys int) (ListObjectsInfo, error) { it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false}) @@ -274,7 +275,7 @@ func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, de attrs, _ := it.Next() if attrs == nil { - } else if attrs.Prefix == ZZZZ_MINIO_PREFIX { + } else if attrs.Prefix == ZZZZMinioPrefix { break } @@ -292,7 +293,7 @@ func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, de return ListObjectsInfo{}, gcsToObjectError(traceError(err), bucket, prefix) } - if attrs.Prefix == ZZZZ_MINIO_PREFIX { + if attrs.Prefix == ZZZZMinioPrefix { // we don't return our metadata prefix continue } else if attrs.Prefix != "" { @@ -405,7 +406,7 @@ func fromGCSObjectInfo(attrs *storage.ObjectAttrs) ObjectInfo { } // GetObjectInfo - reads object info and replies back ObjectInfo -func (l *gcsGateway) GetObjectInfo(bucket string, object string) (objInfo ObjectInfo, err error) { +func (l *gcsGateway) GetObjectInfo(bucket string, object string) (ObjectInfo, error) { // if we want to mimic S3 behavior exactly, we need to verify if bucket exists first, // otherwise gcs will just return object not exist in case of non-existing bucket if _, err := l.client.Bucket(bucket).Attrs(l.ctx); err != nil { @@ -500,7 +501,7 @@ func (l *gcsGateway) DeleteObject(bucket string, object string) error { // ListMultipartUploads - lists all multipart uploads. func (l *gcsGateway) ListMultipartUploads(bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (ListMultipartsInfo, error) { // TODO: implement prefix and prefixes, how does this work for Multiparts?? - prefix = fmt.Sprintf("%s/multipart-", ZZZZ_MINIO_PREFIX) + prefix = fmt.Sprintf("%s/multipart-", ZZZZMinioPrefix) it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false}) @@ -561,11 +562,11 @@ func (l *gcsGateway) ListMultipartUploads(bucket string, prefix string, keyMarke func fromGCSMultipartKey(s string) (key, uploadID string, partID int, err error) { parts := strings.Split(s, "-") if parts[0] != "multipart" { - return "", "", 0, errors.New("Not a valid multipart identifier.") + return "", "", 0, ErrNotValidMultipartIdentifier } if len(parts) != 4 { - return "", "", 0, errors.New("Not a valid multipart identifier.") + return "", "", 0, ErrNotValidMultipartIdentifier } key = parts[1] @@ -584,7 +585,7 @@ func toGCSMultipartKey(key string, uploadID string, partID int) string { // explicitly notes that uploaded parts with same number are being overwritten // parts are allowed to be numbered from 1 to 10,000 (inclusive) - return fmt.Sprintf("%s/multipart-%s-%s-%05d", ZZZZ_MINIO_PREFIX, key, uploadID, partID) + return fmt.Sprintf("%s/multipart-%s-%s-%05d", ZZZZMinioPrefix, key, uploadID, partID) } // NewMultipartUpload - upload object in multiple parts @@ -629,7 +630,7 @@ func (l *gcsGateway) PutObjectPart(bucket string, key string, uploadID string, p func (l *gcsGateway) ListObjectParts(bucket string, key string, uploadID string, partNumberMarker int, maxParts int) (ListPartsInfo, error) { // TODO: support partNumberMarker - prefix := fmt.Sprintf("%s/multipart-%s-%s", ZZZZ_MINIO_PREFIX, key, uploadID) + prefix := fmt.Sprintf("%s/multipart-%s-%s", ZZZZMinioPrefix, key, uploadID) delimiter := "/" it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false}) @@ -682,7 +683,7 @@ func (l *gcsGateway) ListObjectParts(bucket string, key string, uploadID string, // AbortMultipartUpload aborts a ongoing multipart upload func (l *gcsGateway) AbortMultipartUpload(bucket string, key string, uploadID string) error { - prefix := fmt.Sprintf("%s/multipart-%s-%s", ZZZZ_MINIO_PREFIX, key, uploadID) + prefix := fmt.Sprintf("%s/multipart-%s-%s", ZZZZMinioPrefix, key, uploadID) delimiter := "/" // delete part zero, ignoring errors here, we want to clean up all remains