From b466f27705a996a06b4a9276d6c8fce4dcbd9a38 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Mon, 30 May 2016 23:56:10 +0530 Subject: [PATCH] Nslock fixes (#1803) * XL/Multipart: Support parallel upload of parts by doing NS locking appropriately. * XL/Multipart: hold lock on the multipart upload while aborting. --- xl-v1-multipart.go | 53 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 41 insertions(+), 12 deletions(-) diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index 5698be491..c8bd6c9f2 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -112,30 +112,35 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s if !IsValidObjectName(object) { return "", ObjectNameInvalid{Bucket: bucket, Object: object} } - // Hold write lock on the uploadID so that no one aborts it. + uploadIDLocked := false + defer func() { + if uploadIDLocked { + nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + } + }() + // Figure out the erasure distribution first. nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) - defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + uploadIDLocked = true if !xl.isUploadIDExists(bucket, object, uploadID) { return "", InvalidUploadID{UploadID: uploadID} } uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) + xlMeta, err := xl.readXLMetadata(minioMetaBucket, uploadIDPath) + if err != nil { + return "", toObjectErr(err, minioMetaBucket, uploadIDPath) + } + // List all online disks. onlineDisks, higherVersion, err := xl.listOnlineDisks(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) if err != nil { return "", toObjectErr(err, bucket, object) } - // Increment version only if we have online disks less than configured storage disks. - if diskCount(onlineDisks) < len(xl.storageDisks) { - higherVersion++ - } - - xlMeta, err := xl.readXLMetadata(minioMetaBucket, uploadIDPath) - if err != nil { - return "", toObjectErr(err, minioMetaBucket, uploadIDPath) - } + // Unlock the uploadID so that parallel uploads of parts can happen. + nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + uploadIDLocked = false // Initialize a new erasure with online disks and new distribution. erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution) @@ -179,6 +184,29 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s } } + // Hold lock as we are updating UPLODID/xl.json and renaming the part file from tmp location. + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + uploadIDLocked = true + + if !xl.isUploadIDExists(bucket, object, uploadID) { + return "", InvalidUploadID{UploadID: uploadID} + } + + // List all online disks. + onlineDisks, higherVersion, err = xl.listOnlineDisks(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) + if err != nil { + return "", toObjectErr(err, bucket, object) + } + + // Increment version only if we have online disks less than configured storage disks. + if diskCount(onlineDisks) < len(xl.storageDisks) { + higherVersion++ + } + + xlMeta, err = xl.readXLMetadata(minioMetaBucket, uploadIDPath) + if err != nil { + return "", toObjectErr(err, minioMetaBucket, uploadIDPath) + } // Rename temporary part file to its final location. partPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partSuffix) err = xl.renameObject(minioMetaBucket, tmpPartPath, minioMetaBucket, partPath) @@ -190,7 +218,6 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s xlMeta.Stat.Version = higherVersion xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size) - uploadIDPath = path.Join(mpartMetaPrefix, bucket, object, uploadID) tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID) if err = xl.writeXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil { return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) @@ -475,6 +502,8 @@ func (xl xlObjects) abortMultipartUploadCommon(bucket, object, uploadID string) return toObjectErr(err, bucket, object) } + nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) + defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) // Validate if there are other incomplete upload-id's present for // the object, if yes do not attempt to delete 'uploads.json'. uploadsJSON, err := readUploadsJSON(bucket, object, xl.storageDisks...)