FS/multipart: Append the parts to a file as and when the parts arrive. (#2513)

master
Krishna Srinivas 8 years ago committed by Harshavardhana
parent 3118195e92
commit 3aa0574c6b
  1. 36
      cmd/fs-v1-metadata.go
  2. 206
      cmd/fs-v1-multipart.go
  3. 11
      cmd/fs-v1.go

@ -77,9 +77,9 @@ func (m *fsMetaV1) AddObjectPart(partNumber int, partName string, partETag strin
}
// readFSMetadata - returns the object metadata `fs.json` content.
func readFSMetadata(disk StorageAPI, bucket, object string) (fsMeta fsMetaV1, err error) {
func readFSMetadata(disk StorageAPI, bucket, filePath string) (fsMeta fsMetaV1, err error) {
// Read all `fs.json`.
buf, err := disk.ReadAll(bucket, path.Join(object, fsMetaJSONFile))
buf, err := disk.ReadAll(bucket, filePath)
if err != nil {
return fsMetaV1{}, err
}
@ -93,6 +93,19 @@ func readFSMetadata(disk StorageAPI, bucket, object string) (fsMeta fsMetaV1, er
return fsMeta, nil
}
// Write fsMeta to fs.json or fs-append.json.
func writeFSMetadata(disk StorageAPI, bucket, filePath string, fsMeta fsMetaV1) (err error) {
tmpPath := path.Join(tmpMetaPrefix, getUUID())
metadataBytes, err := json.Marshal(fsMeta)
if err != nil {
return err
}
if err = disk.AppendFile(minioMetaBucket, tmpPath, metadataBytes); err != nil {
return err
}
return disk.RenameFile(minioMetaBucket, tmpPath, bucket, filePath)
}
// newFSMetaV1 - initializes new fsMetaV1.
func newFSMetaV1() (fsMeta fsMetaV1) {
fsMeta = fsMetaV1{}
@ -131,17 +144,18 @@ func writeFSFormatData(storage StorageAPI, fsFormat formatConfigV1) error {
return nil
}
// writeFSMetadata - writes `fs.json` metadata, marshals fsMeta object into json
// and saves it to disk.
func writeFSMetadata(storage StorageAPI, bucket, path string, fsMeta fsMetaV1) error {
metadataBytes, err := json.Marshal(fsMeta)
if err != nil {
return err
// Return if the part info in uploadedParts and completeParts are same.
func isPartsSame(uploadedParts []objectPartInfo, completeParts []completePart) bool {
if len(uploadedParts) != len(completeParts) {
return false
}
if err = storage.AppendFile(bucket, path, metadataBytes); err != nil {
return err
for i := range completeParts {
if uploadedParts[i].Number != completeParts[i].PartNumber ||
uploadedParts[i].ETag != completeParts[i].ETag {
return false
}
return nil
}
return true
}
var extendedHeaders = []string{

@ -235,17 +235,9 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st
if err = fs.writeUploadJSON(bucket, object, uploadID, initiated); err != nil {
return "", err
}
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
tempFSMetadataPath := path.Join(tmpMetaPrefix, getUUID()+"-"+fsMetaJSONFile)
if err = writeFSMetadata(fs.storage, minioMetaBucket, tempFSMetadataPath, fsMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempFSMetadataPath)
}
err = fs.storage.RenameFile(minioMetaBucket, tempFSMetadataPath, minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile))
if err != nil {
if dErr := fs.storage.DeleteFile(minioMetaBucket, tempFSMetadataPath); dErr != nil {
return "", toObjectErr(dErr, minioMetaBucket, tempFSMetadataPath)
}
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
fsMetaPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, fsMetaJSONFile)
if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, fsMetaPath)
}
// Return success.
return uploadID, nil
@ -272,6 +264,127 @@ func (fs fsObjects) NewMultipartUpload(bucket, object string, meta map[string]st
return fs.newMultipartUpload(bucket, object, meta)
}
// Returns if a new part can be appended to fsAppendDataFile.
func partToAppend(fsMeta fsMetaV1, fsAppendMeta fsMetaV1) (part objectPartInfo, appendNeeded bool) {
if len(fsMeta.Parts) == 0 {
return
}
// As fsAppendMeta.Parts will be sorted len(fsAppendMeta.Parts) will naturally be the next part number
nextPartNum := len(fsAppendMeta.Parts) + 1
nextPartIndex := fsMeta.ObjectPartIndex(nextPartNum)
if nextPartIndex == -1 {
return
}
return fsMeta.Parts[nextPartIndex], true
}
// Returns metadata path for the file holding info about the parts that
// have been appended to the "append-file"
func getFSAppendMetaPath(uploadID string) string {
return path.Join(tmpMetaPrefix, uploadID+".json")
}
// Returns path for the append-file.
func getFSAppendDataPath(uploadID string) string {
return path.Join(tmpMetaPrefix, uploadID+".data")
}
// Append parts to fsAppendDataFile.
func appendParts(disk StorageAPI, bucket, object, uploadID string) {
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
// fs-append.json path
fsAppendMetaPath := getFSAppendMetaPath(uploadID)
// fs.json path
fsMetaPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, fsMetaJSONFile)
// Lock the uploadID so that no one modifies fs.json
nsMutex.RLock(minioMetaBucket, uploadIDPath)
fsMeta, err := readFSMetadata(disk, minioMetaBucket, fsMetaPath)
nsMutex.RUnlock(minioMetaBucket, uploadIDPath)
if err != nil {
return
}
// Lock fs-append.json so that there is no parallel append to the file.
nsMutex.Lock(minioMetaBucket, fsAppendMetaPath)
defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath)
fsAppendMeta, err := readFSMetadata(disk, minioMetaBucket, fsAppendMetaPath)
if err != nil {
if err != errFileNotFound {
return
}
fsAppendMeta = fsMeta
fsAppendMeta.Parts = nil
}
// Check if a part needs to be appended to
part, appendNeeded := partToAppend(fsMeta, fsAppendMeta)
if !appendNeeded {
return
}
// Hold write lock on the part so that there is no parallel upload on the part.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(part.Number)))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(part.Number)))
// Proceed to append "part"
fsAppendDataPath := getFSAppendDataPath(uploadID)
tmpDataPath := path.Join(tmpMetaPrefix, getUUID())
if part.Number != 1 {
// Move it to tmp location before appending so that we don't leave inconsitent data
// if server crashes during append operation.
err = disk.RenameFile(minioMetaBucket, fsAppendDataPath, minioMetaBucket, tmpDataPath)
if err != nil {
return
}
// Delete fs-append.json so that we don't leave a stale file if server crashes
// when the part is being appended to the tmp file.
err = disk.DeleteFile(minioMetaBucket, fsAppendMetaPath)
if err != nil {
return
}
}
// Path to the part that needs to be appended.
partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, part.Name)
offset := int64(0)
totalLeft := part.Size
buf := make([]byte, readSizeV1)
for totalLeft > 0 {
curLeft := int64(readSizeV1)
if totalLeft < readSizeV1 {
curLeft = totalLeft
}
var n int64
n, err = disk.ReadFile(minioMetaBucket, partPath, offset, buf[:curLeft])
if n > 0 {
if err = disk.AppendFile(minioMetaBucket, tmpDataPath, buf[:n]); err != nil {
return
}
}
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
}
return
}
offset += n
totalLeft -= n
}
// All good, the part has been appended to the tmp file, rename it back.
if err = disk.RenameFile(minioMetaBucket, tmpDataPath, minioMetaBucket, fsAppendDataPath); err != nil {
return
}
fsAppendMeta.AddObjectPart(part.Number, part.Name, part.ETag, part.Size)
if err = writeFSMetadata(disk, minioMetaBucket, fsAppendMetaPath, fsAppendMeta); err != nil {
return
}
// If there are more parts that need to be appended to fsAppendDataFile
_, appendNeeded = partToAppend(fsMeta, fsAppendMeta)
if appendNeeded {
go appendParts(disk, bucket, object, uploadID)
}
}
// PutObjectPart - reads incoming data until EOF for the part file on
// an ongoing multipart transaction. Internally incoming data is
// written to '.minio/tmp' location and safely renamed to
@ -366,9 +479,10 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
return "", InvalidUploadID{UploadID: uploadID}
}
fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, uploadIDPath)
fsMetaPath := pathJoin(uploadIDPath, fsMetaJSONFile)
fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, fsMetaPath)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
return "", toObjectErr(err, minioMetaBucket, fsMetaPath)
}
fsMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size)
@ -380,18 +494,11 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
}
return "", toObjectErr(err, minioMetaBucket, partPath)
}
uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID)
tempFSMetadataPath := path.Join(tmpMetaPrefix, getUUID()+"-"+fsMetaJSONFile)
if err = writeFSMetadata(fs.storage, minioMetaBucket, tempFSMetadataPath, fsMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempFSMetadataPath)
}
err = fs.storage.RenameFile(minioMetaBucket, tempFSMetadataPath, minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile))
if err != nil {
if dErr := fs.storage.DeleteFile(minioMetaBucket, tempFSMetadataPath); dErr != nil {
return "", toObjectErr(dErr, minioMetaBucket, tempFSMetadataPath)
}
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, fsMetaPath)
}
go appendParts(fs.storage, bucket, object, uploadID)
return newMD5Hex, nil
}
@ -401,10 +508,10 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
result := ListPartsInfo{}
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, uploadIDPath)
fsMetaPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, fsMetaJSONFile)
fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, fsMetaPath)
if err != nil {
return ListPartsInfo{}, toObjectErr(err, minioMetaBucket, uploadIDPath)
return ListPartsInfo{}, toObjectErr(err, minioMetaBucket, fsMetaPath)
}
// Only parts with higher part numbers will be listed.
partIdx := fsMeta.ObjectPartIndex(partNumberMarker)
@ -502,18 +609,18 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
// 1) no one aborts this multipart upload
// 2) no one does a parallel complete-multipart-upload on this
// multipart upload
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
nsMutex.Lock(minioMetaBucket, uploadIDPath)
defer nsMutex.Unlock(minioMetaBucket, uploadIDPath)
if !fs.isUploadIDExists(bucket, object, uploadID) {
return "", InvalidUploadID{UploadID: uploadID}
}
// Read saved fs metadata for ongoing multipart.
fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, uploadIDPath)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
// fs-append.json path
fsAppendMetaPath := getFSAppendMetaPath(uploadID)
// Lock fs-append.json so that no parallel appendParts() is being done.
nsMutex.Lock(minioMetaBucket, fsAppendMetaPath)
defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath)
// Calculate s3 compatible md5sum for complete multipart.
s3MD5, err := completeMultipartMD5(parts...)
@ -521,6 +628,22 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
return "", err
}
// Read saved fs metadata for ongoing multipart.
fsMetaPath := pathJoin(uploadIDPath, fsMetaJSONFile)
fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, fsMetaPath)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, fsMetaPath)
}
fsAppendMeta, err := readFSMetadata(fs.storage, minioMetaBucket, fsAppendMetaPath)
if err == nil && isPartsSame(fsAppendMeta.Parts, parts) {
fsAppendDataPath := getFSAppendDataPath(uploadID)
if err = fs.storage.RenameFile(minioMetaBucket, fsAppendDataPath, bucket, object); err != nil {
return "", toObjectErr(err, minioMetaBucket, fsAppendDataPath)
}
// Remove the append-file metadata file in tmp location as we no longer need it.
fs.storage.DeleteFile(minioMetaBucket, fsAppendMetaPath)
} else {
tempObj := path.Join(tmpMetaPrefix, uploadID+"-"+"part.1")
// Allocate staging buffer.
@ -582,6 +705,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
}
return "", toObjectErr(err, bucket, object)
}
}
// No need to save part info, since we have concatenated all parts.
fsMeta.Parts = nil
@ -593,13 +717,8 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
}
fsMeta.Meta["md5Sum"] = s3MD5
// Write the metadata to a temp file and rename it to the actual location.
tmpMetaPath := path.Join(tmpMetaPrefix, getUUID())
if err = writeFSMetadata(fs.storage, minioMetaBucket, tmpMetaPath, fsMeta); err != nil {
return "", toObjectErr(err, bucket, object)
}
fsMetaPath := path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile)
if err = fs.storage.RenameFile(minioMetaBucket, tmpMetaPath, minioMetaBucket, fsMetaPath); err != nil {
fsMetaPath = path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile)
if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil {
return "", toObjectErr(err, bucket, object)
}
}
@ -709,6 +828,11 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error
return InvalidUploadID{UploadID: uploadID}
}
fsAppendMetaPath := getFSAppendMetaPath(uploadID)
// Lock fs-append.json so that no parallel appendParts() is being done.
nsMutex.Lock(minioMetaBucket, fsAppendMetaPath)
defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath)
err := fs.abortMultipartUpload(bucket, object, uploadID)
return err
}

@ -324,7 +324,7 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, path.Join(bucketMetaPrefix, bucket, object))
fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile))
if err != nil && err != errFileNotFound {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
@ -460,13 +460,8 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
fsMeta := newFSMetaV1()
fsMeta.Meta = metadata
// Write the metadata to a temp file and rename it to the actual location.
tmpMetaPath := path.Join(tmpMetaPrefix, getUUID())
fsMetaPath := path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile)
if err = writeFSMetadata(fs.storage, minioMetaBucket, tmpMetaPath, fsMeta); err != nil {
return "", toObjectErr(err, bucket, object)
}
if err = fs.storage.RenameFile(minioMetaBucket, tmpMetaPath, minioMetaBucket, fsMetaPath); err != nil {
if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil {
return "", toObjectErr(err, bucket, object)
}
}
@ -523,7 +518,7 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
if fileInfo, err = fs.storage.StatFile(bucket, entry); err != nil {
return
}
fsMeta, mErr := readFSMetadata(fs.storage, minioMetaBucket, path.Join(bucketMetaPrefix, bucket, entry))
fsMeta, mErr := readFSMetadata(fs.storage, minioMetaBucket, path.Join(bucketMetaPrefix, bucket, entry, fsMetaJSONFile))
if mErr != nil && mErr != errFileNotFound {
return FileInfo{}, mErr
}

Loading…
Cancel
Save