Rename parts/objects only on onlineDisks (#2185)

master
Krishnan Parthasarathi 8 years ago committed by Harshavardhana
parent 749a94f6c9
commit 1c82b81408
  1. 8
      xl-v1-multipart.go
  2. 38
      xl-v1-object.go

@ -285,7 +285,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
if err = writeSameXLMetadata(xl.storageDisks, minioMetaBucket, tempUploadIDPath, xlMeta, xl.writeQuorum, xl.readQuorum); err != nil { if err = writeSameXLMetadata(xl.storageDisks, minioMetaBucket, tempUploadIDPath, xlMeta, xl.writeQuorum, xl.readQuorum); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
} }
rErr := xl.renameObject(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath) rErr := renameObject(xl.storageDisks, minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath, xl.writeQuorum, xl.readQuorum)
if rErr == nil { if rErr == nil {
// Return success. // Return success.
return uploadID, nil return uploadID, nil
@ -428,7 +428,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Rename temporary part file to its final location. // Rename temporary part file to its final location.
partPath := path.Join(uploadIDPath, partSuffix) partPath := path.Join(uploadIDPath, partSuffix)
err = xl.renamePart(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath) err = renamePart(onlineDisks, minioMetaBucket, tmpPartPath, minioMetaBucket, partPath, xl.writeQuorum, xl.readQuorum)
if err != nil { if err != nil {
return "", toObjectErr(err, minioMetaBucket, partPath) return "", toObjectErr(err, minioMetaBucket, partPath)
} }
@ -757,7 +757,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// Rename if an object already exists to temporary location. // Rename if an object already exists to temporary location.
uniqueID := getUUID() uniqueID := getUUID()
if xl.isObject(bucket, object) { if xl.isObject(bucket, object) {
err = xl.renameObject(bucket, object, minioMetaBucket, path.Join(tmpMetaPrefix, uniqueID)) err = renameObject(xl.storageDisks, bucket, object, minioMetaBucket, path.Join(tmpMetaPrefix, uniqueID), xl.writeQuorum, xl.readQuorum)
if err != nil { if err != nil {
return "", toObjectErr(err, bucket, object) return "", toObjectErr(err, bucket, object)
} }
@ -777,7 +777,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
} }
// Rename the multipart object to final location. // Rename the multipart object to final location.
if err = xl.renameObject(minioMetaBucket, uploadIDPath, bucket, object); err != nil { if err = renameObject(xl.storageDisks, minioMetaBucket, uploadIDPath, bucket, object, xl.writeQuorum, xl.readQuorum); err != nil {
return "", toObjectErr(err, bucket, object) return "", toObjectErr(err, bucket, object)
} }

@ -230,7 +230,7 @@ func (xl xlObjects) getObjectInfo(bucket, object string) (objInfo ObjectInfo, er
return objInfo, nil return objInfo, nil
} }
func (xl xlObjects) undoRename(srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool, errs []error) { func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool, errs []error, writeQuorum, readQuorum int) {
var wg = &sync.WaitGroup{} var wg = &sync.WaitGroup{}
// Undo rename object on disks where RenameFile succeeded. // Undo rename object on disks where RenameFile succeeded.
@ -240,7 +240,7 @@ func (xl xlObjects) undoRename(srcBucket, srcEntry, dstBucket, dstEntry string,
srcEntry = retainSlash(srcEntry) srcEntry = retainSlash(srcEntry)
dstEntry = retainSlash(dstEntry) dstEntry = retainSlash(dstEntry)
} }
for index, disk := range xl.storageDisks { for index, disk := range disks {
if disk == nil { if disk == nil {
continue continue
} }
@ -258,25 +258,25 @@ func (xl xlObjects) undoRename(srcBucket, srcEntry, dstBucket, dstEntry string,
} }
// undoRenameObject - renames back the partially successful rename operations. // undoRenameObject - renames back the partially successful rename operations.
func (xl xlObjects) undoRenameObject(srcBucket, srcObject, dstBucket, dstObject string, errs []error) { func undoRenameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, errs []error, writeQuorum, readQuorum int) {
isPart := false isPart := false
xl.undoRename(srcBucket, srcObject, dstBucket, dstObject, isPart, errs) undoRename(disks, srcBucket, srcObject, dstBucket, dstObject, isPart, errs, writeQuorum, readQuorum)
} }
// undoRenamePart - renames back the partially successful rename operation. // undoRenamePart - renames back the partially successful rename operation.
func (xl xlObjects) undoRenamePart(srcBucket, srcPart, dstBucket, dstPart string, errs []error) { func undoRenamePart(disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart string, errs []error, writeQuorum, readQuorum int) {
isPart := true isPart := true
xl.undoRename(srcBucket, srcPart, dstBucket, dstPart, isPart, errs) undoRename(disks, srcBucket, srcPart, dstBucket, dstPart, isPart, errs, writeQuorum, readQuorum)
} }
// rename - common function that renamePart and renameObject use to rename // rename - common function that renamePart and renameObject use to rename
// the respective underlying storage layer representations. // the respective underlying storage layer representations.
func (xl xlObjects) rename(srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool) error { func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool, writeQuorum, readQuorum int) error {
// Initialize sync waitgroup. // Initialize sync waitgroup.
var wg = &sync.WaitGroup{} var wg = &sync.WaitGroup{}
// Initialize list of errors. // Initialize list of errors.
var errs = make([]error, len(xl.storageDisks)) var errs = make([]error, len(disks))
if !isPart { if !isPart {
dstEntry = retainSlash(dstEntry) dstEntry = retainSlash(dstEntry)
@ -284,7 +284,7 @@ func (xl xlObjects) rename(srcBucket, srcEntry, dstBucket, dstEntry string, isPa
} }
// Rename file on all underlying storage disks. // Rename file on all underlying storage disks.
for index, disk := range xl.storageDisks { for index, disk := range disks {
if disk == nil { if disk == nil {
errs[index] = errDiskNotFound errs[index] = errDiskNotFound
continue continue
@ -304,20 +304,20 @@ func (xl xlObjects) rename(srcBucket, srcEntry, dstBucket, dstEntry string, isPa
// We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum // We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum
// otherwise return failure. Cleanup successful renames. // otherwise return failure. Cleanup successful renames.
if !isQuorum(errs, xl.writeQuorum) { if !isQuorum(errs, writeQuorum) {
// Check we have successful read quorum. // Check we have successful read quorum.
if isQuorum(errs, xl.readQuorum) { if isQuorum(errs, readQuorum) {
return nil // Return success. return nil // Return success.
} // else - failed to acquire read quorum. } // else - failed to acquire read quorum.
// Undo all the partial rename operations. // Undo all the partial rename operations.
xl.undoRename(srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs) undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs, writeQuorum, readQuorum)
return errXLWriteQuorum return errXLWriteQuorum
} }
// Return on first error, also undo any partially successful rename operations. // Return on first error, also undo any partially successful rename operations.
for _, err := range errs { for _, err := range errs {
if err != nil && err != errDiskNotFound { if err != nil && err != errDiskNotFound {
// Undo all the partial rename operations. // Undo all the partial rename operations.
xl.undoRename(srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs) undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs, writeQuorum, readQuorum)
return err return err
} }
} }
@ -328,18 +328,18 @@ func (xl xlObjects) rename(srcBucket, srcEntry, dstBucket, dstEntry string, isPa
// across all disks in parallel. Additionally if we have errors and do // across all disks in parallel. Additionally if we have errors and do
// not have a readQuorum partially renamed files are renamed back to // not have a readQuorum partially renamed files are renamed back to
// its proper location. // its proper location.
func (xl xlObjects) renamePart(srcBucket, srcObject, dstBucket, dstObject string) error { func renamePart(disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart string, writeQuorum, readQuorum int) error {
isPart := true isPart := true
return xl.rename(srcBucket, srcObject, dstBucket, dstObject, isPart) return rename(disks, srcBucket, srcPart, dstBucket, dstPart, isPart, writeQuorum, readQuorum)
} }
// renameObject - renames all source objects to destination object // renameObject - renames all source objects to destination object
// across all disks in parallel. Additionally if we have errors and do // across all disks in parallel. Additionally if we have errors and do
// not have a readQuorum partially renamed files are renamed back to // not have a readQuorum partially renamed files are renamed back to
// its proper location. // its proper location.
func (xl xlObjects) renameObject(srcBucket, srcObject, dstBucket, dstObject string) error { func renameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, writeQuorum, readQuorum int) error {
isPart := false isPart := false
return xl.rename(srcBucket, srcObject, dstBucket, dstObject, isPart) return rename(disks, srcBucket, srcObject, dstBucket, dstObject, isPart, writeQuorum, readQuorum)
} }
// PutObject - creates an object upon reading from the input stream // PutObject - creates an object upon reading from the input stream
@ -502,7 +502,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// Rename if an object already exists to temporary location. // Rename if an object already exists to temporary location.
newUniqueID := getUUID() newUniqueID := getUUID()
if xl.isObject(bucket, object) { if xl.isObject(bucket, object) {
err = xl.renameObject(bucket, object, minioMetaTmpBucket, newUniqueID) err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, xl.writeQuorum, xl.readQuorum)
if err != nil { if err != nil {
return "", toObjectErr(err, bucket, object) return "", toObjectErr(err, bucket, object)
} }
@ -528,7 +528,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
} }
// Rename the successfully written temporary object to final location. // Rename the successfully written temporary object to final location.
err = xl.renameObject(minioMetaTmpBucket, tempObj, bucket, object) err = renameObject(xl.storageDisks, minioMetaTmpBucket, tempObj, bucket, object, xl.writeQuorum, xl.readQuorum)
if err != nil { if err != nil {
return "", toObjectErr(err, bucket, object) return "", toObjectErr(err, bucket, object)
} }

Loading…
Cancel
Save