From 5e8de786b3d371826444c99643f2503a2428811f Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sun, 29 May 2016 00:42:09 -0700 Subject: [PATCH] XL: Truly use unique id's in temp directory. (#1790) This also helps in avoiding cleaning up directories after. Additionally this patch also fixes the problem of Range offsets. --- fs-v1-multipart.go | 8 ++--- fs-v1.go | 4 ++- xl-v1-metadata.go | 12 ++++++-- xl-v1-multipart-common.go | 6 ++-- xl-v1-multipart.go | 22 +++++++------- xl-v1-object.go | 62 +++++++++++++++++++-------------------- 6 files changed, 64 insertions(+), 50 deletions(-) diff --git a/fs-v1-multipart.go b/fs-v1-multipart.go index 778c33252..768925e8f 100644 --- a/fs-v1-multipart.go +++ b/fs-v1-multipart.go @@ -76,7 +76,7 @@ func (fs fsObjects) newMultipartUploadCommon(bucket string, object string, meta return "", err } uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) - tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID) + tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID) if err = fs.writeFSMetadata(minioMetaBucket, tempUploadIDPath, fsMeta); err != nil { return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) } @@ -300,7 +300,7 @@ func (fs fsObjects) putObjectPartCommon(bucket string, object string, uploadID s defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID))) partSuffix := fmt.Sprintf("object%d", partID) - tmpPartPath := path.Join(tmpMetaPrefix, bucket, object, uploadID, partSuffix) + tmpPartPath := path.Join(tmpMetaPrefix, uploadID, partSuffix) // Initialize md5 writer. md5Writer := md5.New() @@ -348,7 +348,7 @@ func (fs fsObjects) putObjectPartCommon(bucket string, object string, uploadID s return "", toObjectErr(err, minioMetaBucket, partPath) } uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID) - tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID) + tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID) if err = fs.writeFSMetadata(minioMetaBucket, tempUploadIDPath, fsMeta); err != nil { return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) } @@ -475,7 +475,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload return "", err } - tempObj := path.Join(tmpMetaPrefix, bucket, object, uploadID, "object1") + tempObj := path.Join(tmpMetaPrefix, uploadID, "object1") var buffer = make([]byte, blockSize) // Loop through all parts, validate them and then commit to disk. diff --git a/fs-v1.go b/fs-v1.go index 62df7b062..fdc1ba091 100644 --- a/fs-v1.go +++ b/fs-v1.go @@ -229,8 +229,10 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io. } } + uniqueID := getUUID() + // Temporary object. - tempObj := path.Join(tmpMetaPrefix, bucket, object) + tempObj := path.Join(tmpMetaPrefix, uniqueID) // Initialize md5 writer. md5Writer := md5.New() diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go index b89bb79d8..790530bed 100644 --- a/xl-v1-metadata.go +++ b/xl-v1-metadata.go @@ -140,8 +140,8 @@ func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err // Count for errors encountered. var xlJSONErrCount = 0 - // Allocate 4MB buffer. - var buffer = make([]byte, blockSize) + // Allocate 4MiB buffer. + buffer := make([]byte, blockSize) // Return the first successful lookup from a random list of disks. for xlJSONErrCount < len(xl.storageDisks) { @@ -173,6 +173,7 @@ func newXLMetaV1(dataBlocks, parityBlocks int) (xlMeta xlMetaV1) { return xlMeta } +// renameXLMetadata - renames `xl.json` from source prefix to destination prefix. func (xl xlObjects) renameXLMetadata(srcBucket, srcPrefix, dstBucket, dstPrefix string) error { var wg = &sync.WaitGroup{} var mErrs = make([]error, len(xl.storageDisks)) @@ -185,11 +186,18 @@ func (xl xlObjects) renameXLMetadata(srcBucket, srcPrefix, dstBucket, dstPrefix // Rename `xl.json` in a routine. go func(index int, disk StorageAPI) { defer wg.Done() + // Renames `xl.json` from source prefix to destination prefix. rErr := disk.RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile) if rErr != nil { mErrs[index] = rErr return } + // Delete any dangling directories. + dErr := disk.DeleteFile(srcBucket, srcPrefix) + if dErr != nil { + mErrs[index] = dErr + return + } mErrs[index] = nil }(index, disk) } diff --git a/xl-v1-multipart-common.go b/xl-v1-multipart-common.go index 6b725f03e..67a5d6d4a 100644 --- a/xl-v1-multipart-common.go +++ b/xl-v1-multipart-common.go @@ -114,7 +114,8 @@ func readUploadsJSON(bucket, object string, storageDisks ...StorageAPI) (uploadI // uploadUploadsJSON - update `uploads.json` with new uploadsJSON for all disks. func updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1, storageDisks ...StorageAPI) error { uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) - tmpUploadsPath := path.Join(tmpMetaPrefix, bucket, object, uploadsJSONFile) + uniqueID := getUUID() + tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID) var errs = make([]error, len(storageDisks)) var wg = &sync.WaitGroup{} @@ -169,7 +170,8 @@ func newUploadsV1(format string) uploadsV1 { // writeUploadJSON - create `uploads.json` or update it with new uploadID. func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, storageDisks ...StorageAPI) (err error) { uploadsPath := path.Join(mpartMetaPrefix, bucket, object, uploadsJSONFile) - tmpUploadsPath := path.Join(tmpMetaPrefix, bucket, object, uploadsJSONFile) + uniqueID := getUUID() + tmpUploadsPath := path.Join(tmpMetaPrefix, uniqueID) var errs = make([]error, len(storageDisks)) var wg = &sync.WaitGroup{} diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index c5d998775..c4c197d95 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -82,7 +82,7 @@ func (xl xlObjects) newMultipartUploadCommon(bucket string, object string, meta return "", err } uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) - tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID) + tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID) if err = xl.writeXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil { return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) } @@ -141,7 +141,7 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution) partSuffix := fmt.Sprintf("object%d", partID) - tmpPartPath := path.Join(tmpMetaPrefix, bucket, object, uploadID, partSuffix) + tmpPartPath := path.Join(tmpMetaPrefix, uploadID, partSuffix) // Initialize md5 writer. md5Writer := md5.New() @@ -173,6 +173,7 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s return "", BadDigest{md5Hex, newMD5Hex} } } + partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix) err = xl.renameObject(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath) if err != nil { @@ -184,7 +185,7 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size) uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID) - tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID) + tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID) if err = xl.writeXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil { return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) } @@ -372,7 +373,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Save successfully calculated md5sum. xlMeta.Meta["md5Sum"] = s3MD5 uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID) - tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID) + tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID) if err = xl.writeXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil { return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) } @@ -380,16 +381,13 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload if rErr != nil { return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath) } - // Hold write lock on the destination before rename nsMutex.Lock(bucket, object) defer nsMutex.Unlock(bucket, object) - // Delete if an object already exists. - // FIXME: rename it to tmp file and delete only after - // the newly uploaded file is renamed from tmp location to - // the original location. Verify if the object is a multipart object. - err = xl.deleteObject(bucket, object) + // Rename if an object already exists to temporary location. + uniqueID := getUUID() + err = xl.renameObject(bucket, object, minioMetaBucket, path.Join(tmpMetaPrefix, uniqueID)) if err != nil { return "", toObjectErr(err, bucket, object) } @@ -407,10 +405,14 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload } } + // Rename the multipart object to final location. if err = xl.renameObject(minioMetaBucket, uploadIDPath, bucket, object); err != nil { return "", toObjectErr(err, bucket, object) } + // Delete the previously successfully renamed object. + xl.deleteObject(minioMetaBucket, path.Join(tmpMetaPrefix, uniqueID)) + // Hold the lock so that two parallel complete-multipart-uploads do no // leave a stale uploads.json behind. nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) diff --git a/xl-v1-object.go b/xl-v1-object.go index aa9ae4103..604ace929 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -54,27 +54,22 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i totalLeft := length for ; partIndex < len(xlMeta.Parts); partIndex++ { part := xlMeta.Parts[partIndex] - totalPartSize := part.Size - for totalPartSize > 0 { - var buffer io.Reader - buffer, err = erasure.ReadFile(bucket, pathJoin(object, part.Name), partOffset, part.Size) - if err != nil { - return err - } - if int64(buffer.(*bytes.Buffer).Len()) > totalLeft { - if _, err := io.CopyN(writer, buffer, totalLeft); err != nil { - return err - } - return nil - } - n, err := io.Copy(writer, buffer) - if err != nil { + var buffer io.Reader + buffer, err = erasure.ReadFile(bucket, pathJoin(object, part.Name), partOffset, part.Size) + if err != nil { + return err + } + if int64(buffer.(*bytes.Buffer).Len()) > totalLeft { + if _, err := io.CopyN(writer, buffer, totalLeft); err != nil { return err } - totalLeft -= n - totalPartSize -= n - partOffset += n + return nil } + n, err := io.Copy(writer, buffer) + if err != nil { + return err + } + totalLeft -= n // Reset part offset to 0 to read rest of the parts from the beginning. partOffset = 0 } @@ -203,8 +198,9 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. nsMutex.Lock(bucket, object) defer nsMutex.Unlock(bucket, object) - tempErasureObj := path.Join(tmpMetaPrefix, bucket, object, "object1") - tempObj := path.Join(tmpMetaPrefix, bucket, object) + uniqueID := getUUID() + tempErasureObj := path.Join(tmpMetaPrefix, uniqueID, "object1") + tempObj := path.Join(tmpMetaPrefix, uniqueID) // Initialize xl meta. xlMeta := newXLMetaV1(xl.dataBlocks, xl.parityBlocks) @@ -281,13 +277,9 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. return "", toObjectErr(errFileAccessDenied, bucket, object) } - // Delete if an object already exists. - err = xl.deleteObject(bucket, object) - if err != nil { - return "", toObjectErr(err, bucket, object) - } - - err = xl.renameObject(minioMetaBucket, tempObj, bucket, object) + // Rename if an object already exists to temporary location. + newUniqueID := getUUID() + err = xl.renameObject(bucket, object, minioMetaBucket, path.Join(tmpMetaPrefix, newUniqueID)) if err != nil { return "", toObjectErr(err, bucket, object) } @@ -298,13 +290,21 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. xlMeta.Stat.ModTime = modTime xlMeta.Stat.Version = higherVersion xlMeta.AddObjectPart(1, "object1", newMD5Hex, xlMeta.Stat.Size) - if err = xl.writeXLMetadata(minioMetaBucket, path.Join(tmpMetaPrefix, bucket, object), xlMeta); err != nil { + + // Write `xl.json` metadata. + if err = xl.writeXLMetadata(minioMetaBucket, tempObj, xlMeta); err != nil { return "", toObjectErr(err, bucket, object) } - rErr := xl.renameXLMetadata(minioMetaBucket, path.Join(tmpMetaPrefix, bucket, object), bucket, object) - if rErr != nil { - return "", toObjectErr(rErr, bucket, object) + + // Rename the successfully written tempoary object to final location. + err = xl.renameObject(minioMetaBucket, tempObj, bucket, object) + if err != nil { + return "", toObjectErr(err, bucket, object) } + + // Delete the temporary object. + xl.deleteObject(minioMetaBucket, path.Join(tmpMetaPrefix, newUniqueID)) + // Return md5sum, successfully wrote object. return newMD5Hex, nil }