|
|
|
@ -67,12 +67,12 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i |
|
|
|
|
return toObjectErr(errXLReadQuorum, bucket, object) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// If all the disks returned error, we return error.
|
|
|
|
|
if errCount, err := reduceErrs(errs); err != nil { |
|
|
|
|
if errCount < xl.readQuorum { |
|
|
|
|
return toObjectErr(errXLReadQuorum, bucket, object) |
|
|
|
|
} |
|
|
|
|
return toObjectErr(err, bucket, object) |
|
|
|
|
if reducedErr := reduceErrs(errs, []error{ |
|
|
|
|
errDiskNotFound, |
|
|
|
|
errFaultyDisk, |
|
|
|
|
errDiskAccessDenied, |
|
|
|
|
}); reducedErr != nil { |
|
|
|
|
return toObjectErr(reducedErr, bucket, object) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// List all online disks.
|
|
|
|
@ -237,7 +237,7 @@ func (xl xlObjects) getObjectInfo(bucket, object string) (objInfo ObjectInfo, er |
|
|
|
|
return objInfo, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool, errs []error, writeQuorum, readQuorum int) { |
|
|
|
|
func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool, errs []error) { |
|
|
|
|
var wg = &sync.WaitGroup{} |
|
|
|
|
// Undo rename object on disks where RenameFile succeeded.
|
|
|
|
|
|
|
|
|
@ -264,21 +264,9 @@ func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry str |
|
|
|
|
wg.Wait() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// undoRenameObject - renames back the partially successful rename operations.
|
|
|
|
|
func undoRenameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, errs []error, writeQuorum, readQuorum int) { |
|
|
|
|
isPart := false |
|
|
|
|
undoRename(disks, srcBucket, srcObject, dstBucket, dstObject, isPart, errs, writeQuorum, readQuorum) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// undoRenamePart - renames back the partially successful rename operation.
|
|
|
|
|
func undoRenamePart(disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart string, errs []error, writeQuorum, readQuorum int) { |
|
|
|
|
isPart := true |
|
|
|
|
undoRename(disks, srcBucket, srcPart, dstBucket, dstPart, isPart, errs, writeQuorum, readQuorum) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// rename - common function that renamePart and renameObject use to rename
|
|
|
|
|
// the respective underlying storage layer representations.
|
|
|
|
|
func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool, writeQuorum, readQuorum int) error { |
|
|
|
|
func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool, quorum int) error { |
|
|
|
|
// Initialize sync waitgroup.
|
|
|
|
|
var wg = &sync.WaitGroup{} |
|
|
|
|
|
|
|
|
@ -311,52 +299,35 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, |
|
|
|
|
|
|
|
|
|
// We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum
|
|
|
|
|
// otherwise return failure. Cleanup successful renames.
|
|
|
|
|
if !isDiskQuorum(errs, writeQuorum) { |
|
|
|
|
// Check we have successful read quorum.
|
|
|
|
|
if isDiskQuorum(errs, readQuorum) { |
|
|
|
|
return nil // Return success.
|
|
|
|
|
} // else - failed to acquire read quorum.
|
|
|
|
|
if !isDiskQuorum(errs, quorum) { |
|
|
|
|
// Undo all the partial rename operations.
|
|
|
|
|
undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs, writeQuorum, readQuorum) |
|
|
|
|
undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs) |
|
|
|
|
return errXLWriteQuorum |
|
|
|
|
} |
|
|
|
|
// Return on first error, also undo any partially successful rename operations.
|
|
|
|
|
if errCount, reducedErr := reduceErrs(errs); reducedErr != nil { |
|
|
|
|
if errCount < writeQuorum { |
|
|
|
|
// Undo all the partial rename operations.
|
|
|
|
|
undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs, writeQuorum, readQuorum) |
|
|
|
|
return errXLWriteQuorum |
|
|
|
|
} |
|
|
|
|
if isErrIgnored(reducedErr, []error{ |
|
|
|
|
return reduceErrs(errs, []error{ |
|
|
|
|
errDiskNotFound, |
|
|
|
|
errDiskAccessDenied, |
|
|
|
|
errFaultyDisk, |
|
|
|
|
errVolumeNotFound, |
|
|
|
|
}) { |
|
|
|
|
// Return success.
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
return reducedErr |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// renamePart - renames a part of the source object to the destination
|
|
|
|
|
// across all disks in parallel. Additionally if we have errors and do
|
|
|
|
|
// not have a readQuorum partially renamed files are renamed back to
|
|
|
|
|
// its proper location.
|
|
|
|
|
func renamePart(disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart string, writeQuorum, readQuorum int) error { |
|
|
|
|
func renamePart(disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart string, quorum int) error { |
|
|
|
|
isPart := true |
|
|
|
|
return rename(disks, srcBucket, srcPart, dstBucket, dstPart, isPart, writeQuorum, readQuorum) |
|
|
|
|
return rename(disks, srcBucket, srcPart, dstBucket, dstPart, isPart, quorum) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// renameObject - renames all source objects to destination object
|
|
|
|
|
// across all disks in parallel. Additionally if we have errors and do
|
|
|
|
|
// not have a readQuorum partially renamed files are renamed back to
|
|
|
|
|
// its proper location.
|
|
|
|
|
func renameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, writeQuorum, readQuorum int) error { |
|
|
|
|
func renameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, quorum int) error { |
|
|
|
|
isPart := false |
|
|
|
|
return rename(disks, srcBucket, srcObject, dstBucket, dstObject, isPart, writeQuorum, readQuorum) |
|
|
|
|
return rename(disks, srcBucket, srcObject, dstBucket, dstObject, isPart, quorum) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// PutObject - creates an object upon reading from the input stream
|
|
|
|
@ -508,7 +479,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. |
|
|
|
|
// Rename if an object already exists to temporary location.
|
|
|
|
|
newUniqueID := getUUID() |
|
|
|
|
if xl.isObject(bucket, object) { |
|
|
|
|
err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, xl.writeQuorum, xl.readQuorum) |
|
|
|
|
err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, xl.writeQuorum) |
|
|
|
|
if err != nil { |
|
|
|
|
return "", toObjectErr(err, bucket, object) |
|
|
|
|
} |
|
|
|
@ -530,12 +501,12 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Write unique `xl.json` for each disk.
|
|
|
|
|
if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil { |
|
|
|
|
if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum); err != nil { |
|
|
|
|
return "", toObjectErr(err, bucket, object) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Rename the successfully written temporary object to final location.
|
|
|
|
|
err = renameObject(onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, xl.writeQuorum, xl.readQuorum) |
|
|
|
|
err = renameObject(onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, xl.writeQuorum) |
|
|
|
|
if err != nil { |
|
|
|
|
return "", toObjectErr(err, bucket, object) |
|
|
|
|
} |
|
|
|
|