diff --git a/cmd/api-errors.go b/cmd/api-errors.go index d22f05040..a3958d250 100644 --- a/cmd/api-errors.go +++ b/cmd/api-errors.go @@ -62,6 +62,7 @@ const ( ErrInvalidPartNumberMarker ErrInvalidRequestBody ErrInvalidCopySource + ErrInvalidMetadataDirective ErrInvalidCopyDest ErrInvalidPolicyDocument ErrInvalidObjectState @@ -145,7 +146,7 @@ const ( var errorCodeResponse = map[APIErrorCode]APIError{ ErrInvalidCopyDest: { Code: "InvalidRequest", - Description: "This copy request is illegal because it is trying to copy an object to itself.", + Description: "This copy request is illegal because it is trying to copy an object to itself without changing the object's metadata, storage class, website redirect location or encryption attributes.", HTTPStatusCode: http.StatusBadRequest, }, ErrInvalidCopySource: { @@ -153,6 +154,11 @@ var errorCodeResponse = map[APIErrorCode]APIError{ Description: "Copy Source must mention the source bucket and key: sourcebucket/sourcekey.", HTTPStatusCode: http.StatusBadRequest, }, + ErrInvalidMetadataDirective: { + Code: "InvalidArgument", + Description: "Unknown metadata directive.", + HTTPStatusCode: http.StatusBadRequest, + }, ErrInvalidRequestBody: { Code: "InvalidArgument", Description: "Body shouldn't be set for this request.", diff --git a/cmd/fs-createfile.go b/cmd/fs-createfile.go deleted file mode 100644 index db0afd017..000000000 --- a/cmd/fs-createfile.go +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cmd - -import "io" - -func fsCreateFile(disk StorageAPI, reader io.Reader, buf []byte, tmpBucket, tempObj string) (int64, error) { - bytesWritten := int64(0) - // Read the buffer till io.EOF and append the read data to the temporary file. - for { - n, rErr := reader.Read(buf) - if rErr != nil && rErr != io.EOF { - return 0, traceError(rErr) - } - bytesWritten += int64(n) - wErr := disk.AppendFile(tmpBucket, tempObj, buf[0:n]) - if wErr != nil { - return 0, traceError(wErr) - } - if rErr == io.EOF { - break - } - } - return bytesWritten, nil -} diff --git a/cmd/fs-helpers.go b/cmd/fs-helpers.go new file mode 100644 index 000000000..0c457fad2 --- /dev/null +++ b/cmd/fs-helpers.go @@ -0,0 +1,91 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import "io" + +// Reads from the requested local location uses a staging buffer. Restricts +// reads upto requested range of length and offset. If successful staging +// buffer is written to the incoming stream. Returns errors if any. +func fsReadFile(disk StorageAPI, bucket, object string, writer io.Writer, totalLeft, startOffset int64, buf []byte) (err error) { + bufSize := int64(len(buf)) + // Start the read loop until requested range. + for { + // Figure out the right size for the buffer. + curLeft := bufSize + if totalLeft < bufSize { + curLeft = totalLeft + } + // Reads the file at offset. + nr, er := disk.ReadFile(bucket, object, startOffset, buf[:curLeft]) + if nr > 0 { + // Write to response writer. + nw, ew := writer.Write(buf[0:nr]) + if nw > 0 { + // Decrement whats left to write. + totalLeft -= int64(nw) + + // Progress the offset + startOffset += int64(nw) + } + if ew != nil { + err = traceError(ew) + break + } + if nr != int64(nw) { + err = traceError(io.ErrShortWrite) + break + } + } + if er == io.EOF || er == io.ErrUnexpectedEOF { + break + } + if er != nil { + err = traceError(er) + break + } + if totalLeft == 0 { + break + } + } + return err +} + +// Reads from input stream until end of file, takes an input buffer for staging reads. +// The staging buffer is then written to the disk. Returns for any error that occurs +// while reading the stream or writing to disk. Caller should cleanup partial files. +// Upon errors total data written will be 0 and returns error, on success returns +// total data written to disk. +func fsCreateFile(disk StorageAPI, reader io.Reader, buf []byte, bucket, object string) (int64, error) { + bytesWritten := int64(0) + // Read the buffer till io.EOF and appends data to path at bucket/object. + for { + n, rErr := reader.Read(buf) + if rErr != nil && rErr != io.EOF { + return 0, traceError(rErr) + } + bytesWritten += int64(n) + wErr := disk.AppendFile(bucket, object, buf[0:n]) + if wErr != nil { + return 0, traceError(wErr) + } + if rErr == io.EOF { + break + } + } + return bytesWritten, nil +} diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 1efe7d059..f0e77f6eb 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -217,7 +217,65 @@ func (fs fsObjects) DeleteBucket(bucket string) error { /// Object Operations -// GetObject - get an object. +// CopyObject - copy object source object to destination object. +// if source object and destination object are same we only +// update metadata. +func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, metadata map[string]string) (ObjectInfo, error) { + // Stat the file to get file size. + fi, err := fs.storage.StatFile(srcBucket, srcObject) + if err != nil { + return ObjectInfo{}, toObjectErr(traceError(err), srcBucket, srcObject) + } + + // Check if this request is only metadata update. + cpMetadataOnly := strings.EqualFold(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject)) + if cpMetadataOnly { + // Save objects' metadata in `fs.json`. + fsMeta := newFSMetaV1() + fsMeta.Meta = metadata + + fsMetaPath := pathJoin(bucketMetaPrefix, dstBucket, dstObject, fsMetaJSONFile) + if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil { + return ObjectInfo{}, toObjectErr(err, dstBucket, dstObject) + } + + // Get object info. + return fs.getObjectInfo(dstBucket, dstObject) + } + + // Length of the file to read. + length := fi.Size + + // Initialize pipe. + pipeReader, pipeWriter := io.Pipe() + + go func() { + startOffset := int64(0) // Read the whole file. + if gerr := fs.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter); gerr != nil { + errorIf(gerr, "Unable to read %s/%s.", srcBucket, srcObject) + pipeWriter.CloseWithError(gerr) + return + } + pipeWriter.Close() // Close writer explicitly signalling we wrote all data. + }() + + objInfo, err := fs.PutObject(dstBucket, dstObject, length, pipeReader, metadata, "") + if err != nil { + return ObjectInfo{}, toObjectErr(err, dstBucket, dstObject) + } + + // Explicitly close the reader. + pipeReader.Close() + + return objInfo, nil +} + +// GetObject - reads an object from the disk. +// Supports additional parameters like offset and length +// which are synonymous with HTTP Range requests. +// +// startOffset indicates the starting read location of the object. +// length indicates the total length of the object. func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer) (err error) { if err = checkGetObjArgs(bucket, object); err != nil { return err @@ -254,50 +312,14 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, } // Allocate a staging buffer. buf := make([]byte, int(bufSize)) - for { - // Figure out the right size for the buffer. - curLeft := bufSize - if totalLeft < bufSize { - curLeft = totalLeft - } - // Reads the file at offset. - nr, er := fs.storage.ReadFile(bucket, object, offset, buf[:curLeft]) - if nr > 0 { - // Write to response writer. - nw, ew := writer.Write(buf[0:nr]) - if nw > 0 { - // Decrement whats left to write. - totalLeft -= int64(nw) - - // Progress the offset - offset += int64(nw) - } - if ew != nil { - err = traceError(ew) - break - } - if nr != int64(nw) { - err = traceError(io.ErrShortWrite) - break - } - } - if er == io.EOF || er == io.ErrUnexpectedEOF { - break - } - if er != nil { - err = traceError(er) - break - } - if totalLeft == 0 { - break - } + if err = fsReadFile(fs.storage, bucket, object, writer, totalLeft, offset, buf); err != nil { + // Returns any error. + return toObjectErr(err, bucket, object) } - - // Returns any error. - return toObjectErr(err, bucket, object) + return nil } -// getObjectInfo - get object info. +// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo. func (fs fsObjects) getObjectInfo(bucket, object string) (ObjectInfo, error) { fi, err := fs.storage.StatFile(bucket, object) if err != nil { @@ -342,7 +364,7 @@ func (fs fsObjects) getObjectInfo(bucket, object string) (ObjectInfo, error) { return objInfo, nil } -// GetObjectInfo - get object info. +// GetObjectInfo - reads object metadata and replies back ObjectInfo. func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { if err := checkGetObjArgs(bucket, object); err != nil { return ObjectInfo{}, err @@ -350,7 +372,10 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { return fs.getObjectInfo(bucket, object) } -// PutObject - create an object. +// PutObject - creates an object upon reading from the input stream +// until EOF, writes data directly to configured filesystem path. +// Additionally writes `fs.json` which carries the necessary metadata +// for future object operations. func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (objInfo ObjectInfo, err error) { if err = checkPutObjectArgs(bucket, object, fs); err != nil { return ObjectInfo{}, err diff --git a/cmd/handler-utils.go b/cmd/handler-utils.go index 0e2c7dcf7..23d227140 100644 --- a/cmd/handler-utils.go +++ b/cmd/handler-utils.go @@ -65,6 +65,45 @@ var supportedHeaders = []string{ // Add more supported headers here. } +// isMetadataDirectiveValid - check if metadata-directive is valid. +func isMetadataDirectiveValid(h http.Header) bool { + _, ok := h[http.CanonicalHeaderKey("X-Amz-Metadata-Directive")] + if ok { + // Check atleast set metadata-directive is valid. + return (isMetadataCopy(h) || isMetadataReplace(h)) + } + // By default if x-amz-metadata-directive is not we + // treat it as 'COPY' this function returns true. + return true +} + +// Check if the metadata COPY is requested. +func isMetadataCopy(h http.Header) bool { + return h.Get("X-Amz-Metadata-Directive") == "COPY" +} + +// Check if the metadata REPLACE is requested. +func isMetadataReplace(h http.Header) bool { + return h.Get("X-Amz-Metadata-Directive") == "REPLACE" +} + +// Splits an incoming path into bucket and object components. +func path2BucketAndObject(path string) (bucket, object string) { + // Skip the first element if it is '/', split the rest. + path = strings.TrimPrefix(path, "/") + pathComponents := strings.SplitN(path, "/", 2) + + // Save the bucket and object extracted from path. + switch len(pathComponents) { + case 1: + bucket = pathComponents[0] + case 2: + bucket = pathComponents[0] + object = pathComponents[1] + } + return bucket, object +} + // extractMetadataFromHeader extracts metadata from HTTP header. func extractMetadataFromHeader(header http.Header) map[string]string { metadata := make(map[string]string) diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 2c3b1ed7a..04a88061b 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -34,7 +34,8 @@ type ObjectLayer interface { // Object operations. GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) (err error) GetObjectInfo(bucket, object string) (objInfo ObjectInfo, err error) - PutObject(bucket, object string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (objInto ObjectInfo, err error) + PutObject(bucket, object string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (objInfo ObjectInfo, err error) + CopyObject(srcBucket, srcObject, destBucket, destObject string, metadata map[string]string) (objInfo ObjectInfo, err error) DeleteObject(bucket, object string) error // Multipart operations. diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index f5b4857d3..87382c8a1 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -19,7 +19,6 @@ package cmd import ( "encoding/hex" "encoding/xml" - "io" "io/ioutil" "net/http" "net/url" @@ -228,14 +227,32 @@ func (api objectAPIHandlers) HeadObjectHandler(w http.ResponseWriter, r *http.Re w.WriteHeader(http.StatusOK) } +// Extract metadata relevant for an CopyObject operation based on conditional +// header values specified in X-Amz-Metadata-Directive. +func getCpObjMetadataFromHeader(header http.Header, defaultMeta map[string]string) map[string]string { + // if x-amz-metadata-directive says REPLACE then + // we extract metadata from the input headers. + if isMetadataReplace(header) { + return extractMetadataFromHeader(header) + } + // if x-amz-metadata-directive says COPY then we + // return the default metadata. + if isMetadataCopy(header) { + return defaultMeta + } + // Copy is default behavior if not x-amz-metadata-directive is set. + return defaultMeta +} + // CopyObjectHandler - Copy Object // ---------- // This implementation of the PUT operation adds an object to a bucket // while reading the object from another source. func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) - bucket := vars["bucket"] - object := vars["object"] + dstBucket := vars["bucket"] + dstObject := vars["object"] + cpDestPath := "/" + path.Join(dstBucket, dstObject) objectAPI := api.ObjectAPI() if objectAPI == nil { @@ -243,51 +260,58 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re return } - if s3Error := checkRequestAuthType(r, bucket, "s3:PutObject", serverConfig.GetRegion()); s3Error != ErrNone { + if s3Error := checkRequestAuthType(r, dstBucket, "s3:PutObject", serverConfig.GetRegion()); s3Error != ErrNone { writeErrorResponse(w, r, s3Error, r.URL.Path) return } // TODO: Reject requests where body/payload is present, for now we don't even read it. - // objectSource - objectSource, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source")) + // Copy source path. + cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source")) if err != nil { // Save unescaped string as is. - objectSource = r.Header.Get("X-Amz-Copy-Source") + cpSrcPath = r.Header.Get("X-Amz-Copy-Source") } - // Skip the first element if it is '/', split the rest. - objectSource = strings.TrimPrefix(objectSource, "/") - splits := strings.SplitN(objectSource, "/", 2) - - // Save sourceBucket and sourceObject extracted from url Path. - var sourceBucket, sourceObject string - if len(splits) == 2 { - sourceBucket = splits[0] - sourceObject = splits[1] - } - // If source object is empty, reply back error. - if sourceObject == "" { + srcBucket, srcObject := path2BucketAndObject(cpSrcPath) + // If source object is empty or bucket is empty, reply back invalid copy source. + if srcObject == "" || srcBucket == "" { writeErrorResponse(w, r, ErrInvalidCopySource, r.URL.Path) return } - // Source and destination objects cannot be same, reply back error. - if sourceObject == object && sourceBucket == bucket { - writeErrorResponse(w, r, ErrInvalidCopyDest, r.URL.Path) + // Check if metadata directive is valid. + if !isMetadataDirectiveValid(r.Header) { + writeErrorResponse(w, r, ErrInvalidMetadataDirective, r.URL.Path) return } - // Lock the object before reading. - objectRLock := globalNSMutex.NewNSLock(sourceBucket, sourceObject) - objectRLock.RLock() - defer objectRLock.RUnlock() + cpSrcDstSame := cpSrcPath == cpDestPath + // Hold write lock on destination since in both cases + // - if source and destination are same + // - if source and destination are different + // it is the sole mutating state. + objectDWLock := globalNSMutex.NewNSLock(dstBucket, dstObject) + objectDWLock.Lock() + defer objectDWLock.Unlock() + + // if source and destination are different, we have to hold + // additional read lock as well to protect against writes on + // source. + if !cpSrcDstSame { + // Hold read locks on source object only if we are + // going to read data from source object. + objectSRLock := globalNSMutex.NewNSLock(srcBucket, srcObject) + objectSRLock.RLock() + defer objectSRLock.RUnlock() + + } - objInfo, err := objectAPI.GetObjectInfo(sourceBucket, sourceObject) + objInfo, err := objectAPI.GetObjectInfo(srcBucket, srcObject) if err != nil { errorIf(err, "Unable to fetch object info.") - writeErrorResponse(w, r, toAPIErrorCode(err), objectSource) + writeErrorResponse(w, r, toAPIErrorCode(err), cpSrcPath) return } @@ -298,50 +322,34 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re /// maximum Upload size for object in a single CopyObject operation. if isMaxObjectSize(objInfo.Size) { - writeErrorResponse(w, r, ErrEntityTooLarge, objectSource) + writeErrorResponse(w, r, ErrEntityTooLarge, cpSrcPath) return } - // Size of object. - size := objInfo.Size + defaultMeta := objInfo.UserDefined - pipeReader, pipeWriter := io.Pipe() - go func() { - startOffset := int64(0) // Read the whole file. - // Get the object. - gErr := objectAPI.GetObject(sourceBucket, sourceObject, startOffset, size, pipeWriter) - if gErr != nil { - errorIf(gErr, "Unable to read an object.") - pipeWriter.CloseWithError(gErr) - return - } - pipeWriter.Close() // Close. - }() - - // Save other metadata if available. - metadata := objInfo.UserDefined - - // Remove the etag from source metadata because if it was uploaded as a multipart object - // then its ETag will not be MD5sum of the object. - delete(metadata, "md5Sum") - - sha256sum := "" + // Make sure to remove saved md5sum, object might have been uploaded + // as multipart which doesn't have a standard md5sum, we just let + // CopyObject calculate a new one. + delete(defaultMeta, "md5Sum") - objectWLock := globalNSMutex.NewNSLock(bucket, object) - objectWLock.Lock() - defer objectWLock.Unlock() + newMetadata := getCpObjMetadataFromHeader(r.Header, defaultMeta) + // Check if x-amz-metadata-directive was not set to REPLACE and source, + // desination are same objects. + if !isMetadataReplace(r.Header) && cpSrcDstSame { + // If x-amz-metadata-directive is not set to REPLACE then we need + // to error out if source and destination are same. + writeErrorResponse(w, r, ErrInvalidCopyDest, r.URL.Path) + return + } - // Create the object. - objInfo, err = objectAPI.PutObject(bucket, object, size, pipeReader, metadata, sha256sum) + // Copy source object to destination, if source and destination + // object is same then only metadata is updated. + objInfo, err = objectAPI.CopyObject(srcBucket, srcObject, dstBucket, dstObject, newMetadata) if err != nil { - // Close the this end of the pipe upon error in PutObject. - pipeReader.CloseWithError(err) - errorIf(err, "Unable to create an object.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) return } - // Explicitly close the reader, before fetching object info. - pipeReader.Close() md5Sum := objInfo.MD5Sum response := generateCopyObjectResponse(md5Sum, objInfo.ModTime) @@ -354,7 +362,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re // Notify object created event. eventNotify(eventData{ Type: ObjectCreatedCopy, - Bucket: bucket, + Bucket: dstBucket, ObjInfo: objInfo, ReqParams: map[string]string{ "sourceIPAddress": r.RemoteAddr, diff --git a/cmd/object-handlers_test.go b/cmd/object-handlers_test.go index 7ea378bdb..ed454fd87 100644 --- a/cmd/object-handlers_test.go +++ b/cmd/object-handlers_test.go @@ -981,19 +981,25 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string, bucketName string newObjectName string // name of the newly copied object. copySourceHeader string // data for "X-Amz-Copy-Source" header. Contains the object to be copied in the URL. + metadataGarbage bool + metadataReplace bool + metadataCopy bool + metadata map[string]string accessKey string secretKey string // expected output. expectedRespStatus int }{ - // Test case - 1. + // Test case - 1, copy metadata from newObject1, ignore request headers. { bucketName: bucketName, newObjectName: "newObject1", copySourceHeader: url.QueryEscape("/" + bucketName + "/" + objectName), accessKey: credentials.AccessKey, secretKey: credentials.SecretKey, - + metadata: map[string]string{ + "Content-Type": "application/json", + }, expectedRespStatus: http.StatusOK, }, @@ -1008,6 +1014,7 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string, expectedRespStatus: http.StatusBadRequest, }, + // Test case - 3. // Test case with new object name is same as object to be copied. { @@ -1019,7 +1026,58 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string, expectedRespStatus: http.StatusBadRequest, }, + // Test case - 4. + // Test case with new object name is same as object to be copied + // but metadata is updated. + { + bucketName: bucketName, + newObjectName: objectName, + copySourceHeader: url.QueryEscape("/" + bucketName + "/" + objectName), + metadata: map[string]string{ + "Content-Type": "application/json", + }, + metadataReplace: true, + accessKey: credentials.AccessKey, + secretKey: credentials.SecretKey, + + expectedRespStatus: http.StatusOK, + }, + + // Test case - 5. + // Test case with invalid metadata-directive. + { + bucketName: bucketName, + newObjectName: "newObject1", + copySourceHeader: url.QueryEscape("/" + bucketName + "/" + objectName), + metadata: map[string]string{ + "Content-Type": "application/json", + }, + metadataGarbage: true, + accessKey: credentials.AccessKey, + secretKey: credentials.SecretKey, + + expectedRespStatus: http.StatusBadRequest, + }, + + // Test case - 6. + // Test case with new object name is same as object to be copied + // fail with BadRequest. + { + bucketName: bucketName, + newObjectName: objectName, + copySourceHeader: url.QueryEscape("/" + bucketName + "/" + objectName), + metadata: map[string]string{ + "Content-Type": "application/json", + }, + metadataCopy: true, + accessKey: credentials.AccessKey, + secretKey: credentials.SecretKey, + + expectedRespStatus: http.StatusBadRequest, + }, + + // Test case - 7. // Test case with non-existent source file. // Case for the purpose of failing `api.ObjectAPI.GetObjectInfo`. // Expecting the response status code to http.StatusNotFound (404). @@ -1032,7 +1090,8 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string, expectedRespStatus: http.StatusNotFound, }, - // Test case - 5. + + // Test case - 8. // Test case with non-existent source file. // Case for the purpose of failing `api.ObjectAPI.PutObject`. // Expecting the response status code to http.StatusNotFound (404). @@ -1045,7 +1104,8 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string, expectedRespStatus: http.StatusNotFound, }, - // Test case - 6. + + // Test case - 9. // Case with invalid AccessKey. { bucketName: bucketName, @@ -1059,7 +1119,8 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string, } for i, testCase := range testCases { - var req, reqV2 *http.Request + var req *http.Request + var reqV2 *http.Request // initialize HTTP NewRecorder, this records any mutations to response writer inside the handler. rec := httptest.NewRecorder() // construct HTTP request for copy object. @@ -1073,6 +1134,19 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string, if testCase.copySourceHeader != "" { req.Header.Set("X-Amz-Copy-Source", testCase.copySourceHeader) } + // Add custom metadata. + for k, v := range testCase.metadata { + req.Header.Set(k, v) + } + if testCase.metadataReplace { + req.Header.Set("X-Amz-Metadata-Directive", "REPLACE") + } + if testCase.metadataCopy { + req.Header.Set("X-Amz-Metadata-Directive", "COPY") + } + if testCase.metadataGarbage { + req.Header.Set("X-Amz-Metadata-Directive", "Unknown") + } // Since `apiRouter` satisfies `http.Handler` it has a ServeHTTP to execute the logic of the handler. // Call the ServeHTTP to execute the handler, `func (api objectAPIHandlers) CopyObjectHandler` handles the request. apiRouter.ServeHTTP(rec, req) @@ -1106,6 +1180,20 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string, reqV2.Header.Set("X-Amz-Copy-Source", testCase.copySourceHeader) } + // Add custom metadata. + for k, v := range testCase.metadata { + reqV2.Header.Set(k, v+"+x") + } + if testCase.metadataReplace { + reqV2.Header.Set("X-Amz-Metadata-Directive", "REPLACE") + } + if testCase.metadataCopy { + reqV2.Header.Set("X-Amz-Metadata-Directive", "COPY") + } + if testCase.metadataGarbage { + reqV2.Header.Set("X-Amz-Metadata-Directive", "Unknown") + } + err = signRequestV2(reqV2, testCase.accessKey, testCase.secretKey) if err != nil { diff --git a/cmd/xl-v1-metadata.go b/cmd/xl-v1-metadata.go index 8edd7d2ba..f50b083ad 100644 --- a/cmd/xl-v1-metadata.go +++ b/cmd/xl-v1-metadata.go @@ -311,6 +311,14 @@ func deleteAllXLMetadata(disks []StorageAPI, bucket, prefix string, errs []error wg.Wait() } +// Rename `xl.json` content to destination location for each disk in order. +func renameXLMetadata(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, quorum int) error { + isDir := false + srcXLJSON := path.Join(srcEntry, xlMetaJSONFile) + dstXLJSON := path.Join(dstEntry, xlMetaJSONFile) + return rename(disks, srcBucket, srcXLJSON, dstBucket, dstXLJSON, isDir, quorum) +} + // writeUniqueXLMetadata - writes unique `xl.json` content for each disk in order. func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas []xlMetaV1, quorum int) error { var wg = &sync.WaitGroup{} diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 28af752b7..702af34ca 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -42,13 +42,105 @@ var objectOpIgnoredErrs = []error{ /// Object Operations +// CopyObject - copy object source object to destination object. +// if source object and destination object are same we only +// update metadata. +func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string, metadata map[string]string) (ObjectInfo, error) { + // Read metadata associated with the object from all disks. + metaArr, errs := readAllXLMetadata(xl.storageDisks, srcBucket, srcObject) + // Do we have read quorum? + if !isDiskQuorum(errs, xl.readQuorum) { + return ObjectInfo{}, traceError(InsufficientReadQuorum{}, errs...) + } + + if reducedErr := reduceReadQuorumErrs(errs, objectOpIgnoredErrs, xl.readQuorum); reducedErr != nil { + return ObjectInfo{}, toObjectErr(reducedErr, srcBucket, srcObject) + } + + // List all online disks. + onlineDisks, modTime := listOnlineDisks(xl.storageDisks, metaArr, errs) + + // Pick latest valid metadata. + xlMeta, err := pickValidXLMeta(metaArr, modTime) + if err != nil { + return ObjectInfo{}, toObjectErr(err, srcBucket, srcObject) + } + + // Reorder online disks based on erasure distribution order. + onlineDisks = getOrderedDisks(xlMeta.Erasure.Distribution, onlineDisks) + + // Length of the file to read. + length := xlMeta.Stat.Size + + // Check if this request is only metadata update. + cpMetadataOnly := strings.EqualFold(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject)) + if cpMetadataOnly { + xlMeta.Meta = metadata + partsMetadata := make([]xlMetaV1, len(xl.storageDisks)) + // Update `xl.json` content on each disks. + for index := range partsMetadata { + partsMetadata[index] = xlMeta + } + + tempObj := mustGetUUID() + + // Write unique `xl.json` for each disk. + if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum); err != nil { + return ObjectInfo{}, toObjectErr(err, srcBucket, srcObject) + } + // Rename atomically `xl.json` from tmp location to destination for each disk. + if err = renameXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, srcBucket, srcObject, xl.writeQuorum); err != nil { + return ObjectInfo{}, toObjectErr(err, srcBucket, srcObject) + } + + objInfo := ObjectInfo{ + IsDir: false, + Bucket: srcBucket, + Name: srcObject, + Size: xlMeta.Stat.Size, + ModTime: xlMeta.Stat.ModTime, + MD5Sum: xlMeta.Meta["md5Sum"], + ContentType: xlMeta.Meta["content-type"], + ContentEncoding: xlMeta.Meta["content-encoding"], + } + // md5Sum has already been extracted into objInfo.MD5Sum. We + // need to remove it from xlMetaMap to avoid it from appearing as + // part of response headers. e.g, X-Minio-* or X-Amz-*. + delete(xlMeta.Meta, "md5Sum") + objInfo.UserDefined = xlMeta.Meta + return objInfo, nil + } + + // Initialize pipe. + pipeReader, pipeWriter := io.Pipe() + + go func() { + startOffset := int64(0) // Read the whole file. + if gerr := xl.GetObject(srcBucket, srcObject, startOffset, length, pipeWriter); gerr != nil { + errorIf(gerr, "Unable to read %s of the object `%s/%s`.", srcBucket, srcObject) + pipeWriter.CloseWithError(toObjectErr(gerr, srcBucket, srcObject)) + return + } + pipeWriter.Close() // Close writer explicitly signalling we wrote all data. + }() + + objInfo, err := xl.PutObject(dstBucket, dstObject, length, pipeReader, metadata, "") + if err != nil { + return ObjectInfo{}, toObjectErr(err, dstBucket, dstObject) + } + + // Explicitly close the reader. + pipeReader.Close() + + return objInfo, nil +} + // GetObject - reads an object erasured coded across multiple // disks. Supports additional parameters like offset and length -// which is synonymous with HTTP Range requests. +// which are synonymous with HTTP Range requests. // -// startOffset indicates the location at which the client requested -// object to be read at. length indicates the total length of the -// object requested by client. +// startOffset indicates the starting read location of the object. +// length indicates the total length of the object. func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) error { if err := checkGetObjArgs(bucket, object); err != nil { return err @@ -255,13 +347,13 @@ func (xl xlObjects) getObjectInfo(bucket, object string) (objInfo ObjectInfo, er return objInfo, nil } -func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool, errs []error) { +func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, errs []error) { var wg = &sync.WaitGroup{} // Undo rename object on disks where RenameFile succeeded. // If srcEntry/dstEntry are objects then add a trailing slash to copy // over all the parts inside the object directory - if !isPart { + if isDir { srcEntry = retainSlash(srcEntry) dstEntry = retainSlash(dstEntry) } @@ -284,14 +376,14 @@ func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry str // rename - common function that renamePart and renameObject use to rename // the respective underlying storage layer representations. -func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool, quorum int) error { +func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, quorum int) error { // Initialize sync waitgroup. var wg = &sync.WaitGroup{} // Initialize list of errors. var errs = make([]error, len(disks)) - if !isPart { + if isDir { dstEntry = retainSlash(dstEntry) srcEntry = retainSlash(srcEntry) } @@ -319,7 +411,7 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, // otherwise return failure. Cleanup successful renames. if !isDiskQuorum(errs, quorum) { // Undo all the partial rename operations. - undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs) + undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isDir, errs) return traceError(errXLWriteQuorum) } return reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, quorum) @@ -330,8 +422,8 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, // not have a readQuorum partially renamed files are renamed back to // its proper location. func renamePart(disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart string, quorum int) error { - isPart := true - return rename(disks, srcBucket, srcPart, dstBucket, dstPart, isPart, quorum) + isDir := false + return rename(disks, srcBucket, srcPart, dstBucket, dstPart, isDir, quorum) } // renameObject - renames all source objects to destination object @@ -339,8 +431,8 @@ func renamePart(disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart strin // not have a readQuorum partially renamed files are renamed back to // its proper location. func renameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, quorum int) error { - isPart := false - return rename(disks, srcBucket, srcObject, dstBucket, dstObject, isPart, quorum) + isDir := true + return rename(disks, srcBucket, srcObject, dstBucket, dstObject, isDir, quorum) } // PutObject - creates an object upon reading from the input stream