Fixed race during parallel PutObjectPart requests (#1775)

The race is between two parallel PutObjectPart requests updating partsInfo in xl.json.
Previously, it was being updated under a RLock().
master
Krishnan Parthasarathi 9 years ago committed by Harshavardhana
parent 5f679d9d1e
commit 302ec27fa2
  1. 17
      xl-v1-multipart.go

@ -114,23 +114,19 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
if !IsValidObjectName(object) { if !IsValidObjectName(object) {
return "", ObjectNameInvalid{Bucket: bucket, Object: object} return "", ObjectNameInvalid{Bucket: bucket, Object: object}
} }
// Hold write lock on the uploadID so that no one aborts it.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
if !xl.isUploadIDExists(bucket, object, uploadID) { if !xl.isUploadIDExists(bucket, object, uploadID) {
return "", InvalidUploadID{UploadID: uploadID} return "", InvalidUploadID{UploadID: uploadID}
} }
// Hold read lock on the uploadID so that no one aborts it.
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
defer nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
// Hold write lock on the part so that there is no parallel upload on the part. // Hold write lock on the part so that there is no parallel upload on the part.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID))) nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID))) defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
xlMeta, err := xl.readXLMetadata(minioMetaBucket, uploadIDPath)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
// List all online disks. // List all online disks.
onlineDisks, higherVersion, err := xl.listOnlineDisks(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID)) onlineDisks, higherVersion, err := xl.listOnlineDisks(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
if err != nil { if err != nil {
@ -142,6 +138,11 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
higherVersion++ higherVersion++
} }
xlMeta, err := xl.readXLMetadata(minioMetaBucket, uploadIDPath)
if err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
// Initialize a new erasure with online disks and new distribution. // Initialize a new erasure with online disks and new distribution.
erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution) erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution)

Loading…
Cancel
Save