From 1c82b81408bf613bb694bdd0b9c6f2c02e9f6fed Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Mon, 11 Jul 2016 22:53:54 -0700 Subject: [PATCH] Rename parts/objects only on onlineDisks (#2185) --- xl-v1-multipart.go | 8 ++++---- xl-v1-object.go | 38 +++++++++++++++++++------------------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index 04028c303..6392c0a6c 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -285,7 +285,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st if err = writeSameXLMetadata(xl.storageDisks, minioMetaBucket, tempUploadIDPath, xlMeta, xl.writeQuorum, xl.readQuorum); err != nil { return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) } - rErr := xl.renameObject(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath) + rErr := renameObject(xl.storageDisks, minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath, xl.writeQuorum, xl.readQuorum) if rErr == nil { // Return success. return uploadID, nil @@ -428,7 +428,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Rename temporary part file to its final location. partPath := path.Join(uploadIDPath, partSuffix) - err = xl.renamePart(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath) + err = renamePart(onlineDisks, minioMetaBucket, tmpPartPath, minioMetaBucket, partPath, xl.writeQuorum, xl.readQuorum) if err != nil { return "", toObjectErr(err, minioMetaBucket, partPath) } @@ -757,7 +757,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Rename if an object already exists to temporary location. uniqueID := getUUID() if xl.isObject(bucket, object) { - err = xl.renameObject(bucket, object, minioMetaBucket, path.Join(tmpMetaPrefix, uniqueID)) + err = renameObject(xl.storageDisks, bucket, object, minioMetaBucket, path.Join(tmpMetaPrefix, uniqueID), xl.writeQuorum, xl.readQuorum) if err != nil { return "", toObjectErr(err, bucket, object) } @@ -777,7 +777,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload } // Rename the multipart object to final location. - if err = xl.renameObject(minioMetaBucket, uploadIDPath, bucket, object); err != nil { + if err = renameObject(xl.storageDisks, minioMetaBucket, uploadIDPath, bucket, object, xl.writeQuorum, xl.readQuorum); err != nil { return "", toObjectErr(err, bucket, object) } diff --git a/xl-v1-object.go b/xl-v1-object.go index e5cd1fc19..cfc23cd3d 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -230,7 +230,7 @@ func (xl xlObjects) getObjectInfo(bucket, object string) (objInfo ObjectInfo, er return objInfo, nil } -func (xl xlObjects) undoRename(srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool, errs []error) { +func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool, errs []error, writeQuorum, readQuorum int) { var wg = &sync.WaitGroup{} // Undo rename object on disks where RenameFile succeeded. @@ -240,7 +240,7 @@ func (xl xlObjects) undoRename(srcBucket, srcEntry, dstBucket, dstEntry string, srcEntry = retainSlash(srcEntry) dstEntry = retainSlash(dstEntry) } - for index, disk := range xl.storageDisks { + for index, disk := range disks { if disk == nil { continue } @@ -258,25 +258,25 @@ func (xl xlObjects) undoRename(srcBucket, srcEntry, dstBucket, dstEntry string, } // undoRenameObject - renames back the partially successful rename operations. -func (xl xlObjects) undoRenameObject(srcBucket, srcObject, dstBucket, dstObject string, errs []error) { +func undoRenameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, errs []error, writeQuorum, readQuorum int) { isPart := false - xl.undoRename(srcBucket, srcObject, dstBucket, dstObject, isPart, errs) + undoRename(disks, srcBucket, srcObject, dstBucket, dstObject, isPart, errs, writeQuorum, readQuorum) } // undoRenamePart - renames back the partially successful rename operation. -func (xl xlObjects) undoRenamePart(srcBucket, srcPart, dstBucket, dstPart string, errs []error) { +func undoRenamePart(disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart string, errs []error, writeQuorum, readQuorum int) { isPart := true - xl.undoRename(srcBucket, srcPart, dstBucket, dstPart, isPart, errs) + undoRename(disks, srcBucket, srcPart, dstBucket, dstPart, isPart, errs, writeQuorum, readQuorum) } // rename - common function that renamePart and renameObject use to rename // the respective underlying storage layer representations. -func (xl xlObjects) rename(srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool) error { +func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool, writeQuorum, readQuorum int) error { // Initialize sync waitgroup. var wg = &sync.WaitGroup{} // Initialize list of errors. - var errs = make([]error, len(xl.storageDisks)) + var errs = make([]error, len(disks)) if !isPart { dstEntry = retainSlash(dstEntry) @@ -284,7 +284,7 @@ func (xl xlObjects) rename(srcBucket, srcEntry, dstBucket, dstEntry string, isPa } // Rename file on all underlying storage disks. - for index, disk := range xl.storageDisks { + for index, disk := range disks { if disk == nil { errs[index] = errDiskNotFound continue @@ -304,20 +304,20 @@ func (xl xlObjects) rename(srcBucket, srcEntry, dstBucket, dstEntry string, isPa // We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum // otherwise return failure. Cleanup successful renames. - if !isQuorum(errs, xl.writeQuorum) { + if !isQuorum(errs, writeQuorum) { // Check we have successful read quorum. - if isQuorum(errs, xl.readQuorum) { + if isQuorum(errs, readQuorum) { return nil // Return success. } // else - failed to acquire read quorum. // Undo all the partial rename operations. - xl.undoRename(srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs) + undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs, writeQuorum, readQuorum) return errXLWriteQuorum } // Return on first error, also undo any partially successful rename operations. for _, err := range errs { if err != nil && err != errDiskNotFound { // Undo all the partial rename operations. - xl.undoRename(srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs) + undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs, writeQuorum, readQuorum) return err } } @@ -328,18 +328,18 @@ func (xl xlObjects) rename(srcBucket, srcEntry, dstBucket, dstEntry string, isPa // across all disks in parallel. Additionally if we have errors and do // not have a readQuorum partially renamed files are renamed back to // its proper location. -func (xl xlObjects) renamePart(srcBucket, srcObject, dstBucket, dstObject string) error { +func renamePart(disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart string, writeQuorum, readQuorum int) error { isPart := true - return xl.rename(srcBucket, srcObject, dstBucket, dstObject, isPart) + return rename(disks, srcBucket, srcPart, dstBucket, dstPart, isPart, writeQuorum, readQuorum) } // renameObject - renames all source objects to destination object // across all disks in parallel. Additionally if we have errors and do // not have a readQuorum partially renamed files are renamed back to // its proper location. -func (xl xlObjects) renameObject(srcBucket, srcObject, dstBucket, dstObject string) error { +func renameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, writeQuorum, readQuorum int) error { isPart := false - return xl.rename(srcBucket, srcObject, dstBucket, dstObject, isPart) + return rename(disks, srcBucket, srcObject, dstBucket, dstObject, isPart, writeQuorum, readQuorum) } // PutObject - creates an object upon reading from the input stream @@ -502,7 +502,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. // Rename if an object already exists to temporary location. newUniqueID := getUUID() if xl.isObject(bucket, object) { - err = xl.renameObject(bucket, object, minioMetaTmpBucket, newUniqueID) + err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, xl.writeQuorum, xl.readQuorum) if err != nil { return "", toObjectErr(err, bucket, object) } @@ -528,7 +528,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. } // Rename the successfully written temporary object to final location. - err = xl.renameObject(minioMetaTmpBucket, tempObj, bucket, object) + err = renameObject(xl.storageDisks, minioMetaTmpBucket, tempObj, bucket, object, xl.writeQuorum, xl.readQuorum) if err != nil { return "", toObjectErr(err, bucket, object) }