XL: isQuorum rename as isDiskQuorum, word it properly. (#2196)

master
Harshavardhana 8 years ago committed by Anand Babu (AB) Periasamy
parent 3b69b4ada4
commit dc3bafb194
  1. 2
      erasure-createfile.go
  2. 2
      xl-v1-bucket.go
  3. 8
      xl-v1-metadata.go
  4. 7
      xl-v1-multipart-common.go
  5. 6
      xl-v1-multipart.go
  6. 14
      xl-v1-object.go
  7. 14
      xl-v1-utils.go

@ -160,7 +160,7 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, dist
wg.Wait() wg.Wait()
// Do we have write quorum?. // Do we have write quorum?.
if !isQuorum(wErrs, writeQuorum) { if !isDiskQuorum(wErrs, writeQuorum) {
return toObjectErr(errXLWriteQuorum, volume, path) return toObjectErr(errXLWriteQuorum, volume, path)
} }
return nil return nil

@ -61,7 +61,7 @@ func (xl xlObjects) MakeBucket(bucket string) error {
wg.Wait() wg.Wait()
// Do we have write quorum?. // Do we have write quorum?.
if !isQuorum(dErrs, xl.writeQuorum) { if !isDiskQuorum(dErrs, xl.writeQuorum) {
// Purge successfully created buckets if we don't have writeQuorum. // Purge successfully created buckets if we don't have writeQuorum.
xl.undoMakeBucket(bucket) xl.undoMakeBucket(bucket)
return toObjectErr(errXLWriteQuorum, bucket) return toObjectErr(errXLWriteQuorum, bucket)

@ -322,9 +322,9 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas []
wg.Wait() wg.Wait()
// Do we have write quorum?. // Do we have write quorum?.
if !isQuorum(mErrs, writeQuorum) { if !isDiskQuorum(mErrs, writeQuorum) {
// Validate if we have read quorum. // Validate if we have read quorum.
if isQuorum(mErrs, readQuorum) { if isDiskQuorum(mErrs, readQuorum) {
// Return success. // Return success.
return nil return nil
} }
@ -375,9 +375,9 @@ func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMet
wg.Wait() wg.Wait()
// Do we have write Quorum?. // Do we have write Quorum?.
if !isQuorum(mErrs, writeQuorum) { if !isDiskQuorum(mErrs, writeQuorum) {
// Do we have readQuorum?. // Do we have readQuorum?.
if isQuorum(mErrs, readQuorum) { if isDiskQuorum(mErrs, readQuorum) {
// Return success. // Return success.
return nil return nil
} }

@ -62,7 +62,7 @@ func (xl xlObjects) updateUploadsJSON(bucket, object string, uploadsJSON uploads
wg.Wait() wg.Wait()
// Count all the errors and validate if we have write quorum. // Count all the errors and validate if we have write quorum.
if !isQuorum(errs, xl.writeQuorum) { if !isDiskQuorum(errs, xl.writeQuorum) {
// Rename `uploads.json` left over back to tmp location. // Rename `uploads.json` left over back to tmp location.
for index, disk := range xl.storageDisks { for index, disk := range xl.storageDisks {
if disk == nil { if disk == nil {
@ -149,7 +149,7 @@ func (xl xlObjects) writeUploadJSON(bucket, object, uploadID string, initiated t
wg.Wait() wg.Wait()
// Count all the errors and validate if we have write quorum. // Count all the errors and validate if we have write quorum.
if !isQuorum(errs, xl.writeQuorum) { if !isDiskQuorum(errs, xl.writeQuorum) {
// Rename `uploads.json` left over back to tmp location. // Rename `uploads.json` left over back to tmp location.
for index, disk := range xl.storageDisks { for index, disk := range xl.storageDisks {
if disk == nil { if disk == nil {
@ -295,9 +295,10 @@ func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, writeQuor
wg.Wait() wg.Wait()
// Do we have write quorum?. // Do we have write quorum?.
if !isQuorum(mErrs, writeQuorum) { if !isDiskQuorum(mErrs, writeQuorum) {
return errXLWriteQuorum return errXLWriteQuorum
} }
// For all other errors return. // For all other errors return.
for _, err := range mErrs { for _, err := range mErrs {
if err != nil && err != errDiskNotFound { if err != nil && err != errDiskNotFound {

@ -347,7 +347,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Read metadata associated with the object from all disks. // Read metadata associated with the object from all disks.
partsMetadata, errs = readAllXLMetadata(xl.storageDisks, minioMetaBucket, partsMetadata, errs = readAllXLMetadata(xl.storageDisks, minioMetaBucket,
uploadIDPath) uploadIDPath)
if !isQuorum(errs, xl.writeQuorum) { if !isDiskQuorum(errs, xl.writeQuorum) {
nsMutex.RUnlock(minioMetaBucket, uploadIDPath) nsMutex.RUnlock(minioMetaBucket, uploadIDPath)
return "", toObjectErr(errXLWriteQuorum, bucket, object) return "", toObjectErr(errXLWriteQuorum, bucket, object)
} }
@ -435,7 +435,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Read metadata (again) associated with the object from all disks. // Read metadata (again) associated with the object from all disks.
partsMetadata, errs = readAllXLMetadata(onlineDisks, minioMetaBucket, uploadIDPath) partsMetadata, errs = readAllXLMetadata(onlineDisks, minioMetaBucket, uploadIDPath)
if !isQuorum(errs, xl.writeQuorum) { if !isDiskQuorum(errs, xl.writeQuorum) {
return "", toObjectErr(errXLWriteQuorum, bucket, object) return "", toObjectErr(errXLWriteQuorum, bucket, object)
} }
@ -626,7 +626,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// Read metadata associated with the object from all disks. // Read metadata associated with the object from all disks.
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, minioMetaBucket, uploadIDPath) partsMetadata, errs := readAllXLMetadata(xl.storageDisks, minioMetaBucket, uploadIDPath)
// Do we have writeQuorum?. // Do we have writeQuorum?.
if !isQuorum(errs, xl.writeQuorum) { if !isDiskQuorum(errs, xl.writeQuorum) {
return "", toObjectErr(errXLWriteQuorum, bucket, object) return "", toObjectErr(errXLWriteQuorum, bucket, object)
} }

@ -63,7 +63,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
// Read metadata associated with the object from all disks. // Read metadata associated with the object from all disks.
metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object) metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
// Do we have read quorum? // Do we have read quorum?
if !isQuorum(errs, xl.readQuorum) { if !isDiskQuorum(errs, xl.readQuorum) {
return toObjectErr(errXLReadQuorum, bucket, object) return toObjectErr(errXLReadQuorum, bucket, object)
} }
@ -302,9 +302,9 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string,
// We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum // We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum
// otherwise return failure. Cleanup successful renames. // otherwise return failure. Cleanup successful renames.
if !isQuorum(errs, writeQuorum) { if !isDiskQuorum(errs, writeQuorum) {
// Check we have successful read quorum. // Check we have successful read quorum.
if isQuorum(errs, readQuorum) { if isDiskQuorum(errs, readQuorum) {
return nil // Return success. return nil // Return success.
} // else - failed to acquire read quorum. } // else - failed to acquire read quorum.
// Undo all the partial rename operations. // Undo all the partial rename operations.
@ -378,7 +378,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// Read metadata associated with the object from all disks. // Read metadata associated with the object from all disks.
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object) partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
// Do we have write quroum?. // Do we have write quroum?.
if !isQuorum(errs, xl.writeQuorum) { if !isDiskQuorum(errs, xl.writeQuorum) {
return "", toObjectErr(errXLWriteQuorum, bucket, object) return "", toObjectErr(errXLWriteQuorum, bucket, object)
} }
@ -564,9 +564,9 @@ func (xl xlObjects) deleteObject(bucket, object string) error {
// Wait for all routines to finish. // Wait for all routines to finish.
wg.Wait() wg.Wait()
if !isQuorum(dErrs, xl.writeQuorum) { // Do we have write quorum?
// Return errXLWriteQuorum if errors were more than if !isDiskQuorum(dErrs, xl.writeQuorum) {
// allowed write quorum. // Return errXLWriteQuorum if errors were more than allowed write quorum.
return errXLWriteQuorum return errXLWriteQuorum
} }

@ -76,16 +76,18 @@ func reduceErrs(errs []error) error {
return errTypes[max].err return errTypes[max].err
} }
// Validates if we have quorum based on the errors with errDiskNotFound. // Validates if we have quorum based on the errors related to disk only.
func isQuorum(errs []error, minQuorumCount int) bool { // Returns 'true' if we have quorum, 'false' if we don't.
var errCount int func isDiskQuorum(errs []error, minQuorumCount int) bool {
var count int
for _, err := range errs { for _, err := range errs {
if err == errDiskNotFound || err == errFaultyDisk || err == errDiskAccessDenied { switch err {
case errDiskNotFound, errFaultyDisk, errDiskAccessDenied:
continue continue
} }
errCount++ count++
} }
return errCount >= minQuorumCount return count >= minQuorumCount
} }
// Similar to 'len(slice)' but returns the actual elements count // Similar to 'len(slice)' but returns the actual elements count

Loading…
Cancel
Save