Locking: move locking code from xl-erasure to xl-objects. (#1673)

Fixes #1639 #1649 #1650 #1624
Branch: master
Krishna Srinivas authored 8 years ago, committed by Harshavardhana
parent a0865122a7 · commit 3c1ef3fee2
Changed files:

1. object-api_test.go (4 changes)
2. object-common-multipart.go (18 changes)
3. xl-erasure-v1-createfile.go (7 changes)
4. xl-erasure-v1-healfile.go (4 changes)
5. xl-erasure-v1-readfile.go (6 changes)
6. xl-erasure-v1.go (29 changes)
7. xl-objects-multipart.go (15 changes)
8. xl-objects.go (20 changes)
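
Every lock in the diffs below goes through nsMutex, a process-wide namespace lock keyed by a (volume, path) pair and initialized by initNSLock() (see the test hunk that follows). Its implementation is not part of this change; the following is a minimal sketch of the idea, assuming a reference-counted map of RW mutexes. The names (nsParam, nsLock, nsLockMap) and helpers are illustrative, not necessarily the actual minio code.

```go
package nslock

import "sync"

// nsParam identifies one entry in the namespace: a (volume, path) pair.
type nsParam struct {
	volume string
	path   string
}

// nsLock is a reference-counted RW mutex for a single namespace entry.
type nsLock struct {
	sync.RWMutex
	ref uint // Goroutines currently holding or waiting on this lock.
}

// nsLockMap maps namespace entries to their locks.
type nsLockMap struct {
	mu      sync.Mutex // Protects lockMap itself.
	lockMap map[nsParam]*nsLock
}

// getLock fetches or creates the lock for (volume, path) and bumps its ref count.
func (n *nsLockMap) getLock(volume, path string) *nsLock {
	n.mu.Lock()
	defer n.mu.Unlock()
	param := nsParam{volume, path}
	l, ok := n.lockMap[param]
	if !ok {
		l = &nsLock{}
		n.lockMap[param] = l
	}
	l.ref++
	return l
}

// putLock drops one reference and removes the entry once nobody uses it.
func (n *nsLockMap) putLock(volume, path string) *nsLock {
	n.mu.Lock()
	defer n.mu.Unlock()
	param := nsParam{volume, path}
	l := n.lockMap[param]
	l.ref--
	if l.ref == 0 {
		delete(n.lockMap, param)
	}
	return l
}

// Lock takes an exclusive lock on (volume, path); RLock/RUnlock would be analogous.
func (n *nsLockMap) Lock(volume, path string)   { n.getLock(volume, path).Lock() }
func (n *nsLockMap) Unlock(volume, path string) { n.putLock(volume, path).Unlock() }
```

Because the ref count includes waiters, an entry is deleted only once no goroutine holds or is queued on its mutex, so a Lock racing a concurrent Unlock on the same path can never end up on two different mutexes.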

object-api_test.go
@@ -29,6 +29,10 @@ var _ = Suite(&MySuite{})
func (s *MySuite) TestFSAPISuite(c *C) {
var storageList []string
// Initialize name space lock.
initNSLock()
create := func() ObjectLayer {
path, err := ioutil.TempDir(os.TempDir(), "minio-")
c.Check(err, IsNil)

object-common-multipart.go
@@ -84,7 +84,9 @@ func newMultipartUploadCommon(storage StorageAPI, bucket string, object string)
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{Bucket: bucket, Object: object}
}
// This lock needs to be held for any changes to the directory contents of ".minio/multipart/object/"
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
// Loop until a new unique upload id is generated successfully.
for {
uuid, err := uuid.New()
@@ -146,6 +148,13 @@ func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploa
if !isUploadIDExists(storage, bucket, object, uploadID) {
return "", InvalidUploadID{UploadID: uploadID}
}
// Hold read lock on the uploadID so that no one aborts it.
nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
defer nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
// Hold write lock on the part so that there is no parallel upload on the part.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(partID)))
partSuffix := fmt.Sprintf("%s.%.5d", uploadID, partID)
partSuffixPath := path.Join(tmpMetaPrefix, bucket, object, partSuffix)
@@ -246,6 +255,10 @@ func abortMultipartUploadCommon(storage StorageAPI, bucket, object, uploadID str
return InvalidUploadID{UploadID: uploadID}
}
// Hold lock so that there is no competing complete-multipart-upload or put-object-part.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
if err := cleanupUploadedParts(storage, bucket, object, uploadID); err != nil {
return err
}
@@ -509,6 +522,9 @@ func listObjectPartsCommon(storage StorageAPI, bucket, object, uploadID string,
if !isUploadIDExists(storage, bucket, object, uploadID) {
return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID}
}
// Hold lock so that there is no competing abort-multipart-upload or complete-multipart-upload.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
result := ListPartsInfo{}
entries, err := storage.ListDir(minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID))
if err != nil {
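
Note the two-level locking in the putObjectPartCommon hunk above: a shared (read) lock on the uploadID plus an exclusive (write) lock on the individual part. Different parts of the same upload can therefore be uploaded in parallel, while abort-multipart-upload, which takes the uploadID write lock, must wait for all in-flight parts. A stripped-down sketch of that interplay with plain RW mutexes (illustrative, not minio code):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var uploadIDLock sync.RWMutex      // Guards the upload as a whole.
	partLocks := make([]sync.Mutex, 2) // One exclusive lock per part.

	var wg sync.WaitGroup
	for partID := 0; partID < 2; partID++ {
		wg.Add(1)
		go func(partID int) {
			defer wg.Done()
			uploadIDLock.RLock() // Shared: both parts proceed at once.
			defer uploadIDLock.RUnlock()
			partLocks[partID].Lock() // Exclusive: one writer per part.
			defer partLocks[partID].Unlock()
			fmt.Println("uploading part", partID)
		}(partID)
	}
	wg.Wait()

	uploadIDLock.Lock() // Abort waits for in-flight part uploads to drain.
	fmt.Println("abort may now clean up parts")
	uploadIDLock.Unlock()
}
```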

xl-erasure-v1-createfile.go
@@ -53,10 +53,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
// Release the block writer upon function return.
defer wcloser.release()
// Lock right before reading from disk.
nsMutex.RLock(volume, path)
partsMetadata, errs := xl.getPartsMetadata(volume, path)
nsMutex.RUnlock(volume, path)
// Convert errs into meaningful err to be sent upwards if possible
// based on total number of errors and read quorum.
@@ -240,10 +237,6 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
}
}
// Lock right before commit to disk.
nsMutex.Lock(volume, path)
defer nsMutex.Unlock(volume, path)
// Close all writers and metadata writers in routines.
for index, writer := range writers {
if writer == nil {

xl-erasure-v1-healfile.go
@@ -30,10 +30,6 @@ func (xl XL) healFile(volume string, path string) error {
var readers = make([]io.Reader, totalBlocks)
var writers = make([]io.WriteCloser, totalBlocks)
// Acquire a read lock.
nsMutex.RLock(volume, path)
defer nsMutex.RUnlock(volume, path)
// List all online disks to verify if we need to heal.
onlineDisks, metadata, heal, err := xl.listOnlineDisks(volume, path)
if err != nil {

xl-erasure-v1-readfile.go
@@ -34,10 +34,7 @@ func (xl XL) ReadFile(volume, path string, startOffset int64) (io.ReadCloser, er
return nil, errInvalidArgument
}
// Acquire a read lock.
nsMutex.RLock(volume, path)
onlineDisks, metadata, heal, err := xl.listOnlineDisks(volume, path)
nsMutex.RUnlock(volume, path)
if err != nil {
return nil, err
}
@@ -51,8 +48,6 @@ func (xl XL) ReadFile(volume, path string, startOffset int64) (io.ReadCloser, er
}()
}
// Acquire read lock again.
nsMutex.RLock(volume, path)
readers := make([]io.ReadCloser, len(xl.storageDisks))
for index, disk := range onlineDisks {
if disk == nil {
@@ -67,7 +62,6 @@ func (xl XL) ReadFile(volume, path string, startOffset int64) (io.ReadCloser, er
readers[index] = reader
}
}
nsMutex.RUnlock(volume, path)
// Initialize pipe.
pipeReader, pipeWriter := io.Pipe()

xl-erasure-v1.go
@@ -127,10 +127,6 @@ func (xl XL) MakeVol(volume string) error {
return errInvalidArgument
}
// Hold a write lock before creating a volume.
nsMutex.Lock(volume, "")
defer nsMutex.Unlock(volume, "")
// Err counters.
createVolErr := 0 // Count generic create vol errs.
volumeExistsErrCnt := 0 // Count all errVolumeExists errs.
@@ -188,10 +184,6 @@ func (xl XL) DeleteVol(volume string) error {
return errInvalidArgument
}
// Hold a write lock for Delete volume.
nsMutex.Lock(volume, "")
defer nsMutex.Unlock(volume, "")
// Collect if all disks report volume not found.
var volumeNotFoundErrCnt int
@@ -369,10 +361,6 @@ func (xl XL) listAllVolInfo(volume string) ([]VolInfo, bool, error) {
// healVolume - heals any missing volumes.
func (xl XL) healVolume(volume string) error {
// Acquire a read lock.
nsMutex.RLock(volume, "")
defer nsMutex.RUnlock(volume, "")
// Lists volume info for all online disks.
volsInfo, heal, err := xl.listAllVolInfo(volume)
if err != nil {
@@ -420,10 +408,7 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) {
return VolInfo{}, errInvalidArgument
}
// Acquire a read lock before reading.
nsMutex.RLock(volume, "")
volsInfo, heal, err := xl.listAllVolInfo(volume)
nsMutex.RUnlock(volume, "")
if err != nil {
return VolInfo{}, err
}
@@ -500,10 +485,7 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) {
return FileInfo{}, errInvalidArgument
}
// Acquire read lock.
nsMutex.RLock(volume, path)
_, metadata, heal, err := xl.listOnlineDisks(volume, path)
nsMutex.RUnlock(volume, path)
if err != nil {
return FileInfo{}, err
}
@@ -535,9 +517,6 @@ func (xl XL) DeleteFile(volume, path string) error {
return errInvalidArgument
}
nsMutex.Lock(volume, path)
defer nsMutex.Unlock(volume, path)
errCount := 0
// Update meta data file and remove part file
for index, disk := range xl.storageDisks {
@@ -590,14 +569,6 @@ func (xl XL) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error {
return errInvalidArgument
}
// Hold read lock at source before rename.
nsMutex.RLock(srcVolume, srcPath)
defer nsMutex.RUnlock(srcVolume, srcPath)
// Hold write lock at destination before rename.
nsMutex.Lock(dstVolume, dstPath)
defer nsMutex.Unlock(dstVolume, dstPath)
errCount := 0
for _, disk := range xl.storageDisks {
// Append "/" as srcPath and dstPath are either leaf-dirs or non-leaf-dirs.

xl-objects-multipart.go
@@ -133,6 +133,11 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
if !isUploadIDExists(xl.storage, bucket, object, uploadID) {
return "", InvalidUploadID{UploadID: uploadID}
}
// Hold lock so that
// 1) no one aborts this multipart upload
// 2) no one does a parallel complete-multipart-upload on this multipart upload
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object, uploadID))
// Calculate s3 compatible md5sum for complete multipart.
s3MD5, err := completeMultipartMD5(parts...)
@@ -245,6 +250,10 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
return "", toObjectErr(err, minioMetaBucket, uploadIDIncompletePath)
}
// Hold write lock on the destination before rename
nsMutex.Lock(bucket, object)
defer nsMutex.Unlock(bucket, object)
// Delete if an object already exists.
// FIXME: rename it to tmp file and delete only after
// the newly uploaded file is renamed from tmp location to
@@ -258,6 +267,12 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
if err = xl.storage.RenameFile(minioMetaBucket, uploadIDPath, bucket, object); err != nil {
return "", toObjectErr(err, bucket, object)
}
// Hold the lock so that two parallel complete-multipart-uploads do not
// leave a stale uploads.json behind.
nsMutex.Lock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object))
// Validate if there are other incomplete upload-id's present for
// the object, if yes do not attempt to delete 'uploads.json'.
var entries []string
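
The CompleteMultipartUpload hunks above end up taking three locks in a fixed order: the uploadID (no abort or parallel complete), then the destination (bucket, object) around the rename, and finally the shared multipart directory before deciding whether uploads.json can be removed. Keeping one acquisition order on every code path is what rules out deadlock between concurrent completes. A schematic sketch, where three plain mutexes stand in for the three nsMutex paths (illustrative only):

```go
package main

import (
	"fmt"
	"sync"
)

var (
	uploadIDLock sync.Mutex // ".minio/multipart/<bucket>/<object>/<uploadID>"
	objectLock   sync.Mutex // "<bucket>/<object>"
	mpartDirLock sync.Mutex // ".minio/multipart/<bucket>/<object>"
)

func completeMultipartUpload(id int) {
	uploadIDLock.Lock() // 1. Block abort and parallel completes of this uploadID.
	defer uploadIDLock.Unlock()

	objectLock.Lock() // 2. Block readers/writers while renaming into place.
	defer objectLock.Unlock()
	fmt.Println("complete", id, "renames the object into place")

	mpartDirLock.Lock() // 3. Serialize the uploads.json cleanup decision.
	defer mpartDirLock.Unlock()
	fmt.Println("complete", id, "checks for other incomplete upload IDs")
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(i int) { defer wg.Done(); completeMultipartUpload(i) }(i)
	}
	wg.Wait()
}
```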

xl-objects.go
@@ -121,11 +121,15 @@ func newXLObjects(exportPaths ...string) (ObjectLayer, error) {
// MakeBucket - make a bucket.
func (xl xlObjects) MakeBucket(bucket string) error {
nsMutex.Lock(bucket, "")
defer nsMutex.Unlock(bucket, "")
return makeBucket(xl.storage, bucket)
}
// GetBucketInfo - get bucket info.
func (xl xlObjects) GetBucketInfo(bucket string) (BucketInfo, error) {
nsMutex.RLock(bucket, "")
defer nsMutex.RUnlock(bucket, "")
return getBucketInfo(xl.storage, bucket)
}
@@ -136,6 +140,8 @@ func (xl xlObjects) ListBuckets() ([]BucketInfo, error) {
// DeleteBucket - delete a bucket.
func (xl xlObjects) DeleteBucket(bucket string) error {
nsMutex.Lock(bucket, "")
defer nsMutex.Unlock(bucket, "")
return deleteBucket(xl.storage, bucket)
}
@@ -151,6 +157,8 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
if !IsValidObjectName(object) {
return nil, ObjectNameInvalid{Bucket: bucket, Object: object}
}
nsMutex.RLock(bucket, object)
defer nsMutex.RUnlock(bucket, object)
if ok, err := isMultipartObject(xl.storage, bucket, object); err != nil {
return nil, toObjectErr(err, bucket, object)
} else if !ok {
@@ -173,7 +181,13 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
if err != nil {
return nil, toObjectErr(err, bucket, object)
}
// Hold a read lock once more, to be released only after the following go-routine ends.
// This is needed because the current function returns before the go-routine below
// finishes, and returning would otherwise release the read lock via the defer'ed
// nsMutex.RUnlock() call above.
nsMutex.RLock(bucket, object)
go func() {
defer nsMutex.RUnlock(bucket, object)
for ; partIndex < len(info.Parts); partIndex++ {
part := info.Parts[partIndex]
r, err := xl.storage.ReadFile(bucket, pathJoin(object, partNumToPartFileName(part.PartNumber)), offset)
@@ -263,6 +277,8 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
if !IsValidObjectName(object) {
return ObjectInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object}
}
nsMutex.RLock(bucket, object)
defer nsMutex.RUnlock(bucket, object)
info, err := xl.getObjectInfo(bucket, object)
if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
@@ -286,6 +302,8 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
Object: object,
}
}
nsMutex.Lock(bucket, object)
defer nsMutex.Unlock(bucket, object)
tempObj := path.Join(tmpMetaPrefix, bucket, object)
fileWriter, err := xl.storage.CreateFile(minioMetaBucket, tempObj)
@@ -468,6 +486,8 @@ func (xl xlObjects) DeleteObject(bucket, object string) error {
if !IsValidObjectName(object) {
return ObjectNameInvalid{Bucket: bucket, Object: object}
}
nsMutex.Lock(bucket, object)
defer nsMutex.Unlock(bucket, object)
if err := xl.deleteObject(bucket, object); err != nil {
return toObjectErr(err, bucket, object)
}
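
One pattern in the xl-objects.go changes deserves a closer look: GetObject returns a pipe reader while a goroutine keeps streaming the remaining parts, so the first, defer'ed RUnlock fires when the function returns, before streaming is done. The second RLock, whose release is defer'ed inside the goroutine, keeps the object read-locked for the whole lifetime of the stream. A stripped-down sketch of that handoff (names are illustrative, not minio code):

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// getObject hands a read lock over to the goroutine that outlives it,
// so writers stay blocked until the caller has drained the stream.
func getObject(lk *sync.RWMutex) io.ReadCloser {
	lk.RLock()
	defer lk.RUnlock() // Released as soon as getObject returns...

	pr, pw := io.Pipe()
	lk.RLock() // ...so take a second read lock for the goroutine.
	go func() {
		defer lk.RUnlock() // Held until streaming completes.
		defer pw.Close()
		pw.Write([]byte("object data"))
	}()
	return pr
}

func main() {
	var lk sync.RWMutex
	r := getObject(&lk)
	data, _ := io.ReadAll(r)
	r.Close()
	fmt.Println(string(data))
}
```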
