diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 5a4369d65..6f71fa1b7 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -220,7 +220,7 @@ func (xl xlObjects) newMultipartUpload(ctx context.Context, bucket string, objec defer xl.deleteObject(ctx, minioMetaTmpBucket, tempUploadIDPath, writeQuorum, false) // Attempt to rename temp upload object to actual upload path object - _, rErr := renameObject(ctx, disks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, writeQuorum) + _, rErr := rename(ctx, disks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, true, writeQuorum, nil) if rErr != nil { return "", toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath) } @@ -409,7 +409,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID // Rename temporary part file to its final location. partPath := path.Join(uploadIDPath, partSuffix) - onlineDisks, err = renamePart(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, writeQuorum) + onlineDisks, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, false, writeQuorum, nil) if err != nil { return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) } @@ -750,10 +750,10 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string, // Delete success renamed object. defer xl.deleteObject(ctx, minioMetaTmpBucket, newUniqueID, writeQuorum, false) - // NOTE: Do not use online disks slice here. - // The reason is that existing object should be purged - // regardless of `xl.json` status and rolled back in case of errors. - _, err = renameObject(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, newUniqueID, writeQuorum) + // NOTE: Do not use online disks slice here: the reason is that existing object should be purged + // regardless of `xl.json` status and rolled back in case of errors. Also allow renaming of the + // existing object if it is not present in quorum disks so users can overwrite stale objects. + _, err = rename(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, newUniqueID, true, writeQuorum, []error{errFileNotFound}) if err != nil { return oi, toObjectErr(err, bucket, object) } @@ -773,7 +773,7 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string, } // Rename the multipart object to final location. - if _, err = renameObject(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, bucket, object, writeQuorum); err != nil { + if _, err = rename(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, bucket, object, true, writeQuorum, nil); err != nil { return oi, toObjectErr(err, bucket, object) } diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 048ea33b6..ecf544024 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -548,33 +548,6 @@ func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBuc return evalDisks(disks, errs), err } -// renamePart - renames a part of the source object to the destination -// across all disks in parallel. Additionally if we have errors and do -// not have a readQuorum partially renamed files are renamed back to -// its proper location. -func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart string, quorum int) ([]StorageAPI, error) { - isDir := false - return rename(ctx, disks, srcBucket, srcPart, dstBucket, dstPart, isDir, quorum, []error{errFileNotFound}) -} - -// renameObjectDir - renames all source objects directories to destination -// object directories across all disks in parallel. Additionally if we have -// errors and do not have a readQuorum partially renamed files are renamed -// back to its proper location. -func renameObjectDir(ctx context.Context, disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, quorum int) ([]StorageAPI, error) { - isDir := true - return rename(ctx, disks, srcBucket, srcObject, dstBucket, dstObject, isDir, quorum, []error{errFileNotFound, errFileAccessDenied}) -} - -// renameObject - renames all source objects to destination object -// across all disks in parallel. Additionally if we have errors and do -// not have a readQuorum partially renamed files are renamed back to -// its proper location. -func renameObject(ctx context.Context, disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, quorum int) ([]StorageAPI, error) { - isDir := true - return rename(ctx, disks, srcBucket, srcObject, dstBucket, dstObject, isDir, quorum, []error{errFileNotFound}) -} - // PutObject - creates an object upon reading from the input stream // until EOF, erasure codes the data across all disk and additionally // writes `xl.json` which carries the necessary metadata for future @@ -632,7 +605,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, } // Rename the successfully written temporary object to final location. - if _, err = renameObjectDir(ctx, xl.getDisks(), minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil { + if _, err = rename(ctx, xl.getDisks(), minioMetaTmpBucket, tempObj, bucket, object, true, writeQuorum, nil); err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } @@ -799,10 +772,10 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, // Delete successfully renamed object. defer xl.deleteObject(ctx, minioMetaTmpBucket, newUniqueID, writeQuorum, false) - // NOTE: Do not use online disks slice here. - // The reason is that existing object should be purged - // regardless of `xl.json` status and rolled back in case of errors. - _, err = renameObject(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, newUniqueID, writeQuorum) + // NOTE: Do not use online disks slice here: the reason is that existing object should be purged + // regardless of `xl.json` status and rolled back in case of errors. Also allow renaming the + // existing object if it is not present in quorum disks so users can overwrite stale objects. + _, err = rename(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, newUniqueID, true, writeQuorum, []error{errFileNotFound}) if err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } @@ -822,7 +795,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, } // Rename the successfully written temporary object to final location. - if _, err = renameObject(ctx, onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil { + if _, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, true, writeQuorum, nil); err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } @@ -858,10 +831,15 @@ func (xl xlObjects) deleteObject(ctx context.Context, bucket, object string, wri tmpObj = object disks = xl.getDisks() } else { + // Rename the current object while requiring write quorum, but also consider + // that a non found object in a given disk as a success since it already + // confirms that the object doesn't have a part in that disk (already removed) if isDir { - disks, err = renameObjectDir(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, tmpObj, writeQuorum) + disks, err = rename(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, tmpObj, true, writeQuorum, + []error{errFileNotFound, errFileAccessDenied}) } else { - disks, err = renameObject(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, tmpObj, writeQuorum) + disks, err = rename(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, tmpObj, true, writeQuorum, + []error{errFileNotFound}) } if err != nil { return toObjectErr(err, bucket, object)