From 3aa0574c6b49e50dcb6392798b99359f18a57ff4 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Thu, 1 Sep 2016 02:12:57 +0530 Subject: [PATCH] FS/multipart: Append the parts to a file as and when the parts arrive. (#2513) --- cmd/fs-v1-metadata.go | 36 +++-- cmd/fs-v1-multipart.go | 302 +++++++++++++++++++++++++++++------------ cmd/fs-v1.go | 11 +- 3 files changed, 241 insertions(+), 108 deletions(-) diff --git a/cmd/fs-v1-metadata.go b/cmd/fs-v1-metadata.go index 2e133fc2b..365b800c7 100644 --- a/cmd/fs-v1-metadata.go +++ b/cmd/fs-v1-metadata.go @@ -77,9 +77,9 @@ func (m *fsMetaV1) AddObjectPart(partNumber int, partName string, partETag strin } // readFSMetadata - returns the object metadata `fs.json` content. -func readFSMetadata(disk StorageAPI, bucket, object string) (fsMeta fsMetaV1, err error) { +func readFSMetadata(disk StorageAPI, bucket, filePath string) (fsMeta fsMetaV1, err error) { // Read all `fs.json`. - buf, err := disk.ReadAll(bucket, path.Join(object, fsMetaJSONFile)) + buf, err := disk.ReadAll(bucket, filePath) if err != nil { return fsMetaV1{}, err } @@ -93,6 +93,19 @@ func readFSMetadata(disk StorageAPI, bucket, object string) (fsMeta fsMetaV1, er return fsMeta, nil } +// Write fsMeta to fs.json or fs-append.json. +func writeFSMetadata(disk StorageAPI, bucket, filePath string, fsMeta fsMetaV1) (err error) { + tmpPath := path.Join(tmpMetaPrefix, getUUID()) + metadataBytes, err := json.Marshal(fsMeta) + if err != nil { + return err + } + if err = disk.AppendFile(minioMetaBucket, tmpPath, metadataBytes); err != nil { + return err + } + return disk.RenameFile(minioMetaBucket, tmpPath, bucket, filePath) +} + // newFSMetaV1 - initializes new fsMetaV1. func newFSMetaV1() (fsMeta fsMetaV1) { fsMeta = fsMetaV1{} @@ -131,17 +144,18 @@ func writeFSFormatData(storage StorageAPI, fsFormat formatConfigV1) error { return nil } -// writeFSMetadata - writes `fs.json` metadata, marshals fsMeta object into json -// and saves it to disk. 
-func writeFSMetadata(storage StorageAPI, bucket, path string, fsMeta fsMetaV1) error { - metadataBytes, err := json.Marshal(fsMeta) - if err != nil { - return err +// Return if the part info in uploadedParts and completeParts are same. +func isPartsSame(uploadedParts []objectPartInfo, completeParts []completePart) bool { + if len(uploadedParts) != len(completeParts) { + return false } - if err = storage.AppendFile(bucket, path, metadataBytes); err != nil { - return err + for i := range completeParts { + if uploadedParts[i].Number != completeParts[i].PartNumber || + uploadedParts[i].ETag != completeParts[i].ETag { + return false + } } - return nil + return true } var extendedHeaders = []string{ diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 84714774c..a65a02704 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -235,17 +235,9 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st if err = fs.writeUploadJSON(bucket, object, uploadID, initiated); err != nil { return "", err } - uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) - tempFSMetadataPath := path.Join(tmpMetaPrefix, getUUID()+"-"+fsMetaJSONFile) - if err = writeFSMetadata(fs.storage, minioMetaBucket, tempFSMetadataPath, fsMeta); err != nil { - return "", toObjectErr(err, minioMetaBucket, tempFSMetadataPath) - } - err = fs.storage.RenameFile(minioMetaBucket, tempFSMetadataPath, minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile)) - if err != nil { - if dErr := fs.storage.DeleteFile(minioMetaBucket, tempFSMetadataPath); dErr != nil { - return "", toObjectErr(dErr, minioMetaBucket, tempFSMetadataPath) - } - return "", toObjectErr(err, minioMetaBucket, uploadIDPath) + fsMetaPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, fsMetaJSONFile) + if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil { + return "", toObjectErr(err, minioMetaBucket, fsMetaPath) } // Return success. 
return uploadID, nil @@ -272,6 +264,127 @@ func (fs fsObjects) NewMultipartUpload(bucket, object string, meta map[string]st return fs.newMultipartUpload(bucket, object, meta) } +// Returns if a new part can be appended to fsAppendDataFile. +func partToAppend(fsMeta fsMetaV1, fsAppendMeta fsMetaV1) (part objectPartInfo, appendNeeded bool) { + if len(fsMeta.Parts) == 0 { + return + } + // As fsAppendMeta.Parts will be sorted len(fsAppendMeta.Parts) will naturally be the next part number + nextPartNum := len(fsAppendMeta.Parts) + 1 + nextPartIndex := fsMeta.ObjectPartIndex(nextPartNum) + if nextPartIndex == -1 { + return + } + return fsMeta.Parts[nextPartIndex], true +} + +// Returns metadata path for the file holding info about the parts that +// have been appended to the "append-file" +func getFSAppendMetaPath(uploadID string) string { + return path.Join(tmpMetaPrefix, uploadID+".json") +} + +// Returns path for the append-file. +func getFSAppendDataPath(uploadID string) string { + return path.Join(tmpMetaPrefix, uploadID+".data") +} + +// Append parts to fsAppendDataFile. +func appendParts(disk StorageAPI, bucket, object, uploadID string) { + uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) + // fs-append.json path + fsAppendMetaPath := getFSAppendMetaPath(uploadID) + // fs.json path + fsMetaPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, fsMetaJSONFile) + + // Lock the uploadID so that no one modifies fs.json + nsMutex.RLock(minioMetaBucket, uploadIDPath) + fsMeta, err := readFSMetadata(disk, minioMetaBucket, fsMetaPath) + nsMutex.RUnlock(minioMetaBucket, uploadIDPath) + if err != nil { + return + } + + // Lock fs-append.json so that there is no parallel append to the file. 
+ nsMutex.Lock(minioMetaBucket, fsAppendMetaPath) + defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath) + + fsAppendMeta, err := readFSMetadata(disk, minioMetaBucket, fsAppendMetaPath) + if err != nil { + if err != errFileNotFound { + return + } + fsAppendMeta = fsMeta + fsAppendMeta.Parts = nil + } + + // Check if a part needs to be appended to + part, appendNeeded := partToAppend(fsMeta, fsAppendMeta) + if !appendNeeded { + return + } + // Hold write lock on the part so that there is no parallel upload on the part. + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(part.Number))) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(part.Number))) + + // Proceed to append "part" + fsAppendDataPath := getFSAppendDataPath(uploadID) + tmpDataPath := path.Join(tmpMetaPrefix, getUUID()) + if part.Number != 1 { + // Move it to tmp location before appending so that we don't leave inconsistent data + // if server crashes during append operation. + err = disk.RenameFile(minioMetaBucket, fsAppendDataPath, minioMetaBucket, tmpDataPath) + if err != nil { + return + } + // Delete fs-append.json so that we don't leave a stale file if server crashes + // when the part is being appended to the tmp file. + err = disk.DeleteFile(minioMetaBucket, fsAppendMetaPath) + if err != nil { + return + } + } + // Path to the part that needs to be appended. 
+ partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, part.Name) + offset := int64(0) + totalLeft := part.Size + buf := make([]byte, readSizeV1) + for totalLeft > 0 { + curLeft := int64(readSizeV1) + if totalLeft < readSizeV1 { + curLeft = totalLeft + } + var n int64 + n, err = disk.ReadFile(minioMetaBucket, partPath, offset, buf[:curLeft]) + if n > 0 { + if err = disk.AppendFile(minioMetaBucket, tmpDataPath, buf[:n]); err != nil { + return + } + } + if err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } + return + } + offset += n + totalLeft -= n + } + // All good, the part has been appended to the tmp file, rename it back. + if err = disk.RenameFile(minioMetaBucket, tmpDataPath, minioMetaBucket, fsAppendDataPath); err != nil { + return + } + fsAppendMeta.AddObjectPart(part.Number, part.Name, part.ETag, part.Size) + if err = writeFSMetadata(disk, minioMetaBucket, fsAppendMetaPath, fsAppendMeta); err != nil { + return + } + // If there are more parts that need to be appended to fsAppendDataFile + _, appendNeeded = partToAppend(fsMeta, fsAppendMeta) + if appendNeeded { + go appendParts(disk, bucket, object, uploadID) + } +} + // PutObjectPart - reads incoming data until EOF for the part file on // an ongoing multipart transaction. 
Internally incoming data is // written to '.minio/tmp' location and safely renamed to @@ -366,9 +479,10 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s return "", InvalidUploadID{UploadID: uploadID} } - fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, uploadIDPath) + fsMetaPath := pathJoin(uploadIDPath, fsMetaJSONFile) + fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, fsMetaPath) if err != nil { - return "", toObjectErr(err, minioMetaBucket, uploadIDPath) + return "", toObjectErr(err, minioMetaBucket, fsMetaPath) } fsMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size) @@ -380,18 +494,11 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s } return "", toObjectErr(err, minioMetaBucket, partPath) } - uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID) - tempFSMetadataPath := path.Join(tmpMetaPrefix, getUUID()+"-"+fsMetaJSONFile) - if err = writeFSMetadata(fs.storage, minioMetaBucket, tempFSMetadataPath, fsMeta); err != nil { - return "", toObjectErr(err, minioMetaBucket, tempFSMetadataPath) - } - err = fs.storage.RenameFile(minioMetaBucket, tempFSMetadataPath, minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile)) - if err != nil { - if dErr := fs.storage.DeleteFile(minioMetaBucket, tempFSMetadataPath); dErr != nil { - return "", toObjectErr(dErr, minioMetaBucket, tempFSMetadataPath) - } - return "", toObjectErr(err, minioMetaBucket, uploadIDPath) + + if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil { + return "", toObjectErr(err, minioMetaBucket, fsMetaPath) } + go appendParts(fs.storage, bucket, object, uploadID) return newMD5Hex, nil } @@ -401,10 +508,10 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) { result := ListPartsInfo{} - uploadIDPath := 
path.Join(mpartMetaPrefix, bucket, object, uploadID) - fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, uploadIDPath) + fsMetaPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, fsMetaJSONFile) + fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, fsMetaPath) if err != nil { - return ListPartsInfo{}, toObjectErr(err, minioMetaBucket, uploadIDPath) + return ListPartsInfo{}, toObjectErr(err, minioMetaBucket, fsMetaPath) } // Only parts with higher part numbers will be listed. partIdx := fsMeta.ObjectPartIndex(partNumberMarker) @@ -502,18 +609,18 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload // 1) no one aborts this multipart upload // 2) no one does a parallel complete-multipart-upload on this // multipart upload - nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + nsMutex.Lock(minioMetaBucket, uploadIDPath) + defer nsMutex.Unlock(minioMetaBucket, uploadIDPath) if !fs.isUploadIDExists(bucket, object, uploadID) { return "", InvalidUploadID{UploadID: uploadID} } - // Read saved fs metadata for ongoing multipart. - fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, uploadIDPath) - if err != nil { - return "", toObjectErr(err, minioMetaBucket, uploadIDPath) - } + // fs-append.json path + fsAppendMetaPath := getFSAppendMetaPath(uploadID) + // Lock fs-append.json so that no parallel appendParts() is being done. + nsMutex.Lock(minioMetaBucket, fsAppendMetaPath) + defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath) // Calculate s3 compatible md5sum for complete multipart. s3MD5, err := completeMultipartMD5(parts...) @@ -521,66 +628,83 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload return "", err } - tempObj := path.Join(tmpMetaPrefix, uploadID+"-"+"part.1") - - // Allocate staging buffer. 
- var buf = make([]byte, readSizeV1) + // Read saved fs metadata for ongoing multipart. + fsMetaPath := pathJoin(uploadIDPath, fsMetaJSONFile) + fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, fsMetaPath) + if err != nil { + return "", toObjectErr(err, minioMetaBucket, fsMetaPath) + } - // Loop through all parts, validate them and then commit to disk. - for i, part := range parts { - partIdx := fsMeta.ObjectPartIndex(part.PartNumber) - if partIdx == -1 { - return "", InvalidPart{} - } - if fsMeta.Parts[partIdx].ETag != part.ETag { - return "", BadDigest{} + fsAppendMeta, err := readFSMetadata(fs.storage, minioMetaBucket, fsAppendMetaPath) + if err == nil && isPartsSame(fsAppendMeta.Parts, parts) { + fsAppendDataPath := getFSAppendDataPath(uploadID) + if err = fs.storage.RenameFile(minioMetaBucket, fsAppendDataPath, bucket, object); err != nil { + return "", toObjectErr(err, minioMetaBucket, fsAppendDataPath) } - // All parts except the last part has to be atleast 5MB. - if (i < len(parts)-1) && !isMinAllowedPartSize(fsMeta.Parts[partIdx].Size) { - return "", PartTooSmall{ - PartNumber: part.PartNumber, - PartSize: fsMeta.Parts[partIdx].Size, - PartETag: part.ETag, + // Remove the append-file metadata file in tmp location as we no longer need it. + fs.storage.DeleteFile(minioMetaBucket, fsAppendMetaPath) + } else { + tempObj := path.Join(tmpMetaPrefix, uploadID+"-"+"part.1") + + // Allocate staging buffer. + var buf = make([]byte, readSizeV1) + + // Loop through all parts, validate them and then commit to disk. + for i, part := range parts { + partIdx := fsMeta.ObjectPartIndex(part.PartNumber) + if partIdx == -1 { + return "", InvalidPart{} } - } - // Construct part suffix. 
- partSuffix := fmt.Sprintf("object%d", part.PartNumber) - multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix) - offset := int64(0) - totalLeft := fsMeta.Parts[partIdx].Size - for totalLeft > 0 { - curLeft := int64(readSizeV1) - if totalLeft < readSizeV1 { - curLeft = totalLeft + if fsMeta.Parts[partIdx].ETag != part.ETag { + return "", BadDigest{} } - var n int64 - n, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, offset, buf[:curLeft]) - if n > 0 { - if err = fs.storage.AppendFile(minioMetaBucket, tempObj, buf[:n]); err != nil { - return "", toObjectErr(err, minioMetaBucket, tempObj) + // All parts except the last part have to be at least 5MB. + if (i < len(parts)-1) && !isMinAllowedPartSize(fsMeta.Parts[partIdx].Size) { + return "", PartTooSmall{ + PartNumber: part.PartNumber, + PartSize: fsMeta.Parts[partIdx].Size, + PartETag: part.ETag, } } - if err != nil { - if err == io.EOF || err == io.ErrUnexpectedEOF { - break + // Construct part suffix. 
+ partSuffix := fmt.Sprintf("object%d", part.PartNumber) + multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix) + offset := int64(0) + totalLeft := fsMeta.Parts[partIdx].Size + for totalLeft > 0 { + curLeft := int64(readSizeV1) + if totalLeft < readSizeV1 { + curLeft = totalLeft + } + var n int64 + n, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, offset, buf[:curLeft]) + if n > 0 { + if err = fs.storage.AppendFile(minioMetaBucket, tempObj, buf[:n]); err != nil { + return "", toObjectErr(err, minioMetaBucket, tempObj) + } } - if err == errFileNotFound { - return "", InvalidPart{} + if err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } + if err == errFileNotFound { + return "", InvalidPart{} + } + return "", toObjectErr(err, minioMetaBucket, multipartPartFile) } - return "", toObjectErr(err, minioMetaBucket, multipartPartFile) + offset += n + totalLeft -= n } - offset += n - totalLeft -= n } - } - // Rename the file back to original location, if not delete the temporary object. - err = fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object) - if err != nil { - if dErr := fs.storage.DeleteFile(minioMetaBucket, tempObj); dErr != nil { - return "", toObjectErr(dErr, minioMetaBucket, tempObj) + // Rename the file back to original location, if not delete the temporary object. + err = fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object) + if err != nil { + if dErr := fs.storage.DeleteFile(minioMetaBucket, tempObj); dErr != nil { + return "", toObjectErr(dErr, minioMetaBucket, tempObj) + } + return "", toObjectErr(err, bucket, object) } - return "", toObjectErr(err, bucket, object) } // No need to save part info, since we have concatenated all parts. @@ -593,13 +717,8 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload } fsMeta.Meta["md5Sum"] = s3MD5 - // Write the metadata to a temp file and rename it to the actual location. 
- tmpMetaPath := path.Join(tmpMetaPrefix, getUUID()) - if err = writeFSMetadata(fs.storage, minioMetaBucket, tmpMetaPath, fsMeta); err != nil { - return "", toObjectErr(err, bucket, object) - } - fsMetaPath := path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile) - if err = fs.storage.RenameFile(minioMetaBucket, tmpMetaPath, minioMetaBucket, fsMetaPath); err != nil { + fsMetaPath = path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile) + if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil { return "", toObjectErr(err, bucket, object) } } @@ -709,6 +828,11 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error return InvalidUploadID{UploadID: uploadID} } + fsAppendMetaPath := getFSAppendMetaPath(uploadID) + // Lock fs-append.json so that no parallel appendParts() is being done. + nsMutex.Lock(minioMetaBucket, fsAppendMetaPath) + defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath) + err := fs.abortMultipartUpload(bucket, object, uploadID) return err } diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 8553c9808..48d6ca819 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -324,7 +324,7 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { if err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } - fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, path.Join(bucketMetaPrefix, bucket, object)) + fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile)) if err != nil && err != errFileNotFound { return ObjectInfo{}, toObjectErr(err, bucket, object) } @@ -460,13 +460,8 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io. fsMeta := newFSMetaV1() fsMeta.Meta = metadata - // Write the metadata to a temp file and rename it to the actual location. 
- tmpMetaPath := path.Join(tmpMetaPrefix, getUUID()) fsMetaPath := path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile) - if err = writeFSMetadata(fs.storage, minioMetaBucket, tmpMetaPath, fsMeta); err != nil { - return "", toObjectErr(err, bucket, object) - } - if err = fs.storage.RenameFile(minioMetaBucket, tmpMetaPath, minioMetaBucket, fsMetaPath); err != nil { + if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil { return "", toObjectErr(err, bucket, object) } } @@ -523,7 +518,7 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey if fileInfo, err = fs.storage.StatFile(bucket, entry); err != nil { return } - fsMeta, mErr := readFSMetadata(fs.storage, minioMetaBucket, path.Join(bucketMetaPrefix, bucket, entry)) + fsMeta, mErr := readFSMetadata(fs.storage, minioMetaBucket, path.Join(bucketMetaPrefix, bucket, entry, fsMetaJSONFile)) if mErr != nil && mErr != errFileNotFound { return FileInfo{}, mErr }