diff --git a/xl-v1-healing.go b/xl-v1-healing.go
index fa2b8ccf8..a0b5dc3a1 100644
--- a/xl-v1-healing.go
+++ b/xl-v1-healing.go
@@ -47,12 +47,12 @@ func listObjectVersions(partsMetadata []xlMetaV1, errs []error) (versions []int6
 
 // Reads all `xl.json` metadata as a xlMetaV1 slice.
 // Returns error slice indicating the failed metadata reads.
-func (xl xlObjects) readAllXLMetadata(bucket, object string) ([]xlMetaV1, []error) {
-	errs := make([]error, len(xl.storageDisks))
-	metadataArray := make([]xlMetaV1, len(xl.storageDisks))
+func readAllXLMetadata(disks []StorageAPI, bucket, object string) ([]xlMetaV1, []error) {
+	errs := make([]error, len(disks))
+	metadataArray := make([]xlMetaV1, len(disks))
 	var wg = &sync.WaitGroup{}
 	// Read `xl.json` parallelly across disks.
-	for index, disk := range xl.storageDisks {
+	for index, disk := range disks {
 		if disk == nil {
 			errs[index] = errDiskNotFound
 			continue
diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go
index c6e10b819..b439ac3ff 100644
--- a/xl-v1-metadata.go
+++ b/xl-v1-metadata.go
@@ -272,10 +272,10 @@ func writeXLMetadata(disk StorageAPI, bucket, prefix string, xlMeta xlMetaV1) er
 }
 
 // deleteAllXLMetadata - deletes all partially written `xl.json` depending on errs.
-func (xl xlObjects) deleteAllXLMetadata(bucket, prefix string, errs []error) {
+func deleteAllXLMetadata(disks []StorageAPI, bucket, prefix string, errs []error) {
 	var wg = &sync.WaitGroup{}
 	// Delete all the `xl.json` left over.
-	for index, disk := range xl.storageDisks {
+	for index, disk := range disks {
 		if disk == nil {
 			continue
 		}
@@ -293,12 +293,12 @@ func (xl xlObjects) deleteAllXLMetadata(bucket, prefix string, errs []error) {
 }
 
 // writeUniqueXLMetadata - writes unique `xl.json` content for each disk in order.
-func (xl xlObjects) writeUniqueXLMetadata(bucket, prefix string, xlMetas []xlMetaV1) error {
+func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas []xlMetaV1, writeQuorum, readQuorum int) error {
 	var wg = &sync.WaitGroup{}
-	var mErrs = make([]error, len(xl.storageDisks))
+	var mErrs = make([]error, len(disks))
 
 	// Start writing `xl.json` to all disks in parallel.
-	for index, disk := range xl.storageDisks {
+	for index, disk := range disks {
 		if disk == nil {
 			mErrs[index] = errDiskNotFound
 			continue
@@ -323,14 +323,14 @@ func (xl xlObjects) writeUniqueXLMetadata(bucket, prefix string, xlMet
 	wg.Wait()
 
 	// Do we have write quorum?.
-	if !isQuorum(mErrs, xl.writeQuorum) {
+	if !isQuorum(mErrs, writeQuorum) {
 		// Validate if we have read quorum.
-		if isQuorum(mErrs, xl.readQuorum) {
+		if isQuorum(mErrs, readQuorum) {
 			// Return success.
 			return nil
 		}
 		// Delete all `xl.json` successfully renamed.
-		xl.deleteAllXLMetadata(bucket, prefix, mErrs)
+		deleteAllXLMetadata(disks, bucket, prefix, mErrs)
 		return errXLWriteQuorum
 	}
 
@@ -346,12 +346,12 @@ func (xl xlObjects) writeUniqueXLMetadata(bucket, prefix string, xlMet
 }
 
 // writeSameXLMetadata - write `xl.json` on all disks in order.
-func (xl xlObjects) writeSameXLMetadata(bucket, prefix string, xlMeta xlMetaV1) error {
+func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMetaV1, writeQuorum, readQuorum int) error {
 	var wg = &sync.WaitGroup{}
-	var mErrs = make([]error, len(xl.storageDisks))
+	var mErrs = make([]error, len(disks))
 
 	// Start writing `xl.json` to all disks in parallel.
-	for index, disk := range xl.storageDisks {
+	for index, disk := range disks {
 		if disk == nil {
 			mErrs[index] = errDiskNotFound
 			continue
@@ -376,14 +376,14 @@ func (xl xlObjects) writeSameXLMetadata(bucket, prefix string, xlMeta xlMetaV1)
 	wg.Wait()
 
 	// Do we have write Quorum?.
-	if !isQuorum(mErrs, xl.writeQuorum) {
+	if !isQuorum(mErrs, writeQuorum) {
 		// Do we have readQuorum?.
-		if isQuorum(mErrs, xl.readQuorum) {
+		if isQuorum(mErrs, readQuorum) {
 			// Return success.
 			return nil
 		}
 		// Delete all `xl.json` successfully renamed.
-		xl.deleteAllXLMetadata(bucket, prefix, mErrs)
+		deleteAllXLMetadata(disks, bucket, prefix, mErrs)
 		return errXLWriteQuorum
 	}
 
diff --git a/xl-v1-multipart-common.go b/xl-v1-multipart-common.go
index 18b2f731f..55a51bab4 100644
--- a/xl-v1-multipart-common.go
+++ b/xl-v1-multipart-common.go
@@ -261,16 +261,16 @@ func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInf
 	return FileInfo{}, err
 }
 
-// commitXLMetadata - commit `xl.json` from source prefix to destination prefix.
-func (xl xlObjects) commitXLMetadata(srcPrefix, dstPrefix string) error {
+// commitXLMetadata - commit `xl.json` from source prefix to destination prefix in the given slice of disks.
+func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, writeQuorum int) error {
 	var wg = &sync.WaitGroup{}
-	var mErrs = make([]error, len(xl.storageDisks))
+	var mErrs = make([]error, len(disks))
 
 	srcJSONFile := path.Join(srcPrefix, xlMetaJSONFile)
 	dstJSONFile := path.Join(dstPrefix, xlMetaJSONFile)
 
 	// Rename `xl.json` to all disks in parallel.
-	for index, disk := range xl.storageDisks {
+	for index, disk := range disks {
 		if disk == nil {
 			mErrs[index] = errDiskNotFound
 			continue
@@ -295,7 +295,7 @@ func (xl xlObjects) commitXLMetadata(srcPrefix, dstPrefix string) error {
 	wg.Wait()
 
 	// Do we have write quorum?.
-	if !isQuorum(mErrs, xl.writeQuorum) {
+	if !isQuorum(mErrs, writeQuorum) {
 		return errXLWriteQuorum
 	}
 	// For all other errors return.
diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go
index 7c2116200..04028c303 100644
--- a/xl-v1-multipart.go
+++ b/xl-v1-multipart.go
@@ -282,7 +282,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
 	uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID)
 	tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID)
 	// Write updated `xl.json` to all disks.
-	if err = xl.writeSameXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil {
+	if err = writeSameXLMetadata(xl.storageDisks, minioMetaBucket, tempUploadIDPath, xlMeta, xl.writeQuorum, xl.readQuorum); err != nil {
 		return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
 	}
 	rErr := xl.renameObject(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath)
@@ -335,37 +335,36 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
 	if !IsValidObjectName(object) {
 		return "", ObjectNameInvalid{Bucket: bucket, Object: object}
 	}
 
-	// Hold the lock and start the operation.
-	uploadIDPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID)
-	nsMutex.Lock(minioMetaBucket, uploadIDPath)
-	defer nsMutex.Unlock(minioMetaBucket, uploadIDPath)
+	var partsMetadata []xlMetaV1
+	var errs []error
+	uploadIDPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID)
+	nsMutex.RLock(minioMetaBucket, uploadIDPath)
+	// Validates if upload ID exists.
 	if !xl.isUploadIDExists(bucket, object, uploadID) {
+		nsMutex.RUnlock(minioMetaBucket, uploadIDPath)
 		return "", InvalidUploadID{UploadID: uploadID}
 	}
 
-	// Read metadata associated with the object from all disks.
-	partsMetadata, errs := xl.readAllXLMetadata(minioMetaBucket, uploadIDPath)
+	partsMetadata, errs = readAllXLMetadata(xl.storageDisks, minioMetaBucket,
+		uploadIDPath)
 	if !isQuorum(errs, xl.writeQuorum) {
+		nsMutex.RUnlock(minioMetaBucket, uploadIDPath)
 		return "", toObjectErr(errXLWriteQuorum, bucket, object)
 	}
+	nsMutex.RUnlock(minioMetaBucket, uploadIDPath)
 
 	// List all online disks.
-	onlineDisks, higherVersion, err := xl.listOnlineDisks(partsMetadata, errs)
+	onlineDisks, _, err := xl.listOnlineDisks(partsMetadata, errs)
 	if err != nil {
 		return "", toObjectErr(err, bucket, object)
 	}
 
-	// Increment version only if we have online disks less than configured storage disks.
-	if diskCount(onlineDisks) < len(xl.storageDisks) {
-		higherVersion++
-	}
-
-	// Pick one from the first valid metadata.
-	xlMeta := pickValidXLMeta(partsMetadata)
-
+	// Need a unique name for the part being written in minioMetaBucket to
+	// accommodate concurrent PutObjectPart requests.
 	partSuffix := fmt.Sprintf("part.%d", partID)
-	tmpPartPath := path.Join(tmpMetaPrefix, uploadID, partSuffix)
+	tmpSuffix := getUUID()
+	tmpPartPath := path.Join(tmpMetaPrefix, uploadID, tmpSuffix)
 
 	// Initialize md5 writer.
 	md5Writer := md5.New()
@@ -419,6 +418,9 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
 		}
 	}
 
+	nsMutex.Lock(minioMetaBucket, uploadIDPath)
+	defer nsMutex.Unlock(minioMetaBucket, uploadIDPath)
+
 	// Validates if upload ID exists again.
 	if !xl.isUploadIDExists(bucket, object, uploadID) {
 		return "", InvalidUploadID{UploadID: uploadID}
 	}
@@ -431,6 +433,60 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
 		return "", toObjectErr(err, minioMetaBucket, partPath)
 	}
 
+	// Read metadata (again) associated with the object from all disks.
+	partsMetadata, errs = readAllXLMetadata(onlineDisks, minioMetaBucket,
+		uploadIDPath)
+	if !isQuorum(errs, xl.writeQuorum) {
+		return "", toObjectErr(errXLWriteQuorum, bucket, object)
+	}
+
+	var updatedEInfos []erasureInfo
+	for index := range partsMetadata {
+		updatedEInfos = append(updatedEInfos, partsMetadata[index].Erasure)
+	}
+
+	var checksums []checkSumInfo
+	for index, eInfo := range newEInfos {
+		if eInfo.IsValid() {
+
+			// Use a map to find the union of checksums of parts that
+			// were concurrently written and committed before this
+			// part. N.B. For a different, concurrent upload of the
+			// same part, the last written content remains.
+			checksumSet := make(map[string]checkSumInfo)
+			checksums = newEInfos[index].Checksum
+			for _, cksum := range checksums {
+				checksumSet[cksum.Name] = cksum
+			}
+			checksums = updatedEInfos[index].Checksum
+			for _, cksum := range checksums {
+				checksumSet[cksum.Name] = cksum
+			}
+			// Form the checksumInfo to be committed in xl.json
+			// from the map.
+			var finalChecksums []checkSumInfo
+			for _, cksum := range checksumSet {
+				finalChecksums = append(finalChecksums, cksum)
+			}
+			updatedEInfos[index] = eInfo
+			updatedEInfos[index].Checksum = finalChecksums
+		}
+	}
+
+	// Pick one from the first valid metadata.
+	xlMeta := pickValidXLMeta(partsMetadata)
+
+	// Get current highest version based on re-read partsMetadata.
+	_, higherVersion, err := xl.listOnlineDisks(partsMetadata, errs)
+	if err != nil {
+		return "", toObjectErr(err, bucket, object)
+	}
+
+	// Increment version only if we have online disks less than configured storage disks.
+	if diskCount(onlineDisks) < len(xl.storageDisks) {
+		higherVersion++
+	}
+
 	// Once part is successfully committed, proceed with updating XL metadata.
 	xlMeta.Stat.Version = higherVersion
@@ -440,7 +496,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
 	// Update `xl.json` content for each disks.
 	for index := range partsMetadata {
 		partsMetadata[index].Parts = xlMeta.Parts
-		partsMetadata[index].Erasure = newEInfos[index]
+		partsMetadata[index].Erasure = updatedEInfos[index]
 	}
 
 	// Write all the checksum metadata.
@@ -449,10 +505,10 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
 
 	// Writes a unique `xl.json` each disk carrying new checksum
 	// related information.
-	if err = xl.writeUniqueXLMetadata(minioMetaBucket, tempXLMetaPath, partsMetadata); err != nil {
+	if err = writeUniqueXLMetadata(onlineDisks, minioMetaBucket, tempXLMetaPath, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil {
 		return "", toObjectErr(err, minioMetaBucket, tempXLMetaPath)
 	}
-	rErr := xl.commitXLMetadata(tempXLMetaPath, uploadIDPath)
+	rErr := commitXLMetadata(onlineDisks, tempXLMetaPath, uploadIDPath, xl.writeQuorum)
 	if rErr != nil {
 		return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath)
 	}
@@ -593,7 +649,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
 	uploadIDPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID)
 
 	// Read metadata associated with the object from all disks.
-	partsMetadata, errs := xl.readAllXLMetadata(minioMetaBucket, uploadIDPath)
+	partsMetadata, errs := readAllXLMetadata(xl.storageDisks, minioMetaBucket, uploadIDPath)
 	// Do we have writeQuorum?.
 	if !isQuorum(errs, xl.writeQuorum) {
 		return "", toObjectErr(errXLWriteQuorum, bucket, object)
@@ -675,10 +731,10 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
 	}
 
 	// Write unique `xl.json` for each disk.
-	if err = xl.writeUniqueXLMetadata(minioMetaBucket, tempUploadIDPath, partsMetadata); err != nil {
+	if err = writeUniqueXLMetadata(xl.storageDisks, minioMetaBucket, tempUploadIDPath, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil {
 		return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
 	}
-	rErr := xl.commitXLMetadata(tempUploadIDPath, uploadIDPath)
+	rErr := commitXLMetadata(xl.storageDisks, tempUploadIDPath, uploadIDPath, xl.writeQuorum)
 	if rErr != nil {
 		return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath)
 	}
diff --git a/xl-v1-object.go b/xl-v1-object.go
index 80fb98a5b..e5cd1fc19 100644
--- a/xl-v1-object.go
+++ b/xl-v1-object.go
@@ -61,7 +61,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
 	defer nsMutex.RUnlock(bucket, object)
 	// Read metadata associated with the object from all disks.
-	metaArr, errs := xl.readAllXLMetadata(bucket, object)
+	metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
 	// Do we have read quorum?
 	if !isQuorum(errs, xl.readQuorum) {
 		return toObjectErr(errXLReadQuorum, bucket, object)
 	}
@@ -378,7 +378,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
 	xlMeta := newXLMetaV1(object, xl.dataBlocks, xl.parityBlocks)
 
 	// Read metadata associated with the object from all disks.
-	partsMetadata, errs := xl.readAllXLMetadata(bucket, object)
+	partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
 	// Do we have write quroum?.
 	if !isQuorum(errs, xl.writeQuorum) {
 		return "", toObjectErr(errXLWriteQuorum, bucket, object)
@@ -523,7 +523,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
 	}
 
 	// Write unique `xl.json` for each disk.
-	if err = xl.writeUniqueXLMetadata(minioMetaTmpBucket, tempObj, partsMetadata); err != nil {
+	if err = writeUniqueXLMetadata(xl.storageDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil {
 		return "", toObjectErr(err, bucket, object)
 	}
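
The least obvious step in the PutObjectPart hunk above is the map-based checksum merge. Below is a minimal, self-contained Go sketch of that idea; the trimmed checkSumInfo struct and the helper name mergeChecksums are illustrative assumptions and are not introduced by this change. The merge order mirrors the hunk: this request's checksums go into the map first, then the entries re-read from `xl.json`, so parts committed by concurrent PutObjectPart calls are preserved and, for the same part name, the entry already committed in `xl.json` is the one that remains.

package main

import "fmt"

// checkSumInfo mirrors only the fields the merge needs; the real struct in
// xl-v1-metadata.go also records the hashing algorithm.
type checkSumInfo struct {
	Name string // part name, e.g. "part.1"
	Hash string
}

// mergeChecksums is a hypothetical helper illustrating the union step in
// PutObjectPart: fresh checksums (computed for the part just written) are
// inserted first, then the entries re-read from xl.json, so on a name
// collision the already-committed entry overrides the fresh one.
func mergeChecksums(fresh, reRead []checkSumInfo) []checkSumInfo {
	checksumSet := make(map[string]checkSumInfo)
	for _, cksum := range fresh {
		checksumSet[cksum.Name] = cksum
	}
	for _, cksum := range reRead {
		checksumSet[cksum.Name] = cksum
	}
	merged := make([]checkSumInfo, 0, len(checksumSet))
	for _, cksum := range checksumSet {
		merged = append(merged, cksum)
	}
	return merged
}

func main() {
	fresh := []checkSumInfo{{Name: "part.2", Hash: "fresh-hash"}}
	reRead := []checkSumInfo{
		{Name: "part.1", Hash: "p1-hash"},
		{Name: "part.2", Hash: "committed-hash"},
	}
	// Prints both parts; part.2 carries the re-read (committed) hash.
	fmt.Println(mergeChecksums(fresh, reRead))
}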