Improve namespace lock API: (#3203)

- abstract out instrumentation information.
- use separate lockInstance type that encapsulates the nsMutex, volume,
  path and opsID as the frontend or top-level lock object.
master
Aditya Manthramurthy 8 years ago committed by Harshavardhana
parent 3e67bfcc88
commit dd0698d14c
  1. 18
      cmd/bucket-notification-handlers.go
  2. 6
      cmd/bucket-policy-handlers.go
  3. 14
      cmd/control-lock-main_test.go
  4. 123
      cmd/fs-v1-multipart.go
  5. 23
      cmd/fs-v1.go
  6. 3
      cmd/lock-instrument_test.go
  7. 39
      cmd/namespace-lock.go
  8. 6
      cmd/namespace-lock_test.go
  9. 24
      cmd/xl-v1-bucket.go
  10. 16
      cmd/xl-v1-healing.go
  11. 8
      cmd/xl-v1-list-objects-heal.go
  12. 133
      cmd/xl-v1-multipart.go
  13. 33
      cmd/xl-v1-object.go

@ -166,10 +166,10 @@ func PutBucketNotificationConfig(bucket string, ncfg *notificationConfig, objAPI
// Acquire a write lock on bucket before modifying its
// configuration.
opsID := getOpsID()
nsMutex.Lock(bucket, "", opsID)
bucketLock := nsMutex.NewNSLock(bucket, "")
bucketLock.Lock()
// Release lock after notifying peers
defer nsMutex.Unlock(bucket, "", opsID)
defer bucketLock.Unlock()
// persist config to disk
err := persistNotificationConfig(bucket, ncfg, objAPI)
@ -374,10 +374,10 @@ func AddBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI ObjectL
// Acquire a write lock on bucket before modifying its
// configuration.
opsID := getOpsID()
nsMutex.Lock(bucket, "", opsID)
bucketLock := nsMutex.NewNSLock(bucket, "")
bucketLock.Lock()
// Release lock after notifying peers
defer nsMutex.Unlock(bucket, "", opsID)
defer bucketLock.Unlock()
// update persistent config if dist XL
if globalIsDistXL {
@ -416,10 +416,10 @@ func RemoveBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI Obje
// Acquire a write lock on bucket before modifying its
// configuration.
opsID := getOpsID()
nsMutex.Lock(bucket, "", opsID)
bucketLock := nsMutex.NewNSLock(bucket, "")
bucketLock.Lock()
// Release lock after notifying peers
defer nsMutex.Unlock(bucket, "", opsID)
defer bucketLock.Unlock()
// update persistent config if dist XL
if globalIsDistXL {

@ -209,10 +209,10 @@ func persistAndNotifyBucketPolicyChange(bucket string, pCh policyChange, objAPI
// Acquire a write lock on bucket before modifying its
// configuration.
opsID := getOpsID()
nsMutex.Lock(bucket, "", opsID)
bucketLock := nsMutex.NewNSLock(bucket, "")
bucketLock.Lock()
// Release lock after notifying peers
defer nsMutex.Unlock(bucket, "", opsID)
defer bucketLock.Unlock()
if pCh.IsRemove {
if err := removeBucketPolicy(bucket, objAPI); err != nil {

@ -23,12 +23,13 @@ import (
// Test print systemState.
func TestPrintLockState(t *testing.T) {
nsMutex.Lock("testbucket", "1.txt", "11-11")
testLock := nsMutex.NewNSLock("testbucket", "1.txt")
testLock.Lock()
sysLockState, err := getSystemLockState()
if err != nil {
t.Fatal(err)
}
nsMutex.Unlock("testbucket", "1.txt", "11-11")
testLock.Unlock()
sysLockStateMap := map[string]SystemLockState{}
sysLockStateMap["bucket"] = sysLockState
@ -66,7 +67,8 @@ func TestLockStateClear(t *testing.T) {
nsMutex.ForceUnlock(bucket, object)
}
nsMutex.Lock("testbucket", "1.txt", "11-11")
testLock := nsMutex.NewNSLock("testbucket", "1.txt")
testLock.Lock()
sysLockState, err := getSystemLockState()
if err != nil {
@ -111,7 +113,8 @@ func TestLockStateClear(t *testing.T) {
}
// Create another lock
nsMutex.RLock("testbucket", "blob.txt", "22-22")
blobLock := nsMutex.NewNSLock("testbucket", "blob.txt")
blobLock.RLock()
if sysLockState, err = getSystemLockState(); err != nil {
t.Fatal(err)
@ -142,7 +145,8 @@ func TestLockStateClear(t *testing.T) {
}
// Create yet another lock
nsMutex.RLock("testbucket", "exact.txt", "33-33")
exactLock := nsMutex.NewNSLock("testbucket", "exact.txt")
exactLock.RLock()
if sysLockState, err = getSystemLockState(); err != nil {
t.Fatal(err)

@ -60,11 +60,11 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
var err error
var eof bool
if uploadIDMarker != "" {
// get a random ID for lock instrumentation.
opsID := getOpsID()
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker), opsID)
keyMarkerLock := nsMutex.NewNSLock(minioMetaBucket,
pathJoin(mpartMetaPrefix, bucket, keyMarker))
keyMarkerLock.RLock()
uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, fs.storage)
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker), opsID)
keyMarkerLock.RUnlock()
if err != nil {
return ListMultipartsInfo{}, err
}
@ -115,12 +115,11 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
var end bool
uploadIDMarker = ""
// get a random ID for lock instrumentation.
opsID := getOpsID()
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry), opsID)
entryLock := nsMutex.NewNSLock(minioMetaBucket,
pathJoin(mpartMetaPrefix, bucket, entry))
entryLock.RLock()
tmpUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, fs.storage)
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry), opsID)
entryLock.RUnlock()
if err != nil {
return ListMultipartsInfo{}, err
}
@ -233,12 +232,12 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st
fsMeta.Meta = meta
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
// This lock needs to be held for any changes to the directory contents of ".minio.sys/multipart/object/"
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
// This lock needs to be held for any changes to the directory
// contents of ".minio.sys/multipart/object/"
objectMPartPathLock := nsMutex.NewNSLock(minioMetaBucket,
pathJoin(mpartMetaPrefix, bucket, object))
objectMPartPathLock.Lock()
defer objectMPartPathLock.Unlock()
uploadID = getUUID()
initiated := time.Now().UTC()
@ -301,7 +300,7 @@ func getFSAppendDataPath(uploadID string) string {
}
// Append parts to fsAppendDataFile.
func appendParts(disk StorageAPI, bucket, object, uploadID, opsID string) {
func appendParts(disk StorageAPI, bucket, object, uploadID string) {
cleanupAppendPaths := func() {
// In case of any error, cleanup the append data and json files
// from the tmp so that we do not have any inconsistent append
@ -316,16 +315,18 @@ func appendParts(disk StorageAPI, bucket, object, uploadID, opsID string) {
fsMetaPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, fsMetaJSONFile)
// Lock the uploadID so that no one modifies fs.json
nsMutex.RLock(minioMetaBucket, uploadIDPath, opsID)
uploadIDLock := nsMutex.NewNSLock(minioMetaBucket, uploadIDPath)
uploadIDLock.RLock()
fsMeta, err := readFSMetadata(disk, minioMetaBucket, fsMetaPath)
nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID)
uploadIDLock.RUnlock()
if err != nil {
return
}
// Lock fs-append.json so that there is no parallel append to the file.
nsMutex.Lock(minioMetaBucket, fsAppendMetaPath, opsID)
defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath, opsID)
appendPathLock := nsMutex.NewNSLock(minioMetaBucket, fsAppendMetaPath)
appendPathLock.Lock()
defer appendPathLock.Unlock()
fsAppendMeta, err := readFSMetadata(disk, minioMetaBucket, fsAppendMetaPath)
if err != nil {
@ -344,8 +345,9 @@ func appendParts(disk StorageAPI, bucket, object, uploadID, opsID string) {
}
// Hold write lock on the part so that there is no parallel upload on the part.
partPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(part.Number))
nsMutex.Lock(minioMetaBucket, partPath, opsID)
defer nsMutex.Unlock(minioMetaBucket, partPath, opsID)
partPathLock := nsMutex.NewNSLock(minioMetaBucket, partPath)
partPathLock.Lock()
defer partPathLock.Unlock()
// Proceed to append "part"
fsAppendDataPath := getFSAppendDataPath(uploadID)
@ -386,7 +388,7 @@ func appendParts(disk StorageAPI, bucket, object, uploadID, opsID string) {
// If there are more parts that need to be appended to fsAppendDataFile
_, appendNeeded = partToAppend(fsMeta, fsAppendMeta)
if appendNeeded {
go appendParts(disk, bucket, object, uploadID, opsID)
go appendParts(disk, bucket, object, uploadID)
}
}
@ -409,13 +411,11 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
// get a random ID for lock instrumentation.
opsID := getOpsID()
nsMutex.RLock(minioMetaBucket, uploadIDPath, opsID)
preUploadIDLock := nsMutex.NewNSLock(minioMetaBucket, uploadIDPath)
preUploadIDLock.RLock()
// Just check if the uploadID exists to avoid copy if it doesn't.
uploadIDExists := fs.isUploadIDExists(bucket, object, uploadID)
nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID)
preUploadIDLock.RUnlock()
if !uploadIDExists {
return "", traceError(InvalidUploadID{UploadID: uploadID})
}
@ -490,14 +490,10 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
}
}
// get a random ID for lock instrumentation.
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID = getOpsID()
// Hold write lock as we are updating fs.json
nsMutex.Lock(minioMetaBucket, uploadIDPath, opsID)
defer nsMutex.Unlock(minioMetaBucket, uploadIDPath, opsID)
postUploadIDLock := nsMutex.NewNSLock(minioMetaBucket, uploadIDPath)
postUploadIDLock.Lock()
defer postUploadIDLock.Unlock()
// Just check if the uploadID exists to avoid copy if it doesn't.
if !fs.isUploadIDExists(bucket, object, uploadID) {
@ -520,7 +516,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
if err = writeFSMetadata(fs.storage, minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile), fsMeta); err != nil {
return "", toObjectErr(err, minioMetaBucket, uploadIDPath)
}
go appendParts(fs.storage, bucket, object, uploadID, opsID)
go appendParts(fs.storage, bucket, object, uploadID)
return newMD5Hex, nil
}
@ -595,12 +591,12 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
return ListPartsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
// Hold lock so that there is no competing abort-multipart-upload or complete-multipart-upload.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
// Hold lock so that there is no competing
// abort-multipart-upload or complete-multipart-upload.
uploadIDLock := nsMutex.NewNSLock(minioMetaBucket,
pathJoin(mpartMetaPrefix, bucket, object, uploadID))
uploadIDLock.Lock()
defer uploadIDLock.Unlock()
if !fs.isUploadIDExists(bucket, object, uploadID) {
return ListPartsInfo{}, traceError(InvalidUploadID{UploadID: uploadID})
@ -643,15 +639,14 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
}
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
// get a random ID for lock instrumentation.
opsID := getOpsID()
// Hold lock so that
// 1) no one aborts this multipart upload
// 2) no one does a parallel complete-multipart-upload on this
// multipart upload
nsMutex.Lock(minioMetaBucket, uploadIDPath, opsID)
defer nsMutex.Unlock(minioMetaBucket, uploadIDPath, opsID)
uploadIDLock := nsMutex.NewNSLock(minioMetaBucket, uploadIDPath)
uploadIDLock.Lock()
defer uploadIDLock.Unlock()
if !fs.isUploadIDExists(bucket, object, uploadID) {
return "", traceError(InvalidUploadID{UploadID: uploadID})
@ -660,8 +655,9 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
// fs-append.json path
fsAppendMetaPath := getFSAppendMetaPath(uploadID)
// Lock fs-append.json so that no parallel appendParts() is being done.
nsMutex.Lock(minioMetaBucket, fsAppendMetaPath, opsID)
defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath, opsID)
appendPathLock := nsMutex.NewNSLock(minioMetaBucket, fsAppendMetaPath)
appendPathLock.Lock()
defer appendPathLock.Unlock()
// Calculate s3 compatible md5sum for complete multipart.
s3MD5, err := getCompleteMultipartMD5(parts...)
@ -788,13 +784,13 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
return "", toObjectErr(err, bucket, object)
}
// get a random ID for lock instrumentation.
opsID = getOpsID()
// Hold the lock so that two parallel complete-multipart-uploads do not
// leave a stale uploads.json behind.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
// Hold the lock so that two parallel
// complete-multipart-uploads do not leave a stale
// uploads.json behind.
objectMPartPathLock := nsMutex.NewNSLock(minioMetaBucket,
pathJoin(mpartMetaPrefix, bucket, object))
objectMPartPathLock.Lock()
defer objectMPartPathLock.Unlock()
// remove entry from uploads.json
if err = fs.updateUploadJSON(bucket, object, uploadIDChange{uploadID: uploadID, isRemove: true}); err != nil {
@ -848,12 +844,12 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
// Hold lock so that there is no competing complete-multipart-upload or put-object-part.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
// Hold lock so that there is no competing
// complete-multipart-upload or put-object-part.
uploadIDLock := nsMutex.NewNSLock(minioMetaBucket,
pathJoin(mpartMetaPrefix, bucket, object, uploadID))
uploadIDLock.Lock()
defer uploadIDLock.Unlock()
if !fs.isUploadIDExists(bucket, object, uploadID) {
return traceError(InvalidUploadID{UploadID: uploadID})
@ -861,8 +857,9 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error
fsAppendMetaPath := getFSAppendMetaPath(uploadID)
// Lock fs-append.json so that no parallel appendParts() is being done.
nsMutex.Lock(minioMetaBucket, fsAppendMetaPath, opsID)
defer nsMutex.Unlock(minioMetaBucket, fsAppendMetaPath, opsID)
appendPathLock := nsMutex.NewNSLock(minioMetaBucket, fsAppendMetaPath)
appendPathLock.Lock()
defer appendPathLock.Unlock()
err := fs.abortMultipartUpload(bucket, object, uploadID)
return err

@ -226,12 +226,10 @@ func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64,
return traceError(InvalidRange{offset, length, fi.Size})
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
// Lock the object before reading.
nsMutex.RLock(bucket, object, opsID)
defer nsMutex.RUnlock(bucket, object, opsID)
objectLock := nsMutex.NewNSLock(bucket, object)
objectLock.RLock()
defer objectLock.RUnlock()
var totalLeft = length
bufSize := int64(readSizeV1)
@ -446,12 +444,10 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
}
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
// Lock the object before committing the object.
nsMutex.RLock(bucket, object, opsID)
defer nsMutex.RUnlock(bucket, object, opsID)
objectLock := nsMutex.NewNSLock(bucket, object)
objectLock.RLock()
defer objectLock.RUnlock()
// Entire object was written to the temp location, now it's safe to rename it to the actual location.
err = fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object)
@ -488,13 +484,12 @@ func (fs fsObjects) DeleteObject(bucket, object string) error {
if !IsValidObjectName(object) {
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
// Lock the object before deleting so that an in progress GetObject does not return
// corrupt data or there is no race with a PutObject.
nsMutex.RLock(bucket, object, opsID)
defer nsMutex.RUnlock(bucket, object, opsID)
objectLock := nsMutex.NewNSLock(bucket, object)
objectLock.RLock()
defer objectLock.RUnlock()
err := fs.storage.DeleteFile(minioMetaBucket, path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile))
if err != nil && err != errFileNotFound {

@ -256,7 +256,6 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) {
}{
// Test case - 1.
{
volume: "my-bucket",
path: "my-object",
lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
@ -270,7 +269,6 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) {
// No entry for <volume, path> pair.
// So an attempt to change the state of the lock from `Blocked`->`Running` should fail.
{
volume: "my-bucket",
path: "my-object-2",
lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",
@ -307,7 +305,6 @@ func TestNsLockMapStatusBlockedToRunning(t *testing.T) {
// Test case - 5.
// Test case with write lock.
{
volume: "my-bucket",
path: "my-object",
lockOrigin: "/home/vadmeste/work/go/src/github.com/minio/minio/xl-v1-object.go:683 +0x2a",

@ -256,3 +256,42 @@ func (n *nsLockMap) ForceUnlock(volume, path string) {
}
}
}
// lockInstance - frontend/top-level interface for namespace locks.
type lockInstance struct {
n *nsLockMap
volume, path, opsID string
}
// NewNSLock - returns a lock instance for a given volume and
// path. The returned lockInstance object encapsulates the nsLockMap,
// volume, path and operation ID.
func (n *nsLockMap) NewNSLock(volume, path string) *lockInstance {
return &lockInstance{n, volume, path, getOpsID()}
}
// Lock - block until write lock is taken.
func (li *lockInstance) Lock() {
lockLocation := callerLocation()
readLock := false
li.n.lock(li.volume, li.path, lockLocation, li.opsID, readLock)
}
// Unlock - block until write lock is released.
func (li *lockInstance) Unlock() {
readLock := false
li.n.unlock(li.volume, li.path, li.opsID, readLock)
}
// RLock - block until read lock is taken.
func (li *lockInstance) RLock() {
lockLocation := callerLocation()
readLock := true
li.n.lock(li.volume, li.path, lockLocation, li.opsID, readLock)
}
// RUnlock - block until read lock is released.
func (li *lockInstance) RUnlock() {
readLock := true
li.n.unlock(li.volume, li.path, li.opsID, readLock)
}

@ -386,7 +386,8 @@ func TestLockStats(t *testing.T) {
func TestNamespaceForceUnlockTest(t *testing.T) {
// Create lock.
nsMutex.Lock("bucket", "object", "11-11")
lock := nsMutex.NewNSLock("bucket", "object")
lock.Lock()
// Forcefully unlock lock.
nsMutex.ForceUnlock("bucket", "object")
@ -394,7 +395,8 @@ func TestNamespaceForceUnlockTest(t *testing.T) {
go func() {
// Try to claim lock again.
nsMutex.Lock("bucket", "object", "22-22")
anotherLock := nsMutex.NewNSLock("bucket", "object")
anotherLock.Lock()
// And signal succes.
ch <- struct{}{}
}()

@ -31,11 +31,9 @@ func (xl xlObjects) MakeBucket(bucket string) error {
return traceError(BucketNameInvalid{Bucket: bucket})
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
nsMutex.Lock(bucket, "", opsID)
defer nsMutex.Unlock(bucket, "", opsID)
bucketLock := nsMutex.NewNSLock(bucket, "")
bucketLock.Lock()
defer bucketLock.Unlock()
// Initialize sync waitgroup.
var wg = &sync.WaitGroup{}
@ -174,11 +172,11 @@ func (xl xlObjects) GetBucketInfo(bucket string) (BucketInfo, error) {
if !IsValidBucketName(bucket) {
return BucketInfo{}, BucketNameInvalid{Bucket: bucket}
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
nsMutex.RLock(bucket, "", opsID)
defer nsMutex.RUnlock(bucket, "", opsID)
bucketLock := nsMutex.NewNSLock(bucket, "")
bucketLock.RLock()
defer bucketLock.RUnlock()
bucketInfo, err := xl.getBucketInfo(bucket)
if err != nil {
return BucketInfo{}, toObjectErr(err, bucket)
@ -249,11 +247,9 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
return BucketNameInvalid{Bucket: bucket}
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
nsMutex.Lock(bucket, "", opsID)
defer nsMutex.Unlock(bucket, "", opsID)
bucketLock := nsMutex.NewNSLock(bucket, "")
bucketLock.Lock()
defer bucketLock.Unlock()
// Collect if all disks report volume not found.
var wg = &sync.WaitGroup{}

@ -32,11 +32,9 @@ func (xl xlObjects) HealBucket(bucket string) error {
// Heal bucket - create buckets on disks where it does not exist.
// get a random ID for lock instrumentation.
opsID := getOpsID()
nsMutex.Lock(bucket, "", opsID)
defer nsMutex.Unlock(bucket, "", opsID)
bucketLock := nsMutex.NewNSLock(bucket, "")
bucketLock.Lock()
defer bucketLock.Unlock()
// Initialize sync waitgroup.
var wg = &sync.WaitGroup{}
@ -101,12 +99,10 @@ func (xl xlObjects) HealObject(bucket, object string) error {
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
// Lock the object before healing.
nsMutex.RLock(bucket, object, opsID)
defer nsMutex.RUnlock(bucket, object, opsID)
objectLock := nsMutex.NewNSLock(bucket, object)
objectLock.RLock()
defer objectLock.RUnlock()
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
if err := reduceErrs(errs, nil); err != nil {

@ -143,11 +143,9 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma
continue
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
// Check if the current object needs healing
nsMutex.RLock(bucket, objInfo.Name, opsID)
objectLock := nsMutex.NewNSLock(bucket, objInfo.Name)
objectLock.RLock()
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, objInfo.Name)
if xlShouldHeal(partsMetadata, errs) {
result.Objects = append(result.Objects, ObjectInfo{
@ -157,7 +155,7 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma
IsDir: false,
})
}
nsMutex.RUnlock(bucket, objInfo.Name, opsID)
objectLock.RUnlock()
}
return result, nil
}

@ -64,10 +64,10 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
// List all upload ids for the keyMarker starting from
// uploadIDMarker first.
if uploadIDMarker != "" {
// get a random ID for lock instrumentation.
opsID := getOpsID()
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker), opsID)
// hold lock on keyMarker path
keyMarkerLock := nsMutex.NewNSLock(minioMetaBucket,
pathJoin(mpartMetaPrefix, bucket, keyMarker))
keyMarkerLock.RLock()
for _, disk := range xl.getLoadBalancedDisks() {
if disk == nil {
continue
@ -81,7 +81,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
}
break
}
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker), opsID)
keyMarkerLock.RUnlock()
if err != nil {
return ListMultipartsInfo{}, err
}
@ -133,11 +133,11 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
var end bool
uploadIDMarker = ""
// get a random ID for lock instrumentation.
opsID := getOpsID()
// For the new object entry we get all its pending uploadIDs.
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry), opsID)
// For the new object entry we get all its
// pending uploadIDs.
entryLock := nsMutex.NewNSLock(minioMetaBucket,
pathJoin(mpartMetaPrefix, bucket, entry))
entryLock.RLock()
var disk StorageAPI
for _, disk = range xl.getLoadBalancedDisks() {
if disk == nil {
@ -152,7 +152,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
}
break
}
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry), opsID)
entryLock.RUnlock()
if err != nil {
if isErrIgnored(err, xlTreeWalkIgnoredErrs) {
continue
@ -279,12 +279,12 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
xlMeta.Stat.ModTime = time.Now().UTC()
xlMeta.Meta = meta
// get a random ID for lock instrumentation.
opsID := getOpsID()
// This lock needs to be held for any changes to the directory contents of ".minio.sys/multipart/object/"
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
// This lock needs to be held for any changes to the directory
// contents of ".minio.sys/multipart/object/"
objectMPartPathLock := nsMutex.NewNSLock(minioMetaBucket,
pathJoin(mpartMetaPrefix, bucket, object))
objectMPartPathLock.Lock()
defer objectMPartPathLock.Unlock()
uploadID := getUUID()
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
@ -360,23 +360,22 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
var errs []error
uploadIDPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID)
// get a random ID for lock instrumentation.
opsID := getOpsID()
nsMutex.RLock(minioMetaBucket, uploadIDPath, opsID)
// pre-check upload id lock.
preUploadIDLock := nsMutex.NewNSLock(minioMetaBucket, uploadIDPath)
preUploadIDLock.RLock()
// Validates if upload ID exists.
if !xl.isUploadIDExists(bucket, object, uploadID) {
nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID)
preUploadIDLock.RUnlock()
return "", traceError(InvalidUploadID{UploadID: uploadID})
}
// Read metadata associated with the object from all disks.
partsMetadata, errs = readAllXLMetadata(xl.storageDisks, minioMetaBucket,
uploadIDPath)
if !isDiskQuorum(errs, xl.writeQuorum) {
nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID)
preUploadIDLock.RUnlock()
return "", toObjectErr(traceError(errXLWriteQuorum), bucket, object)
}
nsMutex.RUnlock(minioMetaBucket, uploadIDPath, opsID)
preUploadIDLock.RUnlock()
// List all online disks.
onlineDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)
@ -467,13 +466,10 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
}
}
// get a random ID for lock instrumentation.
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID = getOpsID()
nsMutex.Lock(minioMetaBucket, uploadIDPath, opsID)
defer nsMutex.Unlock(minioMetaBucket, uploadIDPath, opsID)
// post-upload check (write) lock
postUploadIDLock := nsMutex.NewNSLock(minioMetaBucket, uploadIDPath)
postUploadIDLock.Lock()
defer postUploadIDLock.Unlock()
// Validate again if upload ID still exists.
if !xl.isUploadIDExists(bucket, object, uploadID) {
@ -617,12 +613,12 @@ func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
return ListPartsInfo{}, traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
// Hold lock so that there is no competing abort-multipart-upload or complete-multipart-upload.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
// Hold lock so that there is no competing
// abort-multipart-upload or complete-multipart-upload.
uploadIDLock := nsMutex.NewNSLock(minioMetaBucket,
pathJoin(mpartMetaPrefix, bucket, object, uploadID))
uploadIDLock.Lock()
defer uploadIDLock.Unlock()
if !xl.isUploadIDExists(bucket, object, uploadID) {
return ListPartsInfo{}, traceError(InvalidUploadID{UploadID: uploadID})
@ -653,14 +649,16 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
})
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
// Hold lock so that
//
// 1) no one aborts this multipart upload
// 2) no one does a parallel complete-multipart-upload on this multipart upload
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
//
// 2) no one does a parallel complete-multipart-upload on this
// multipart upload
uploadIDLock := nsMutex.NewNSLock(minioMetaBucket,
pathJoin(mpartMetaPrefix, bucket, object, uploadID))
uploadIDLock.Lock()
defer uploadIDLock.Unlock()
if !xl.isUploadIDExists(bucket, object, uploadID) {
return "", traceError(InvalidUploadID{UploadID: uploadID})
@ -772,24 +770,22 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath)
}
// get a random ID for lock instrumentation.
opsID = getOpsID()
// Hold write lock on the destination before rename.
nsMutex.Lock(bucket, object, opsID)
defer func(curOpsID string) {
destLock := nsMutex.NewNSLock(bucket, object)
destLock.Lock()
defer func() {
// A new complete multipart upload invalidates any
// previously cached object in memory.
xl.objCache.Delete(path.Join(bucket, object))
// This lock also protects the cache namespace.
nsMutex.Unlock(bucket, object, curOpsID)
destLock.Unlock()
// Prefetch the object from disk by triggering a fake GetObject call
// Unlike a regular single PutObject, multipart PutObject is comes in
// stages and it is harder to cache.
go xl.GetObject(bucket, object, 0, objectSize, ioutil.Discard)
}(opsID)
}()
// Rename if an object already exists to temporary location.
uniqueID := getUUID()
@ -824,13 +820,13 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// Delete the previously successfully renamed object.
xl.deleteObject(minioMetaBucket, path.Join(tmpMetaPrefix, uniqueID))
// get a random ID for lock instrumentation.
opsID = getOpsID()
// Hold the lock so that two parallel complete-multipart-uploads do not
// leave a stale uploads.json behind.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
// Hold the lock so that two parallel
// complete-multipart-uploads do not leave a stale
// uploads.json behind.
objectMPartPathLock := nsMutex.NewNSLock(minioMetaBucket,
pathJoin(mpartMetaPrefix, bucket, object))
objectMPartPathLock.Lock()
defer objectMPartPathLock.Unlock()
// remove entry from uploads.json with quorum
if err = xl.updateUploadJSON(bucket, object, uploadIDChange{uploadID: uploadID, isRemove: true}); err != nil {
@ -851,11 +847,12 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e
return toObjectErr(err, bucket, object)
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object), opsID)
// hold lock so we don't compete with a complete, or abort
// multipart request.
objectMPartPathLock := nsMutex.NewNSLock(minioMetaBucket,
pathJoin(mpartMetaPrefix, bucket, object))
objectMPartPathLock.Lock()
defer objectMPartPathLock.Unlock()
// remove entry from uploads.json with quorum
if err = xl.updateUploadJSON(bucket, object, uploadIDChange{uploadID: uploadID, isRemove: true}); err != nil {
@ -889,12 +886,12 @@ func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
// Hold lock so that there is no competing complete-multipart-upload or put-object-part.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID), opsID)
// Hold lock so that there is no competing
// complete-multipart-upload or put-object-part.
uploadIDLock := nsMutex.NewNSLock(minioMetaBucket,
pathJoin(mpartMetaPrefix, bucket, object, uploadID))
uploadIDLock.Lock()
defer uploadIDLock.Unlock()
if !xl.isUploadIDExists(bucket, object, uploadID) {
return traceError(InvalidUploadID{UploadID: uploadID})

@ -59,12 +59,10 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
return traceError(errUnexpected)
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
// Lock the object before reading.
nsMutex.RLock(bucket, object, opsID)
defer nsMutex.RUnlock(bucket, object, opsID)
objectLock := nsMutex.NewNSLock(bucket, object)
objectLock.RLock()
defer objectLock.RUnlock()
// Read metadata associated with the object from all disks.
metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
@ -227,11 +225,10 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
return ObjectInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object}
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
objectLock := nsMutex.NewNSLock(bucket, object)
objectLock.RLock()
defer objectLock.RUnlock()
nsMutex.RLock(bucket, object, opsID)
defer nsMutex.RUnlock(bucket, object, opsID)
info, err := xl.getObjectInfo(bucket, object)
if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
@ -508,14 +505,10 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
}
}
// get a random ID for lock instrumentation.
// generates random string on setting MINIO_DEBUG=lock, else returns empty string.
// used for instrumentation on locks.
opsID := getOpsID()
// Lock the object.
nsMutex.Lock(bucket, object, opsID)
defer nsMutex.Unlock(bucket, object, opsID)
objectLock := nsMutex.NewNSLock(bucket, object)
objectLock.Lock()
defer objectLock.Unlock()
// Check if an object is present as one of the parent dir.
// -- FIXME. (needs a new kind of lock).
@ -637,11 +630,9 @@ func (xl xlObjects) DeleteObject(bucket, object string) (err error) {
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
// get a random ID for lock instrumentation.
opsID := getOpsID()
nsMutex.Lock(bucket, object, opsID)
defer nsMutex.Unlock(bucket, object, opsID)
objectLock := nsMutex.NewNSLock(bucket, object)
objectLock.Lock()
defer objectLock.Unlock()
// Validate object exists.
if !xl.isObject(bucket, object) {

Loading…
Cancel
Save