xl: Make locking more granular for PutObjectPart requests (#2168)

Branch: master
Authored by Krishnan Parthasarathi 8 years ago, committed by Harshavardhana
parent ede4dd0f9c
commit bef72f26db
  1. xl-v1-healing.go (8 changed lines)
  2. xl-v1-metadata.go (28 changed lines)
  3. xl-v1-multipart-common.go (10 changed lines)
  4. xl-v1-multipart.go (102 changed lines)
  5. xl-v1-object.go (6 changed lines)

@@ -47,12 +47,12 @@ func listObjectVersions(partsMetadata []xlMetaV1, errs []error) (versions []int6
// Reads all `xl.json` metadata as a xlMetaV1 slice.
// Returns error slice indicating the failed metadata reads.
func (xl xlObjects) readAllXLMetadata(bucket, object string) ([]xlMetaV1, []error) {
errs := make([]error, len(xl.storageDisks))
metadataArray := make([]xlMetaV1, len(xl.storageDisks))
func readAllXLMetadata(disks []StorageAPI, bucket, object string) ([]xlMetaV1, []error) {
errs := make([]error, len(disks))
metadataArray := make([]xlMetaV1, len(disks))
var wg = &sync.WaitGroup{}
// Read `xl.json` parallelly across disks.
for index, disk := range xl.storageDisks {
for index, disk := range disks {
if disk == nil {
errs[index] = errDiskNotFound
continue

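readAllXLMetadata is now a free function over an explicit []StorageAPI rather than a method on xlObjects, so later hunks can call it with just the online disks for an upload. For orientation, a condensed sketch of the parallel fan-out it performs; only the lines shown in the hunk are taken from the diff, and the per-disk readXLMeta call is an assumption:

func readAllXLMetadata(disks []StorageAPI, bucket, object string) ([]xlMetaV1, []error) {
    errs := make([]error, len(disks))
    metadataArray := make([]xlMetaV1, len(disks))
    var wg = &sync.WaitGroup{}
    for index, disk := range disks {
        if disk == nil {
            errs[index] = errDiskNotFound
            continue
        }
        wg.Add(1)
        go func(index int, disk StorageAPI) {
            defer wg.Done()
            // readXLMeta (assumed helper, not shown in this diff) loads and
            // unmarshals one disk's `xl.json`.
            metadataArray[index], errs[index] = readXLMeta(disk, bucket, object)
        }(index, disk)
    }
    wg.Wait()
    return metadataArray, errs
}
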
@@ -272,10 +272,10 @@ func writeXLMetadata(disk StorageAPI, bucket, prefix string, xlMeta xlMetaV1) er
}
// deleteAllXLMetadata - deletes all partially written `xl.json` depending on errs.
func (xl xlObjects) deleteAllXLMetadata(bucket, prefix string, errs []error) {
func deleteAllXLMetadata(disks []StorageAPI, bucket, prefix string, errs []error) {
var wg = &sync.WaitGroup{}
// Delete all the `xl.json` left over.
for index, disk := range xl.storageDisks {
for index, disk := range disks {
if disk == nil {
continue
}
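deleteAllXLMetadata gets the same treatment. It is the undo step used when quorum is not reached, removing whichever `xl.json` files were already written. A rough sketch under two assumptions that are not visible in this hunk: that a nil entry in errs marks a disk whose write went through, and that StorageAPI exposes a DeleteFile call:

// Sketch only: best-effort cleanup of `xl.json` files written before the
// operation as a whole failed to reach quorum.
func deleteAllXLMetadata(disks []StorageAPI, bucket, prefix string, errs []error) {
    var wg = &sync.WaitGroup{}
    for index, disk := range disks {
        if disk == nil {
            continue
        }
        wg.Add(1)
        go func(index int, disk StorageAPI) {
            defer wg.Done()
            if errs[index] != nil {
                // Nothing was written on this disk, nothing to clean up.
                return
            }
            // DeleteFile is assumed here; cleanup failures are ignored on purpose.
            _ = disk.DeleteFile(bucket, path.Join(prefix, xlMetaJSONFile))
        }(index, disk)
    }
    wg.Wait()
}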
@@ -293,12 +293,12 @@ func (xl xlObjects) deleteAllXLMetadata(bucket, prefix string, errs []error) {
}
// writeUniqueXLMetadata - writes unique `xl.json` content for each disk in order.
func (xl xlObjects) writeUniqueXLMetadata(bucket, prefix string, xlMetas []xlMetaV1) error {
func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas []xlMetaV1, writeQuorum, readQuorum int) error {
var wg = &sync.WaitGroup{}
var mErrs = make([]error, len(xl.storageDisks))
var mErrs = make([]error, len(disks))
// Start writing `xl.json` to all disks in parallel.
for index, disk := range xl.storageDisks {
for index, disk := range disks {
if disk == nil {
mErrs[index] = errDiskNotFound
continue
@@ -323,14 +323,14 @@ func (xl xlObjects) writeUniqueXLMetadata(bucket, prefix string, xlMetas []xlMet
wg.Wait()
// Do we have write quorum?.
if !isQuorum(mErrs, xl.writeQuorum) {
if !isQuorum(mErrs, writeQuorum) {
// Validate if we have read quorum.
if isQuorum(mErrs, xl.readQuorum) {
if isQuorum(mErrs, readQuorum) {
// Return success.
return nil
}
// Delete all `xl.json` successfully renamed.
xl.deleteAllXLMetadata(bucket, prefix, mErrs)
deleteAllXLMetadata(disks, bucket, prefix, mErrs)
return errXLWriteQuorum
}
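Passing writeQuorum and readQuorum as parameters, instead of reading xl.writeQuorum and xl.readQuorum, is what lets these helpers operate on an arbitrary disk slice. The quorum test itself is plain success counting; a minimal sketch of what isQuorum is assumed to compute:

// Sketch only: quorum is assumed to be met when at least `quorum` of the
// per-disk operations completed without error.
func isQuorum(errs []error, quorum int) bool {
    success := 0
    for _, err := range errs {
        if err == nil {
            success++
        }
    }
    return success >= quorum
}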
@@ -346,12 +346,12 @@ func (xl xlObjects) writeUniqueXLMetadata(bucket, prefix string, xlMetas []xlMet
}
// writeSameXLMetadata - write `xl.json` on all disks in order.
func (xl xlObjects) writeSameXLMetadata(bucket, prefix string, xlMeta xlMetaV1) error {
func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMetaV1, writeQuorum, readQuorum int) error {
var wg = &sync.WaitGroup{}
var mErrs = make([]error, len(xl.storageDisks))
var mErrs = make([]error, len(disks))
// Start writing `xl.json` to all disks in parallel.
for index, disk := range xl.storageDisks {
for index, disk := range disks {
if disk == nil {
mErrs[index] = errDiskNotFound
continue
@@ -376,14 +376,14 @@ func (xl xlObjects) writeSameXLMetadata(bucket, prefix string, xlMeta xlMetaV1)
wg.Wait()
// Do we have write Quorum?.
if !isQuorum(mErrs, xl.writeQuorum) {
if !isQuorum(mErrs, writeQuorum) {
// Do we have readQuorum?.
if isQuorum(mErrs, xl.readQuorum) {
if isQuorum(mErrs, readQuorum) {
// Return success.
return nil
}
// Delete all `xl.json` successfully renamed.
xl.deleteAllXLMetadata(bucket, prefix, mErrs)
deleteAllXLMetadata(disks, bucket, prefix, mErrs)
return errXLWriteQuorum
}
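The two writers differ only in what they fan out: writeSameXLMetadata stores one identical xlMeta on every disk (for example when a multipart upload is first created), while writeUniqueXLMetadata stores xlMetas[index] per disk so each disk keeps its own erasure and checksum information. The call sites later in this diff show the new signatures in use:

// Identical metadata on every disk, e.g. a freshly initialised multipart upload:
err = writeSameXLMetadata(xl.storageDisks, minioMetaBucket, tempUploadIDPath, xlMeta, xl.writeQuorum, xl.readQuorum)

// Per-disk metadata, e.g. committing a part, restricted to the online disks:
err = writeUniqueXLMetadata(onlineDisks, minioMetaBucket, tempXLMetaPath, partsMetadata, xl.writeQuorum, xl.readQuorum)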

@@ -261,16 +261,16 @@ func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInf
return FileInfo{}, err
}
// commitXLMetadata - commit `xl.json` from source prefix to destination prefix.
func (xl xlObjects) commitXLMetadata(srcPrefix, dstPrefix string) error {
// commitXLMetadata - commit `xl.json` from source prefix to destination prefix in the given slice of disks.
func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, writeQuorum int) error {
var wg = &sync.WaitGroup{}
var mErrs = make([]error, len(xl.storageDisks))
var mErrs = make([]error, len(disks))
srcJSONFile := path.Join(srcPrefix, xlMetaJSONFile)
dstJSONFile := path.Join(dstPrefix, xlMetaJSONFile)
// Rename `xl.json` to all disks in parallel.
for index, disk := range xl.storageDisks {
for index, disk := range disks {
if disk == nil {
mErrs[index] = errDiskNotFound
continue
@@ -295,7 +295,7 @@ func (xl xlObjects) commitXLMetadata(srcPrefix, dstPrefix string) error {
wg.Wait()
// Do we have write quorum?.
if !isQuorum(mErrs, xl.writeQuorum) {
if !isQuorum(mErrs, writeQuorum) {
return errXLWriteQuorum
}
// For all other errors return.
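commitXLMetadata now also receives the disk slice and the write quorum, so PutObjectPart can commit on just its online disks. The hunk elides the per-disk work; it is assumed to be a parallel rename of the staged `xl.json` into place, roughly as follows (RenameFile on StorageAPI is an assumption here):

// Sketch only: the elided per-disk step, renaming the staged `xl.json` into
// its final location; mErrs is evaluated against writeQuorum after wg.Wait().
wg.Add(1)
go func(index int, disk StorageAPI) {
    defer wg.Done()
    // RenameFile is assumed to move srcJSONFile over dstJSONFile on this disk.
    mErrs[index] = disk.RenameFile(minioMetaBucket, srcJSONFile, minioMetaBucket, dstJSONFile)
}(index, disk)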

@@ -282,7 +282,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID)
// Write updated `xl.json` to all disks.
if err = xl.writeSameXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil {
if err = writeSameXLMetadata(xl.storageDisks, minioMetaBucket, tempUploadIDPath, xlMeta, xl.writeQuorum, xl.readQuorum); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
}
rErr := xl.renameObject(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath)
@@ -335,37 +335,36 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{Bucket: bucket, Object: object}
}
// Hold the lock and start the operation.
uploadIDPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID)
nsMutex.Lock(minioMetaBucket, uploadIDPath)
defer nsMutex.Unlock(minioMetaBucket, uploadIDPath)
var partsMetadata []xlMetaV1
var errs []error
uploadIDPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID)
nsMutex.RLock(minioMetaBucket, uploadIDPath)
// Validates if upload ID exists.
if !xl.isUploadIDExists(bucket, object, uploadID) {
nsMutex.RUnlock(minioMetaBucket, uploadIDPath)
return "", InvalidUploadID{UploadID: uploadID}
}
// Read metadata associated with the object from all disks.
partsMetadata, errs := xl.readAllXLMetadata(minioMetaBucket, uploadIDPath)
partsMetadata, errs = readAllXLMetadata(xl.storageDisks, minioMetaBucket,
uploadIDPath)
if !isQuorum(errs, xl.writeQuorum) {
nsMutex.RUnlock(minioMetaBucket, uploadIDPath)
return "", toObjectErr(errXLWriteQuorum, bucket, object)
}
nsMutex.RUnlock(minioMetaBucket, uploadIDPath)
// List all online disks.
onlineDisks, higherVersion, err := xl.listOnlineDisks(partsMetadata, errs)
onlineDisks, _, err := xl.listOnlineDisks(partsMetadata, errs)
if err != nil {
return "", toObjectErr(err, bucket, object)
}
// Increment version only if we have online disks less than configured storage disks.
if diskCount(onlineDisks) < len(xl.storageDisks) {
higherVersion++
}
// Pick one from the first valid metadata.
xlMeta := pickValidXLMeta(partsMetadata)
// Need a unique name for the part being written in minioMetaBucket to
// accommodate concurrent PutObjectPart requests
partSuffix := fmt.Sprintf("part.%d", partID)
tmpPartPath := path.Join(tmpMetaPrefix, uploadID, partSuffix)
tmpSuffix := getUUID()
tmpPartPath := path.Join(tmpMetaPrefix, uploadID, tmpSuffix)
// Initialize md5 writer.
md5Writer := md5.New()
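Staging the part under a getUUID() suffix instead of the part number is what makes concurrent PutObjectPart requests for the same part safe: previously both requests would have written to the same temporary file. A small illustration with hypothetical UUID values:

reqA := path.Join(tmpMetaPrefix, uploadID, getUUID()) // e.g. ".../<uploadID>/9f6c..."
reqB := path.Join(tmpMetaPrefix, uploadID, getUUID()) // e.g. ".../<uploadID>/02ad..."
// reqA != reqB, so neither request overwrites the other's staged bytes. With
// the old scheme both would have staged to ".../<uploadID>/part.3"; the
// canonical part.N name is only taken at rename time, under the exclusive lock.
_, _ = reqA, reqB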
@@ -419,6 +418,9 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
}
}
nsMutex.Lock(minioMetaBucket, uploadIDPath)
defer nsMutex.Unlock(minioMetaBucket, uploadIDPath)
// Validates if upload ID exists again.
if !xl.isUploadIDExists(bucket, object, uploadID) {
return "", InvalidUploadID{UploadID: uploadID}
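Taken together with the hunk above, PutObjectPart now has three phases instead of one long critical section. A condensed restatement of the locking shape, drawn from the hunks in this diff (error handling and the erasure-coding details are omitted):

// Phase 1 -- shared lock: validate the upload ID and snapshot `xl.json`.
nsMutex.RLock(minioMetaBucket, uploadIDPath)
// ... isUploadIDExists + readAllXLMetadata + write-quorum check ...
nsMutex.RUnlock(minioMetaBucket, uploadIDPath)

// Phase 2 -- no lock held: erasure-code the incoming data and write it to the
// per-request temporary path; this is the slow part and now runs concurrently
// with other parts of the same upload.

// Phase 3 -- exclusive lock: re-validate the upload ID, rename the staged part
// into place, re-read `xl.json` (concurrent parts may have committed), merge
// checksums and commit the new metadata.
nsMutex.Lock(minioMetaBucket, uploadIDPath)
defer nsMutex.Unlock(minioMetaBucket, uploadIDPath)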
@@ -431,6 +433,60 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
return "", toObjectErr(err, minioMetaBucket, partPath)
}
// Read metadata (again) associated with the object from all disks.
partsMetadata, errs = readAllXLMetadata(onlineDisks, minioMetaBucket,
uploadIDPath)
if !isQuorum(errs, xl.writeQuorum) {
return "", toObjectErr(errXLWriteQuorum, bucket, object)
}
var updatedEInfos []erasureInfo
for index := range partsMetadata {
updatedEInfos = append(updatedEInfos, partsMetadata[index].Erasure)
}
var checksums []checkSumInfo
for index, eInfo := range newEInfos {
if eInfo.IsValid() {
// Use a map to find union of checksums of parts that
// we concurrently written and committed before this
// part. N B For a different, concurrent upload of the
// same part, the last written content remains.
checksumSet := make(map[string]checkSumInfo)
checksums = newEInfos[index].Checksum
for _, cksum := range checksums {
checksumSet[cksum.Name] = cksum
}
checksums = updatedEInfos[index].Checksum
for _, cksum := range checksums {
checksumSet[cksum.Name] = cksum
}
// Form the checksumInfo to be committed in xl.json
// from the map.
var finalChecksums []checkSumInfo
for _, cksum := range checksumSet {
finalChecksums = append(finalChecksums, cksum)
}
updatedEInfos[index] = eInfo
updatedEInfos[index].Checksum = finalChecksums
}
}
// Pick one from the first valid metadata.
xlMeta := pickValidXLMeta(partsMetadata)
// Get current highest version based on re-read partsMetadata.
_, higherVersion, err := xl.listOnlineDisks(partsMetadata, errs)
if err != nil {
return "", toObjectErr(err, bucket, object)
}
// Increment version only if we have online disks less than configured storage disks.
if diskCount(onlineDisks) < len(xl.storageDisks) {
higherVersion++
}
// Once part is successfully committed, proceed with updating XL metadata.
xlMeta.Stat.Version = higherVersion
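The re-read under the exclusive lock, plus the map-based union above, is what keeps concurrently committed parts from being lost: the checksums computed for this part are merged, keyed by part name, with whatever other requests committed between the unlocked read phase and now, and Stat.Version is likewise re-derived from the re-read metadata. A small worked example with hypothetical part names (checkSumInfo fields other than Name are left at their zero values):

// Sketch only: this request produced part.2; a concurrent request committed
// part.3 after our unlocked read phase.
ours := []checkSumInfo{{Name: "part.1"}, {Name: "part.2"}}      // this request's newEInfos[i].Checksum
committed := []checkSumInfo{{Name: "part.1"}, {Name: "part.3"}} // re-read updatedEInfos[i].Checksum
checksumSet := make(map[string]checkSumInfo)
for _, c := range ours {
    checksumSet[c.Name] = c
}
for _, c := range committed { // applied second, matching the order in the diff
    checksumSet[c.Name] = c
}
// checksumSet now holds part.1, part.2 and part.3: neither this request's part
// nor the concurrently committed part is dropped from the final xl.json.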
@@ -440,7 +496,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Update `xl.json` content for each disks.
for index := range partsMetadata {
partsMetadata[index].Parts = xlMeta.Parts
partsMetadata[index].Erasure = newEInfos[index]
partsMetadata[index].Erasure = updatedEInfos[index]
}
// Write all the checksum metadata.
@@ -449,10 +505,10 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Writes a unique `xl.json` each disk carrying new checksum
// related information.
if err = xl.writeUniqueXLMetadata(minioMetaBucket, tempXLMetaPath, partsMetadata); err != nil {
if err = writeUniqueXLMetadata(onlineDisks, minioMetaBucket, tempXLMetaPath, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempXLMetaPath)
}
rErr := xl.commitXLMetadata(tempXLMetaPath, uploadIDPath)
rErr := commitXLMetadata(onlineDisks, tempXLMetaPath, uploadIDPath, xl.writeQuorum)
if rErr != nil {
return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath)
}
@@ -593,7 +649,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
uploadIDPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID)
// Read metadata associated with the object from all disks.
partsMetadata, errs := xl.readAllXLMetadata(minioMetaBucket, uploadIDPath)
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, minioMetaBucket, uploadIDPath)
// Do we have writeQuorum?.
if !isQuorum(errs, xl.writeQuorum) {
return "", toObjectErr(errXLWriteQuorum, bucket, object)
@@ -675,10 +731,10 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
}
// Write unique `xl.json` for each disk.
if err = xl.writeUniqueXLMetadata(minioMetaBucket, tempUploadIDPath, partsMetadata); err != nil {
if err = writeUniqueXLMetadata(xl.storageDisks, minioMetaBucket, tempUploadIDPath, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
}
rErr := xl.commitXLMetadata(tempUploadIDPath, uploadIDPath)
rErr := commitXLMetadata(xl.storageDisks, tempUploadIDPath, uploadIDPath, xl.writeQuorum)
if rErr != nil {
return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath)
}

@@ -61,7 +61,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
defer nsMutex.RUnlock(bucket, object)
// Read metadata associated with the object from all disks.
metaArr, errs := xl.readAllXLMetadata(bucket, object)
metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
// Do we have read quorum?
if !isQuorum(errs, xl.readQuorum) {
return toObjectErr(errXLReadQuorum, bucket, object)
@@ -378,7 +378,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
xlMeta := newXLMetaV1(object, xl.dataBlocks, xl.parityBlocks)
// Read metadata associated with the object from all disks.
partsMetadata, errs := xl.readAllXLMetadata(bucket, object)
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
// Do we have write quroum?.
if !isQuorum(errs, xl.writeQuorum) {
return "", toObjectErr(errXLWriteQuorum, bucket, object)
@@ -523,7 +523,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
}
// Write unique `xl.json` for each disk.
if err = xl.writeUniqueXLMetadata(minioMetaTmpBucket, tempObj, partsMetadata); err != nil {
if err = writeUniqueXLMetadata(xl.storageDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil {
return "", toObjectErr(err, bucket, object)
}
