From 3ff8a1b719e8c2d09af4290192b0aafd523b3e1f Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sat, 27 Feb 2016 03:04:52 -0800 Subject: [PATCH] api: Implement CopyObject s3 API, doing server side copy. Fixes #1172 --- api-errors.go | 18 +++++ api-headers.go | 2 +- api-response.go | 19 ++++- generic-handlers.go | 5 -- object-handlers.go | 138 +++++++++++++++++++++++++++++++- pkg/fs/fs-bucket-listobjects.go | 8 +- pkg/fs/fs-datatypes.go | 10 +-- pkg/fs/fs-multipart.go | 12 +-- pkg/fs/fs-object.go | 24 +++--- routers.go | 30 ++++++- server_fs_test.go | 48 ++++++++--- 11 files changed, 263 insertions(+), 51 deletions(-) diff --git a/api-errors.go b/api-errors.go index 8dc6e441c..787e67d3d 100644 --- a/api-errors.go +++ b/api-errors.go @@ -57,6 +57,9 @@ const ( InvalidMaxUploads InvalidMaxParts InvalidPartNumberMarker + InvalidRequestBody + InvalidCopySource + InvalidCopyDest MalformedXML MissingContentLength MissingRequestBodyError @@ -80,6 +83,21 @@ const ( // APIError code to Error structure map var errorCodeResponse = map[int]APIError{ + InvalidCopyDest: { + Code: "InvalidRequest", + Description: "This copy request is illegal because it is trying to copy an object to itself.", + HTTPStatusCode: http.StatusBadRequest, + }, + InvalidCopySource: { + Code: "InvalidArgument", + Description: "Copy Source must mention the source bucket and key: sourcebucket/sourcekey.", + HTTPStatusCode: http.StatusBadRequest, + }, + InvalidRequestBody: { + Code: "InvalidArgument", + Description: "Body shouldn't be set for this request.", + HTTPStatusCode: http.StatusBadRequest, + }, InvalidMaxUploads: { Code: "InvalidArgument", Description: "Argument maxUploads must be an integer between 0 and 2147483647.", diff --git a/api-headers.go b/api-headers.go index 8099d32de..9dd8986ca 100644 --- a/api-headers.go +++ b/api-headers.go @@ -76,7 +76,7 @@ func setObjectHeaders(w http.ResponseWriter, metadata fs.ObjectMetadata, content setCommonHeaders(w) } // set object headers - lastModified := metadata.Created.Format(http.TimeFormat) + lastModified := metadata.LastModified.Format(http.TimeFormat) // object related headers w.Header().Set("Content-Type", metadata.ContentType) if metadata.MD5 != "" { diff --git a/api-response.go b/api-response.go index c3cebc2a6..051912de4 100644 --- a/api-response.go +++ b/api-response.go @@ -19,6 +19,7 @@ package main import ( "encoding/xml" "net/http" + "time" "github.com/minio/minio/pkg/fs" ) @@ -181,6 +182,14 @@ type Object struct { StorageClass string } +// CopyObjectResponse container returns ETag and LastModified of the +// successfully copied object +type CopyObjectResponse struct { + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CopyObjectResult" json:"-"` + ETag string + LastModified string // time string of format "2006-01-02T15:04:05.000Z" +} + // Initiator inherit from Owner struct, fields are same type Initiator Owner @@ -289,7 +298,7 @@ func generateListObjectsResponse(bucket, prefix, marker, delimiter string, maxKe continue } content.Key = object.Object - content.LastModified = object.Created.Format(timeFormatAMZ) + content.LastModified = object.LastModified.Format(timeFormatAMZ) if object.MD5 != "" { content.ETag = "\"" + object.MD5 + "\"" } @@ -318,6 +327,14 @@ func generateListObjectsResponse(bucket, prefix, marker, delimiter string, maxKe return data } +// generateCopyObjectResponse +func generateCopyObjectResponse(etag string, lastModified time.Time) CopyObjectResponse { + return CopyObjectResponse{ + ETag: "\"" + etag + "\"", + LastModified: 
lastModified.Format(timeFormatAMZ), + } +} + // generateInitiateMultipartUploadResponse func generateInitiateMultipartUploadResponse(bucket, key, uploadID string) InitiateMultipartUploadResponse { return InitiateMultipartUploadResponse{ diff --git a/generic-handlers.go b/generic-handlers.go index f91e9f905..b063187b2 100644 --- a/generic-handlers.go +++ b/generic-handlers.go @@ -251,11 +251,6 @@ func (h resourceHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { writeErrorResponse(w, r, NotImplemented, r.URL.Path) return } - // X-Amz-Copy-Source should be ignored as NotImplemented. - if _, ok := r.Header[http.CanonicalHeaderKey("x-amz-copy-source")]; ok { - writeErrorResponse(w, r, NotImplemented, r.URL.Path) - return - } h.handler.ServeHTTP(w, r) } diff --git a/object-handlers.go b/object-handlers.go index d78420e13..5a4a37d2d 100644 --- a/object-handlers.go +++ b/object-handlers.go @@ -17,12 +17,15 @@ package main import ( + "io" "net/http" "net/url" "strconv" + "strings" - "github.com/gorilla/mux" + mux "github.com/gorilla/mux" "github.com/minio/minio/pkg/fs" + "github.com/minio/minio/pkg/probe" ) const ( @@ -147,14 +150,141 @@ func (api storageAPI) HeadObjectHandler(w http.ResponseWriter, r *http.Request) w.WriteHeader(http.StatusOK) } +// CopyObjectHandler - Copy Object +// ---------- +// This implementation of the PUT operation adds an object to a bucket +// while reading the object from another source. +func (api storageAPI) CopyObjectHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + bucket := vars["bucket"] + object := vars["object"] + + if isRequestRequiresACLCheck(r) { + writeErrorResponse(w, r, AccessDenied, r.URL.Path) + return + } + + if !isSignV4ReqAuthenticated(api.Signature, r) { + writeErrorResponse(w, r, SignatureDoesNotMatch, r.URL.Path) + return + } + + // TODO: Reject requests where body/payload is present, for now we + // don't even read it. + + // objectSource + objectSource := r.Header.Get("X-Amz-Copy-Source") + + // Skip the first element if it is '/', split the rest. + if strings.HasPrefix(objectSource, "/") { + objectSource = objectSource[1:] + } + splits := strings.SplitN(objectSource, "/", 2) + + // Save sourceBucket and sourceObject extracted from url Path. + var sourceBucket, sourceObject string + if len(splits) == 2 { + sourceBucket = splits[0] + sourceObject = splits[1] + } + // If source object is empty, reply back error. + if sourceObject == "" { + writeErrorResponse(w, r, InvalidCopySource, r.URL.Path) + return + } + + // Source and destination objects cannot be same, reply back error. + if sourceObject == object && sourceBucket == bucket { + writeErrorResponse(w, r, InvalidCopyDest, r.URL.Path) + return + } + + metadata, err := api.Filesystem.GetObjectMetadata(sourceBucket, sourceObject) + if err != nil { + errorIf(err.Trace(), "GetObjectMetadata failed.", nil) + switch err.ToGoError().(type) { + case fs.BucketNameInvalid: + writeErrorResponse(w, r, InvalidBucketName, objectSource) + case fs.BucketNotFound: + writeErrorResponse(w, r, NoSuchBucket, objectSource) + case fs.ObjectNotFound: + writeErrorResponse(w, r, NoSuchKey, objectSource) + case fs.ObjectNameInvalid: + writeErrorResponse(w, r, NoSuchKey, objectSource) + default: + writeErrorResponse(w, r, InternalError, objectSource) + } + return + } + + /// maximum Upload size for object in a single CopyObject operation. + if isMaxObjectSize(metadata.Size) { + writeErrorResponse(w, r, EntityTooLarge, objectSource) + return + } + + // Initialize a pipe for data pipe line. 
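+	// The pipe lets the copy stream through memory: the goroutine below
+	// writes the source object into one end while CreateObject consumes
+	// the other, so the object is never buffered in full.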
+ reader, writer := io.Pipe() + + // Start writing in a routine. + go func() { + defer writer.Close() + if _, getErr := api.Filesystem.GetObject(writer, sourceBucket, sourceObject, 0, 0); getErr != nil { + writer.CloseWithError(probe.WrapError(getErr)) + return + } + }() + + // Verify md5sum. + expectedMD5Sum := metadata.MD5 + // Size of object. + size := metadata.Size + + // Create the object. + metadata, err = api.Filesystem.CreateObject(bucket, object, expectedMD5Sum, size, reader, nil) + if err != nil { + errorIf(err.Trace(), "CreateObject failed.", nil) + switch err.ToGoError().(type) { + case fs.RootPathFull: + writeErrorResponse(w, r, RootPathFull, r.URL.Path) + case fs.BucketNotFound: + writeErrorResponse(w, r, NoSuchBucket, r.URL.Path) + case fs.BucketNameInvalid: + writeErrorResponse(w, r, InvalidBucketName, r.URL.Path) + case fs.BadDigest: + writeErrorResponse(w, r, BadDigest, r.URL.Path) + case fs.IncompleteBody: + writeErrorResponse(w, r, IncompleteBody, r.URL.Path) + case fs.InvalidDigest: + writeErrorResponse(w, r, InvalidDigest, r.URL.Path) + case fs.ObjectExistsAsPrefix: + writeErrorResponse(w, r, ObjectExistsAsPrefix, r.URL.Path) + default: + writeErrorResponse(w, r, InternalError, r.URL.Path) + } + return + } + response := generateCopyObjectResponse(metadata.MD5, metadata.LastModified) + encodedSuccessResponse := encodeSuccessResponse(response) + // write headers + setCommonHeaders(w) + // write success response. + writeSuccessResponse(w, encodedSuccessResponse) +} + // PutObjectHandler - PUT Object // ---------- // This implementation of the PUT operation adds an object to a bucket. func (api storageAPI) PutObjectHandler(w http.ResponseWriter, r *http.Request) { - var object, bucket string + // If the matching failed, it means that the X-Amz-Copy-Source was + // wrong, fail right here. + if _, ok := r.Header["X-Amz-Copy-Source"]; ok { + writeErrorResponse(w, r, InvalidCopySource, r.URL.Path) + return + } vars := mux.Vars(r) - bucket = vars["bucket"] - object = vars["object"] + bucket := vars["bucket"] + object := vars["object"] if isRequestRequiresACLCheck(r) { if api.Filesystem.IsPrivateBucket(bucket) || api.Filesystem.IsReadOnlyBucket(bucket) { diff --git a/pkg/fs/fs-bucket-listobjects.go b/pkg/fs/fs-bucket-listobjects.go index 73561bcde..7cb3ce0ca 100644 --- a/pkg/fs/fs-bucket-listobjects.go +++ b/pkg/fs/fs-bucket-listobjects.go @@ -103,10 +103,10 @@ func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKe objectName := strings.TrimPrefix(path, bucketPathPrefix) if strings.HasPrefix(objectName, prefix) { object := ObjectMetadata{ - Object: objectName, - Created: info.ModTime(), - Mode: info.Mode(), - Size: info.Size(), + Object: objectName, + LastModified: info.ModTime(), + Mode: info.Mode(), + Size: info.Size(), } select { // Send object on walker channel. 
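
For illustration, a minimal client-side sketch of the server-side copy the new handler implements: it issues a PUT for the destination key with X-Amz-Copy-Source naming the source, and prints the CopyObjectResult XML that comes back. The endpoint http://localhost:9000 and the bucket/key names are placeholders, and the AWS Signature v4 signing this server requires is assumed to be added by the caller (omitted here).

    package main

    import (
    	"fmt"
    	"io/ioutil"
    	"net/http"
    )

    func main() {
    	// Server-side copy: no request body, only the X-Amz-Copy-Source
    	// header naming "/sourcebucket/sourcekey".
    	req, err := http.NewRequest("PUT", "http://localhost:9000/targetbucket/targetkey", nil)
    	if err != nil {
    		panic(err)
    	}
    	req.Header.Set("X-Amz-Copy-Source", "/sourcebucket/sourcekey")
    	// NOTE: sign the request with AWS Signature v4 here; unsigned
    	// requests are rejected with SignatureDoesNotMatch.

    	resp, err := http.DefaultClient.Do(req)
    	if err != nil {
    		panic(err)
    	}
    	defer resp.Body.Close()

    	// On success the body is a CopyObjectResult document carrying the
    	// new object's ETag and LastModified.
    	body, _ := ioutil.ReadAll(resp.Body)
    	fmt.Println(resp.Status)
    	fmt.Println(string(body))
    }
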
diff --git a/pkg/fs/fs-datatypes.go b/pkg/fs/fs-datatypes.go
index c26e0bc71..3167a7fd3 100644
--- a/pkg/fs/fs-datatypes.go
+++ b/pkg/fs/fs-datatypes.go
@@ -33,11 +33,11 @@ type ObjectMetadata struct {
 	Bucket string
 	Object string
 
-	ContentType string
-	Created     time.Time
-	Mode        os.FileMode
-	MD5         string
-	Size        int64
+	ContentType  string
+	LastModified time.Time
+	Mode         os.FileMode
+	MD5          string
+	Size         int64
 }
 
 // PartMetadata - various types of individual part resources
diff --git a/pkg/fs/fs-multipart.go b/pkg/fs/fs-multipart.go
index 7385a041d..325d23a54 100644
--- a/pkg/fs/fs-multipart.go
+++ b/pkg/fs/fs-multipart.go
@@ -546,12 +546,12 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
 		}
 	}
 	newObject := ObjectMetadata{
-		Bucket:      bucket,
-		Object:      object,
-		Created:     st.ModTime(),
-		Size:        st.Size(),
-		ContentType: contentType,
-		MD5:         hex.EncodeToString(md5Hasher.Sum(nil)),
+		Bucket:       bucket,
+		Object:       object,
+		LastModified: st.ModTime(),
+		Size:         st.Size(),
+		ContentType:  contentType,
+		MD5:          hex.EncodeToString(md5Hasher.Sum(nil)),
 	}
 	return newObject, nil
 }
diff --git a/pkg/fs/fs-object.go b/pkg/fs/fs-object.go
index 0bbd8c498..2f06d4497 100644
--- a/pkg/fs/fs-object.go
+++ b/pkg/fs/fs-object.go
@@ -165,12 +165,12 @@ func getMetadata(rootPath, bucket, object string) (ObjectMetadata, *probe.Error)
 		}
 	}
 	metadata := ObjectMetadata{
-		Bucket:      bucket,
-		Object:      object,
-		Created:     stat.ModTime(),
-		Size:        stat.Size(),
-		ContentType: contentType,
-		Mode:        stat.Mode(),
+		Bucket:       bucket,
+		Object:       object,
+		LastModified: stat.ModTime(),
+		Size:         stat.Size(),
+		ContentType:  contentType,
+		Mode:         stat.Mode(),
 	}
 	return metadata, nil
 }
@@ -312,12 +312,12 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in
 		}
 	}
 	newObject := ObjectMetadata{
-		Bucket:      bucket,
-		Object:      object,
-		Created:     st.ModTime(),
-		Size:        st.Size(),
-		ContentType: contentType,
-		MD5:         md5Sum,
+		Bucket:       bucket,
+		Object:       object,
+		LastModified: st.ModTime(),
+		Size:         st.Size(),
+		ContentType:  contentType,
+		MD5:          md5Sum,
 	}
 	return newObject, nil
 }
diff --git a/routers.go b/routers.go
index a7a3b3064..278e55696 100644
--- a/routers.go
+++ b/routers.go
@@ -105,29 +105,53 @@ func registerAPIHandlers(mux *router.Router, a storageAPI, w *webAPI) {
 	// Bucket router
 	bucket := api.PathPrefix("/{bucket}").Subrouter()
 
-	// Object operations
+	/// Object operations
+
+	// HeadObject
 	bucket.Methods("HEAD").Path("/{object:.+}").HandlerFunc(a.HeadObjectHandler)
+	// PutObjectPart
 	bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(a.PutObjectPartHandler).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
+	// ListObjectParts
 	bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(a.ListObjectPartsHandler).Queries("uploadId", "{uploadId:.*}")
+	// CompleteMultipartUpload
 	bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(a.CompleteMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}")
+	// NewMultipartUpload
 	bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(a.NewMultipartUploadHandler).Queries("uploads", "")
+	// AbortMultipartUpload
 	bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(a.AbortMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}")
+	// GetObject
 	bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(a.GetObjectHandler)
+	// CopyObject
+	bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/).*?").HandlerFunc(a.CopyObjectHandler)
+	// PutObject
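+	// Note: the CopyObject route above is registered before this generic
+	// PUT route, so requests carrying an X-Amz-Copy-Source header are
+	// dispatched to CopyObjectHandler first.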
bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(a.PutObjectHandler) + // DeleteObject bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(a.DeleteObjectHandler) - // Bucket operations + /// Bucket operations + + // GetBucketLocation bucket.Methods("GET").HandlerFunc(a.GetBucketLocationHandler).Queries("location", "") + // GetBucketACL bucket.Methods("GET").HandlerFunc(a.GetBucketACLHandler).Queries("acl", "") + // ListMultipartUploads bucket.Methods("GET").HandlerFunc(a.ListMultipartUploadsHandler).Queries("uploads", "") + // ListObjects bucket.Methods("GET").HandlerFunc(a.ListObjectsHandler) + // PutBucketACL bucket.Methods("PUT").HandlerFunc(a.PutBucketACLHandler).Queries("acl", "") + // PutBucket bucket.Methods("PUT").HandlerFunc(a.PutBucketHandler) + // HeadBucket bucket.Methods("HEAD").HandlerFunc(a.HeadBucketHandler) + // PostPolicy bucket.Methods("POST").HandlerFunc(a.PostPolicyBucketHandler) + // DeleteBucket bucket.Methods("DELETE").HandlerFunc(a.DeleteBucketHandler) - // Root operation + /// Root operation + + // ListBuckets api.Methods("GET").HandlerFunc(a.ListBucketsHandler) } diff --git a/server_fs_test.go b/server_fs_test.go index b9e6ddbd5..c6e7b8bdd 100644 --- a/server_fs_test.go +++ b/server_fs_test.go @@ -508,15 +508,6 @@ func (s *MyAPIFSCacheSuite) TestNotImplemented(c *C) { response, err := client.Do(request) c.Assert(err, IsNil) c.Assert(response.StatusCode, Equals, http.StatusNotImplemented) - - request, err = s.newRequest("POST", testAPIFSCacheServer.URL+"/bucket/object", 0, nil) - request.Header.Set("X-Amz-Copy-Source", "/bucket/object-old") - c.Assert(err, IsNil) - - client = http.Client{} - response, err = client.Do(request) - c.Assert(err, IsNil) - c.Assert(response.StatusCode, Equals, http.StatusNotImplemented) } func (s *MyAPIFSCacheSuite) TestHeader(c *C) { @@ -550,6 +541,44 @@ func (s *MyAPIFSCacheSuite) TestPutBucket(c *C) { c.Assert(response.StatusCode, Equals, http.StatusOK) } +func (s *MyAPIFSCacheSuite) TestCopyObject(c *C) { + request, err := s.newRequest("PUT", testAPIFSCacheServer.URL+"/put-object-copy", 0, nil) + c.Assert(err, IsNil) + request.Header.Add("x-amz-acl", "private") + + client := http.Client{} + response, err := client.Do(request) + c.Assert(err, IsNil) + c.Assert(response.StatusCode, Equals, http.StatusOK) + + buffer1 := bytes.NewReader([]byte("hello world")) + request, err = s.newRequest("PUT", testAPIFSCacheServer.URL+"/put-object-copy/object", int64(buffer1.Len()), buffer1) + c.Assert(err, IsNil) + + response, err = client.Do(request) + c.Assert(err, IsNil) + c.Assert(response.StatusCode, Equals, http.StatusOK) + + request, err = s.newRequest("PUT", testAPIFSCacheServer.URL+"/put-object-copy/object1", 0, nil) + request.Header.Set("X-Amz-Copy-Source", "/put-object-copy/object") + c.Assert(err, IsNil) + + response, err = client.Do(request) + c.Assert(err, IsNil) + c.Assert(response.StatusCode, Equals, http.StatusOK) + + request, err = s.newRequest("GET", testAPIFSCacheServer.URL+"/put-object-copy/object1", 0, nil) + c.Assert(err, IsNil) + + response, err = client.Do(request) + c.Assert(err, IsNil) + c.Assert(response.StatusCode, Equals, http.StatusOK) + object, err := ioutil.ReadAll(response.Body) + c.Assert(err, IsNil) + + c.Assert(string(object), Equals, "hello world") +} + func (s *MyAPIFSCacheSuite) TestPutObject(c *C) { request, err := s.newRequest("PUT", testAPIFSCacheServer.URL+"/put-object", 0, nil) c.Assert(err, IsNil) @@ -759,7 +788,6 @@ func (s *MyAPIFSCacheSuite) TestPartialContent(c *C) { // prepare request 
request, err = s.newRequest("GET", testAPIFSCacheServer.URL+"/partial-content/bar", 0, nil) c.Assert(err, IsNil) - request.Header.Add("Accept", "application/json") request.Header.Add("Range", "bytes=6-7") client = http.Client{}