xl: Require full write quorum in rename() as general rule (#6535)

Simplify the logic of using rename() in xl. Currently, renaming
doesn't require the source object/dir to be existent in at least
read quorum disks, since there is no apparent reason for it
to be as a general rule, this commit will just simplify the
logic to avoid possible inconsistency in the backend in the future.
master
Anis Elleuch 6 years ago committed by kannappanr
parent e031f2b614
commit f187a16962
  1. 14
      cmd/xl-v1-multipart.go
  2. 48
      cmd/xl-v1-object.go

@ -220,7 +220,7 @@ func (xl xlObjects) newMultipartUpload(ctx context.Context, bucket string, objec
defer xl.deleteObject(ctx, minioMetaTmpBucket, tempUploadIDPath, writeQuorum, false)
// Attempt to rename temp upload object to actual upload path object
_, rErr := renameObject(ctx, disks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, writeQuorum)
_, rErr := rename(ctx, disks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, true, writeQuorum, nil)
if rErr != nil {
return "", toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath)
}
@ -409,7 +409,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
// Rename temporary part file to its final location.
partPath := path.Join(uploadIDPath, partSuffix)
onlineDisks, err = renamePart(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, writeQuorum)
onlineDisks, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, false, writeQuorum, nil)
if err != nil {
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
}
@ -750,10 +750,10 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
// Delete success renamed object.
defer xl.deleteObject(ctx, minioMetaTmpBucket, newUniqueID, writeQuorum, false)
// NOTE: Do not use online disks slice here.
// The reason is that existing object should be purged
// regardless of `xl.json` status and rolled back in case of errors.
_, err = renameObject(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, newUniqueID, writeQuorum)
// NOTE: Do not use online disks slice here: the reason is that existing object should be purged
// regardless of `xl.json` status and rolled back in case of errors. Also allow renaming of the
// existing object if it is not present in quorum disks so users can overwrite stale objects.
_, err = rename(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, newUniqueID, true, writeQuorum, []error{errFileNotFound})
if err != nil {
return oi, toObjectErr(err, bucket, object)
}
@ -773,7 +773,7 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
}
// Rename the multipart object to final location.
if _, err = renameObject(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, bucket, object, writeQuorum); err != nil {
if _, err = rename(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, bucket, object, true, writeQuorum, nil); err != nil {
return oi, toObjectErr(err, bucket, object)
}

@ -548,33 +548,6 @@ func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBuc
return evalDisks(disks, errs), err
}
// renamePart - renames a part of the source object to the destination
// across all disks in parallel. Additionally if we have errors and do
// not have a readQuorum partially renamed files are renamed back to
// its proper location.
func renamePart(ctx context.Context, disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart string, quorum int) ([]StorageAPI, error) {
isDir := false
return rename(ctx, disks, srcBucket, srcPart, dstBucket, dstPart, isDir, quorum, []error{errFileNotFound})
}
// renameObjectDir - renames all source objects directories to destination
// object directories across all disks in parallel. Additionally if we have
// errors and do not have a readQuorum partially renamed files are renamed
// back to its proper location.
func renameObjectDir(ctx context.Context, disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, quorum int) ([]StorageAPI, error) {
isDir := true
return rename(ctx, disks, srcBucket, srcObject, dstBucket, dstObject, isDir, quorum, []error{errFileNotFound, errFileAccessDenied})
}
// renameObject - renames all source objects to destination object
// across all disks in parallel. Additionally if we have errors and do
// not have a readQuorum partially renamed files are renamed back to
// its proper location.
func renameObject(ctx context.Context, disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, quorum int) ([]StorageAPI, error) {
isDir := true
return rename(ctx, disks, srcBucket, srcObject, dstBucket, dstObject, isDir, quorum, []error{errFileNotFound})
}
// PutObject - creates an object upon reading from the input stream
// until EOF, erasure codes the data across all disk and additionally
// writes `xl.json` which carries the necessary metadata for future
@ -632,7 +605,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
}
// Rename the successfully written temporary object to final location.
if _, err = renameObjectDir(ctx, xl.getDisks(), minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil {
if _, err = rename(ctx, xl.getDisks(), minioMetaTmpBucket, tempObj, bucket, object, true, writeQuorum, nil); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
@ -799,10 +772,10 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
// Delete successfully renamed object.
defer xl.deleteObject(ctx, minioMetaTmpBucket, newUniqueID, writeQuorum, false)
// NOTE: Do not use online disks slice here.
// The reason is that existing object should be purged
// regardless of `xl.json` status and rolled back in case of errors.
_, err = renameObject(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, newUniqueID, writeQuorum)
// NOTE: Do not use online disks slice here: the reason is that existing object should be purged
// regardless of `xl.json` status and rolled back in case of errors. Also allow renaming the
// existing object if it is not present in quorum disks so users can overwrite stale objects.
_, err = rename(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, newUniqueID, true, writeQuorum, []error{errFileNotFound})
if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
@ -822,7 +795,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
}
// Rename the successfully written temporary object to final location.
if _, err = renameObject(ctx, onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil {
if _, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, true, writeQuorum, nil); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
@ -858,10 +831,15 @@ func (xl xlObjects) deleteObject(ctx context.Context, bucket, object string, wri
tmpObj = object
disks = xl.getDisks()
} else {
// Rename the current object while requiring write quorum, but also consider
// that a non found object in a given disk as a success since it already
// confirms that the object doesn't have a part in that disk (already removed)
if isDir {
disks, err = renameObjectDir(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, tmpObj, writeQuorum)
disks, err = rename(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, tmpObj, true, writeQuorum,
[]error{errFileNotFound, errFileAccessDenied})
} else {
disks, err = renameObject(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, tmpObj, writeQuorum)
disks, err = rename(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, tmpObj, true, writeQuorum,
[]error{errFileNotFound})
}
if err != nil {
return toObjectErr(err, bucket, object)

Loading…
Cancel
Save