Renaming a part from tmp namespace needs to be handled different from… (#1944)

* Renaming a part from tmp namespace needs to be handled different from renaming of an object

* Made argument passing in xl.rename and xl.undoRename explicit
master
Krishnan Parthasarathi 9 years ago committed by Harshavardhana
parent 6143c87c3a
commit 393c504de0
  1. 2
      xl-v1-multipart.go
  2. 64
      xl-v1-object.go

@ -372,7 +372,7 @@ func (xl xlObjects) putObjectPart(bucket string, object string, uploadID string,
// Rename temporary part file to its final location. // Rename temporary part file to its final location.
partPath := path.Join(uploadIDPath, partSuffix) partPath := path.Join(uploadIDPath, partSuffix)
err = xl.renameObject(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath) err = xl.renamePart(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath)
if err != nil { if err != nil {
return "", toObjectErr(err, minioMetaBucket, partPath) return "", toObjectErr(err, minioMetaBucket, partPath)
} }

@ -154,10 +154,16 @@ func (xl xlObjects) getObjectInfo(bucket, object string) (objInfo ObjectInfo, er
return objInfo, nil return objInfo, nil
} }
// undoRenameObject - renames back the partially successful rename operations. func (xl xlObjects) undoRename(srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool, errs []error) {
func (xl xlObjects) undoRenameObject(srcBucket, srcObject, dstBucket, dstObject string, errs []error) {
var wg = &sync.WaitGroup{} var wg = &sync.WaitGroup{}
// Undo rename object on disks where RenameFile succeeded. // Undo rename object on disks where RenameFile succeeded.
// If srcEntry/dstEntry are objects then add a trailing slash to copy
// over all the parts inside the object directory
if !isPart {
srcEntry = retainSlash(srcEntry)
dstEntry = retainSlash(dstEntry)
}
for index, disk := range xl.storageDisks { for index, disk := range xl.storageDisks {
if disk == nil { if disk == nil {
continue continue
@ -169,36 +175,48 @@ func (xl xlObjects) undoRenameObject(srcBucket, srcObject, dstBucket, dstObject
if errs[index] != nil { if errs[index] != nil {
return return
} }
_ = disk.RenameFile(dstBucket, retainSlash(dstObject), srcBucket, retainSlash(srcObject)) _ = disk.RenameFile(dstBucket, dstEntry, srcBucket, srcEntry)
}(index, disk) }(index, disk)
} }
wg.Wait() wg.Wait()
} }
// renameObject - renames all source objects to destination object // undoRenameObject - renames back the partially successful rename operations.
// across all disks in parallel. Additionally if we have errors and do func (xl xlObjects) undoRenameObject(srcBucket, srcObject, dstBucket, dstObject string, errs []error) {
// not have a readQuorum partially renamed files are renamed back to isPart := false
// its proper location. xl.undoRename(srcBucket, srcObject, dstBucket, dstObject, isPart, errs)
func (xl xlObjects) renameObject(srcBucket, srcObject, dstBucket, dstObject string) error { }
// undoRenamePart - renames back the partially successful rename operation.
func (xl xlObjects) undoRenamePart(srcBucket, srcPart, dstBucket, dstPart string, errs []error) {
isPart := true
xl.undoRename(srcBucket, srcPart, dstBucket, dstPart, isPart, errs)
}
// rename - common function that renamePart and renameObject use to rename
// the respective underlying storage layer representations.
func (xl xlObjects) rename(srcBucket, srcEntry, dstBucket, dstEntry string, isPart bool) error {
// Initialize sync waitgroup. // Initialize sync waitgroup.
var wg = &sync.WaitGroup{} var wg = &sync.WaitGroup{}
// Initialize list of errors. // Initialize list of errors.
var errs = make([]error, len(xl.storageDisks)) var errs = make([]error, len(xl.storageDisks))
if !isPart {
dstEntry = retainSlash(dstEntry)
srcEntry = retainSlash(srcEntry)
}
// Rename file on all underlying storage disks. // Rename file on all underlying storage disks.
for index, disk := range xl.storageDisks { for index, disk := range xl.storageDisks {
if disk == nil { if disk == nil {
errs[index] = errDiskNotFound errs[index] = errDiskNotFound
continue continue
} }
// Append "/" as srcObject and dstObject are either leaf-dirs or non-leaf-dris.
// If srcObject is an object instead of prefix we just rename the leaf-dir and
// not rename the part and metadata files separately.
wg.Add(1) wg.Add(1)
go func(index int, disk StorageAPI) { go func(index int, disk StorageAPI) {
defer wg.Done() defer wg.Done()
err := disk.RenameFile(srcBucket, retainSlash(srcObject), dstBucket, retainSlash(dstObject)) err := disk.RenameFile(srcBucket, srcEntry, dstBucket, dstEntry)
if err != nil && err != errFileNotFound { if err != nil && err != errFileNotFound {
errs[index] = err errs[index] = err
} }
@ -216,20 +234,38 @@ func (xl xlObjects) renameObject(srcBucket, srcObject, dstBucket, dstObject stri
return nil // Return success. return nil // Return success.
} // else - failed to acquire read quorum. } // else - failed to acquire read quorum.
// Undo all the partial rename operations. // Undo all the partial rename operations.
xl.undoRenameObject(srcBucket, srcObject, dstBucket, dstObject, errs) xl.undoRename(srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs)
return errXLWriteQuorum return errXLWriteQuorum
} }
// Return on first error, also undo any partially successful rename operations. // Return on first error, also undo any partially successful rename operations.
for _, err := range errs { for _, err := range errs {
if err != nil && err != errDiskNotFound { if err != nil && err != errDiskNotFound {
// Undo all the partial rename operations. // Undo all the partial rename operations.
xl.undoRenameObject(srcBucket, srcObject, dstBucket, dstObject, errs) xl.undoRename(srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs)
return err return err
} }
} }
return nil return nil
} }
// renamePart - renames a part of the source object to the destination
// across all disks in parallel. Additionally if we have errors and do
// not have a readQuorum partially renamed files are renamed back to
// its proper location.
func (xl xlObjects) renamePart(srcBucket, srcObject, dstBucket, dstObject string) error {
isPart := true
return xl.rename(srcBucket, srcObject, dstBucket, dstObject, isPart)
}
// renameObject - renames all source objects to destination object
// across all disks in parallel. Additionally if we have errors and do
// not have a readQuorum partially renamed files are renamed back to
// its proper location.
func (xl xlObjects) renameObject(srcBucket, srcObject, dstBucket, dstObject string) error {
isPart := false
return xl.rename(srcBucket, srcObject, dstBucket, dstObject, isPart)
}
// PutObject - creates an object upon reading from the input stream // PutObject - creates an object upon reading from the input stream
// until EOF, erasure codes the data across all disk and additionally // until EOF, erasure codes the data across all disk and additionally
// writes `xl.json` which carries the necessary metadata for future // writes `xl.json` which carries the necessary metadata for future

Loading…
Cancel
Save