XL: Ignore and continue for cases when bucket does not exist. (#2205)

Fixes #2201
Fixes #2204
master
Harshavardhana 8 years ago committed by GitHub
parent 45240f158d
commit cdf1373f8e
  1. 1
      xl-v1-bucket.go
  2. 41
      xl-v1-metadata.go
  3. 40
      xl-v1-multipart-common.go
  4. 4
      xl-v1-multipart.go
  5. 16
      xl-v1-object.go

@ -108,6 +108,7 @@ var bucketMetadataOpIgnoredErrs = []error{
errDiskNotFound, errDiskNotFound,
errDiskAccessDenied, errDiskAccessDenied,
errFaultyDisk, errFaultyDisk,
errVolumeNotFound,
} }
// getBucketInfo - returns the BucketInfo from one of the load balanced disks. // getBucketInfo - returns the BucketInfo from one of the load balanced disks.

@ -333,11 +333,23 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas []
return errXLWriteQuorum return errXLWriteQuorum
} }
// For all other errors return. // Reduce errors and verify quourm and return.
for _, err := range mErrs { if errCount, reducedErr := reduceErrs(mErrs); reducedErr != nil {
if err != nil && err != errDiskNotFound { if errCount < writeQuorum {
return err // Delete all `xl.json` successfully renamed.
deleteAllXLMetadata(disks, bucket, prefix, mErrs)
return errXLWriteQuorum
}
if isErrIgnored(reducedErr, []error{
errDiskNotFound,
errDiskAccessDenied,
errFaultyDisk,
errVolumeNotFound,
}) {
// Success.
return nil
} }
return reducedErr
} }
// Success. // Success.
@ -386,11 +398,24 @@ func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMet
return errXLWriteQuorum return errXLWriteQuorum
} }
// For any other errors delete `xl.json` as well. // Reduce errors and verify quourm and return.
for _, err := range mErrs { if errCount, reducedErr := reduceErrs(mErrs); reducedErr != nil {
if err != nil && err != errDiskNotFound { if errCount < writeQuorum {
return err // Delete all `xl.json` successfully renamed.
deleteAllXLMetadata(disks, bucket, prefix, mErrs)
return errXLWriteQuorum
}
// Ignore specific errors if we are under write quorum.
if isErrIgnored(reducedErr, []error{
errDiskNotFound,
errDiskAccessDenied,
errFaultyDisk,
errVolumeNotFound,
}) {
// Success.
return nil
} }
return reducedErr
} }
// Success. // Success.

@ -63,6 +63,10 @@ func (xl xlObjects) updateUploadsJSON(bucket, object string, uploadsJSON uploads
// Count all the errors and validate if we have write quorum. // Count all the errors and validate if we have write quorum.
if !isDiskQuorum(errs, xl.writeQuorum) { if !isDiskQuorum(errs, xl.writeQuorum) {
// Do we have readQuorum?.
if isDiskQuorum(errs, xl.readQuorum) {
return nil
}
// Rename `uploads.json` left over back to tmp location. // Rename `uploads.json` left over back to tmp location.
for index, disk := range xl.storageDisks { for index, disk := range xl.storageDisks {
if disk == nil { if disk == nil {
@ -150,6 +154,10 @@ func (xl xlObjects) writeUploadJSON(bucket, object, uploadID string, initiated t
// Count all the errors and validate if we have write quorum. // Count all the errors and validate if we have write quorum.
if !isDiskQuorum(errs, xl.writeQuorum) { if !isDiskQuorum(errs, xl.writeQuorum) {
// Do we have readQuorum?.
if isDiskQuorum(errs, xl.readQuorum) {
return nil
}
// Rename `uploads.json` left over back to tmp location. // Rename `uploads.json` left over back to tmp location.
for index, disk := range xl.storageDisks { for index, disk := range xl.storageDisks {
if disk == nil { if disk == nil {
@ -262,7 +270,7 @@ func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInf
} }
// commitXLMetadata - commit `xl.json` from source prefix to destination prefix in the given slice of disks. // commitXLMetadata - commit `xl.json` from source prefix to destination prefix in the given slice of disks.
func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, writeQuorum int) error { func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, writeQuorum, readQuorum int) error {
var wg = &sync.WaitGroup{} var wg = &sync.WaitGroup{}
var mErrs = make([]error, len(disks)) var mErrs = make([]error, len(disks))
@ -294,16 +302,36 @@ func commitXLMetadata(disks []StorageAPI, srcPrefix, dstPrefix string, writeQuor
// Wait for all the routines. // Wait for all the routines.
wg.Wait() wg.Wait()
// Do we have write quorum?. // Do we have write Quorum?.
if !isDiskQuorum(mErrs, writeQuorum) { if !isDiskQuorum(mErrs, writeQuorum) {
// Do we have readQuorum?.
if isDiskQuorum(mErrs, readQuorum) {
// Return success.
return nil
}
// Delete all `xl.json` successfully renamed.
deleteAllXLMetadata(disks, minioMetaBucket, dstPrefix, mErrs)
return errXLWriteQuorum return errXLWriteQuorum
} }
// For all other errors return. // Reduce errors and verify quourm and return.
for _, err := range mErrs { if errCount, reducedErr := reduceErrs(mErrs); reducedErr != nil {
if err != nil && err != errDiskNotFound { if errCount < writeQuorum {
return err // Delete all `xl.json` successfully renamed.
deleteAllXLMetadata(disks, minioMetaBucket, dstPrefix, mErrs)
return errXLWriteQuorum
} }
if isErrIgnored(reducedErr, []error{
errDiskNotFound,
errDiskAccessDenied,
errFaultyDisk,
errVolumeNotFound,
}) {
return nil
} }
return reducedErr
}
// Success.
return nil return nil
} }

@ -483,7 +483,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
if err = writeUniqueXLMetadata(onlineDisks, minioMetaBucket, tempXLMetaPath, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil { if err = writeUniqueXLMetadata(onlineDisks, minioMetaBucket, tempXLMetaPath, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempXLMetaPath) return "", toObjectErr(err, minioMetaBucket, tempXLMetaPath)
} }
rErr := commitXLMetadata(onlineDisks, tempXLMetaPath, uploadIDPath, xl.writeQuorum) rErr := commitXLMetadata(onlineDisks, tempXLMetaPath, uploadIDPath, xl.writeQuorum, xl.readQuorum)
if rErr != nil { if rErr != nil {
return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath) return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath)
} }
@ -709,7 +709,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
if err = writeUniqueXLMetadata(xl.storageDisks, minioMetaBucket, tempUploadIDPath, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil { if err = writeUniqueXLMetadata(xl.storageDisks, minioMetaBucket, tempUploadIDPath, partsMetadata, xl.writeQuorum, xl.readQuorum); err != nil {
return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath)
} }
rErr := commitXLMetadata(xl.storageDisks, tempUploadIDPath, uploadIDPath, xl.writeQuorum) rErr := commitXLMetadata(xl.storageDisks, tempUploadIDPath, uploadIDPath, xl.writeQuorum, xl.readQuorum)
if rErr != nil { if rErr != nil {
return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath) return "", toObjectErr(rErr, minioMetaBucket, uploadIDPath)
} }

@ -315,12 +315,22 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string,
return errXLWriteQuorum return errXLWriteQuorum
} }
// Return on first error, also undo any partially successful rename operations. // Return on first error, also undo any partially successful rename operations.
for _, err := range errs { if errCount, reducedErr := reduceErrs(errs); reducedErr != nil {
if err != nil && err != errDiskNotFound { if errCount < writeQuorum {
// Undo all the partial rename operations. // Undo all the partial rename operations.
undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs, writeQuorum, readQuorum) undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isPart, errs, writeQuorum, readQuorum)
return err return errXLWriteQuorum
}
if isErrIgnored(reducedErr, []error{
errDiskNotFound,
errDiskAccessDenied,
errFaultyDisk,
errVolumeNotFound,
}) {
// Return success.
return nil
} }
return reducedErr
} }
return nil return nil
} }

Loading…
Cancel
Save