xl: Fix rare freeze after many disk/network errors (#4438)

xl.storageDisks is sometimes passed to some low-level XL functions. Some disks in
xl.storageDisks are set to nil when they encounter some errors. This means all
elements in xl.storageDisks will be nil after some time which lead to an unusable XL.
master
Anis Elleuch 8 years ago committed by Harshavardhana
parent dce76d9307
commit af8071c86a
  1. 24
      cmd/erasure-createfile.go
  2. 8
      cmd/erasure-createfile_test.go
  3. 2
      cmd/erasure-healfile_test.go
  4. 6
      cmd/erasure-readfile_test.go
  5. 2
      cmd/xl-v1-healing.go
  6. 14
      cmd/xl-v1-metadata.go
  7. 28
      cmd/xl-v1-multipart.go
  8. 26
      cmd/xl-v1-object.go
  9. 18
      cmd/xl-v1-utils.go
  10. 59
      cmd/xl-v1-utils_test.go

@ -29,7 +29,7 @@ import (
// all the disks, writes also calculate individual block's checksum // all the disks, writes also calculate individual block's checksum
// for future bit-rot protection. // for future bit-rot protection.
func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader, allowEmpty bool, blockSize int64, func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader, allowEmpty bool, blockSize int64,
dataBlocks, parityBlocks int, algo HashAlgo, writeQuorum int) (bytesWritten int64, checkSums []string, err error) { dataBlocks, parityBlocks int, algo HashAlgo, writeQuorum int) (newDisks []StorageAPI, bytesWritten int64, checkSums []string, err error) {
// Allocated blockSized buffer for reading from incoming stream. // Allocated blockSized buffer for reading from incoming stream.
buf := make([]byte, blockSize) buf := make([]byte, blockSize)
@ -43,7 +43,7 @@ func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader
// FIXME: this is a bug in Golang, n == 0 and err == // FIXME: this is a bug in Golang, n == 0 and err ==
// io.ErrUnexpectedEOF for io.ReadFull function. // io.ErrUnexpectedEOF for io.ReadFull function.
if n == 0 && rErr == io.ErrUnexpectedEOF { if n == 0 && rErr == io.ErrUnexpectedEOF {
return 0, nil, traceError(rErr) return nil, 0, nil, traceError(rErr)
} }
if rErr == io.EOF { if rErr == io.EOF {
// We have reached EOF on the first byte read, io.Reader // We have reached EOF on the first byte read, io.Reader
@ -51,28 +51,28 @@ func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader
// data. Will create a 0byte file instead. // data. Will create a 0byte file instead.
if bytesWritten == 0 && allowEmpty { if bytesWritten == 0 && allowEmpty {
blocks = make([][]byte, len(disks)) blocks = make([][]byte, len(disks))
rErr = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum) newDisks, rErr = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum)
if rErr != nil { if rErr != nil {
return 0, nil, rErr return nil, 0, nil, rErr
} }
} // else we have reached EOF after few reads, no need to } // else we have reached EOF after few reads, no need to
// add an additional 0bytes at the end. // add an additional 0bytes at the end.
break break
} }
if rErr != nil && rErr != io.ErrUnexpectedEOF { if rErr != nil && rErr != io.ErrUnexpectedEOF {
return 0, nil, traceError(rErr) return nil, 0, nil, traceError(rErr)
} }
if n > 0 { if n > 0 {
// Returns encoded blocks. // Returns encoded blocks.
var enErr error var enErr error
blocks, enErr = encodeData(buf[0:n], dataBlocks, parityBlocks) blocks, enErr = encodeData(buf[0:n], dataBlocks, parityBlocks)
if enErr != nil { if enErr != nil {
return 0, nil, enErr return nil, 0, nil, enErr
} }
// Write to all disks. // Write to all disks.
if err = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum); err != nil { if newDisks, err = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum); err != nil {
return 0, nil, err return nil, 0, nil, err
} }
bytesWritten += int64(n) bytesWritten += int64(n)
} }
@ -82,7 +82,7 @@ func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader
for i := range checkSums { for i := range checkSums {
checkSums[i] = hex.EncodeToString(hashWriters[i].Sum(nil)) checkSums[i] = hex.EncodeToString(hashWriters[i].Sum(nil))
} }
return bytesWritten, checkSums, nil return newDisks, bytesWritten, checkSums, nil
} }
// encodeData - encodes incoming data buffer into // encodeData - encodes incoming data buffer into
@ -110,7 +110,7 @@ func encodeData(dataBuffer []byte, dataBlocks, parityBlocks int) ([][]byte, erro
} }
// appendFile - append data buffer at path. // appendFile - append data buffer at path.
func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hashWriters []hash.Hash, writeQuorum int) (err error) { func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hashWriters []hash.Hash, writeQuorum int) ([]StorageAPI, error) {
var wg = &sync.WaitGroup{} var wg = &sync.WaitGroup{}
var wErrs = make([]error, len(disks)) var wErrs = make([]error, len(disks))
// Write encoded data to quorum disks in parallel. // Write encoded data to quorum disks in parallel.
@ -126,8 +126,6 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hash
wErr := disk.AppendFile(volume, path, enBlocks[index]) wErr := disk.AppendFile(volume, path, enBlocks[index])
if wErr != nil { if wErr != nil {
wErrs[index] = traceError(wErr) wErrs[index] = traceError(wErr)
// Ignore disk which returned an error.
disks[index] = nil
return return
} }
@ -142,5 +140,5 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hash
// Wait for all the appends to finish. // Wait for all the appends to finish.
wg.Wait() wg.Wait()
return reduceWriteQuorumErrs(wErrs, objectOpIgnoredErrs, writeQuorum) return evalDisks(disks, wErrs), reduceWriteQuorumErrs(wErrs, objectOpIgnoredErrs, writeQuorum)
} }

@ -56,7 +56,7 @@ func TestErasureCreateFile(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
// Test when all disks are up. // Test when all disks are up.
size, _, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) _, size, _, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -69,7 +69,7 @@ func TestErasureCreateFile(t *testing.T) {
disks[5] = AppendDiskDown{disks[5].(*posix)} disks[5] = AppendDiskDown{disks[5].(*posix)}
// Test when two disks are down. // Test when two disks are down.
size, _, err = erasureCreateFile(disks, "testbucket", "testobject2", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) _, size, _, err = erasureCreateFile(disks, "testbucket", "testobject2", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -83,7 +83,7 @@ func TestErasureCreateFile(t *testing.T) {
disks[8] = AppendDiskDown{disks[8].(*posix)} disks[8] = AppendDiskDown{disks[8].(*posix)}
disks[9] = AppendDiskDown{disks[9].(*posix)} disks[9] = AppendDiskDown{disks[9].(*posix)}
size, _, err = erasureCreateFile(disks, "testbucket", "testobject3", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) _, size, _, err = erasureCreateFile(disks, "testbucket", "testobject3", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -93,7 +93,7 @@ func TestErasureCreateFile(t *testing.T) {
// 1 more disk down. 7 disk down in total. Should return quorum error. // 1 more disk down. 7 disk down in total. Should return quorum error.
disks[10] = AppendDiskDown{disks[10].(*posix)} disks[10] = AppendDiskDown{disks[10].(*posix)}
_, _, err = erasureCreateFile(disks, "testbucket", "testobject4", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) _, _, _, err = erasureCreateFile(disks, "testbucket", "testobject4", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
if errorCause(err) != errXLWriteQuorum { if errorCause(err) != errXLWriteQuorum {
t.Errorf("erasureCreateFile return value: expected errXLWriteQuorum, got %s", err) t.Errorf("erasureCreateFile return value: expected errXLWriteQuorum, got %s", err)
} }

@ -48,7 +48,7 @@ func TestErasureHealFile(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
// Create a test file. // Create a test file.
size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) _, size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

@ -242,7 +242,7 @@ func TestErasureReadFileDiskFail(t *testing.T) {
} }
// Create a test file to read from. // Create a test file to read from.
size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) _, size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -325,7 +325,7 @@ func TestErasureReadFileOffsetLength(t *testing.T) {
} }
// Create a test file to read from. // Create a test file to read from.
size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) _, size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -404,7 +404,7 @@ func TestErasureReadFileRandomOffsetLength(t *testing.T) {
iterations := 10000 iterations := 10000
// Create a test file to read from. // Create a test file to read from.
size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1) _, size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), true, blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

@ -468,7 +468,7 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
} }
// Generate and write `xl.json` generated from other disks. // Generate and write `xl.json` generated from other disks.
aErr = writeUniqueXLMetadata(outDatedDisks, minioMetaTmpBucket, tmpID, partsMetadata, diskCount(outDatedDisks)) outDatedDisks, aErr = writeUniqueXLMetadata(outDatedDisks, minioMetaTmpBucket, tmpID, partsMetadata, diskCount(outDatedDisks))
if aErr != nil { if aErr != nil {
return 0, 0, toObjectErr(aErr, bucket, object) return 0, 0, toObjectErr(aErr, bucket, object)
} }

@ -389,7 +389,7 @@ func deleteAllXLMetadata(disks []StorageAPI, bucket, prefix string, errs []error
} }
// Rename `xl.json` content to destination location for each disk in order. // Rename `xl.json` content to destination location for each disk in order.
func renameXLMetadata(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, quorum int) error { func renameXLMetadata(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, quorum int) ([]StorageAPI, error) {
isDir := false isDir := false
srcXLJSON := path.Join(srcEntry, xlMetaJSONFile) srcXLJSON := path.Join(srcEntry, xlMetaJSONFile)
dstXLJSON := path.Join(dstEntry, xlMetaJSONFile) dstXLJSON := path.Join(dstEntry, xlMetaJSONFile)
@ -397,7 +397,7 @@ func renameXLMetadata(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEnt
} }
// writeUniqueXLMetadata - writes unique `xl.json` content for each disk in order. // writeUniqueXLMetadata - writes unique `xl.json` content for each disk in order.
func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas []xlMetaV1, quorum int) error { func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas []xlMetaV1, quorum int) ([]StorageAPI, error) {
var wg = &sync.WaitGroup{} var wg = &sync.WaitGroup{}
var mErrs = make([]error, len(disks)) var mErrs = make([]error, len(disks))
@ -419,8 +419,6 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas []
err := writeXLMetadata(disk, bucket, prefix, xlMetas[index]) err := writeXLMetadata(disk, bucket, prefix, xlMetas[index])
if err != nil { if err != nil {
mErrs[index] = err mErrs[index] = err
// Ignore disk which returned an error.
disks[index] = nil
} }
}(index, disk) }(index, disk)
} }
@ -433,11 +431,11 @@ func writeUniqueXLMetadata(disks []StorageAPI, bucket, prefix string, xlMetas []
// Delete all `xl.json` successfully renamed. // Delete all `xl.json` successfully renamed.
deleteAllXLMetadata(disks, bucket, prefix, mErrs) deleteAllXLMetadata(disks, bucket, prefix, mErrs)
} }
return err return evalDisks(disks, mErrs), err
} }
// writeSameXLMetadata - write `xl.json` on all disks in order. // writeSameXLMetadata - write `xl.json` on all disks in order.
func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMetaV1, writeQuorum, readQuorum int) error { func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMetaV1, writeQuorum, readQuorum int) ([]StorageAPI, error) {
var wg = &sync.WaitGroup{} var wg = &sync.WaitGroup{}
var mErrs = make([]error, len(disks)) var mErrs = make([]error, len(disks))
@ -459,8 +457,6 @@ func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMet
err := writeXLMetadata(disk, bucket, prefix, metadata) err := writeXLMetadata(disk, bucket, prefix, metadata)
if err != nil { if err != nil {
mErrs[index] = err mErrs[index] = err
// Ignore disk which returned an error.
disks[index] = nil
} }
}(index, disk, xlMeta) }(index, disk, xlMeta)
} }
@ -473,5 +469,5 @@ func writeSameXLMetadata(disks []StorageAPI, bucket, prefix string, xlMeta xlMet
// Delete all `xl.json` successfully renamed. // Delete all `xl.json` successfully renamed.
deleteAllXLMetadata(disks, bucket, prefix, mErrs) deleteAllXLMetadata(disks, bucket, prefix, mErrs)
} }
return err return evalDisks(disks, mErrs), err
} }

@ -233,7 +233,7 @@ func (xl xlObjects) statPart(bucket, object, uploadID, partName string) (fileInf
} }
// commitXLMetadata - commit `xl.json` from source prefix to destination prefix in the given slice of disks. // commitXLMetadata - commit `xl.json` from source prefix to destination prefix in the given slice of disks.
func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPrefix string, quorum int) error { func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPrefix string, quorum int) ([]StorageAPI, error) {
var wg = &sync.WaitGroup{} var wg = &sync.WaitGroup{}
var mErrs = make([]error, len(disks)) var mErrs = make([]error, len(disks))
@ -257,8 +257,6 @@ func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPr
rErr := disk.RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile) rErr := disk.RenameFile(srcBucket, srcJSONFile, dstBucket, dstJSONFile)
if rErr != nil { if rErr != nil {
mErrs[index] = traceError(rErr) mErrs[index] = traceError(rErr)
// Ignore disk which returned an error.
disks[index] = nil
return return
} }
mErrs[index] = nil mErrs[index] = nil
@ -272,7 +270,7 @@ func commitXLMetadata(disks []StorageAPI, srcBucket, srcPrefix, dstBucket, dstPr
// Delete all `xl.json` successfully renamed. // Delete all `xl.json` successfully renamed.
deleteAllXLMetadata(disks, dstBucket, dstPrefix, mErrs) deleteAllXLMetadata(disks, dstBucket, dstPrefix, mErrs)
} }
return err return evalDisks(disks, mErrs), err
} }
// listMultipartUploads - lists all multipart uploads. // listMultipartUploads - lists all multipart uploads.
@ -491,7 +489,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
uploadIDPath := path.Join(bucket, object, uploadID) uploadIDPath := path.Join(bucket, object, uploadID)
tempUploadIDPath := uploadID tempUploadIDPath := uploadID
// Write updated `xl.json` to all disks. // Write updated `xl.json` to all disks.
err := writeSameXLMetadata(xl.storageDisks, minioMetaTmpBucket, tempUploadIDPath, xlMeta, xl.writeQuorum, xl.readQuorum) disks, err := writeSameXLMetadata(xl.storageDisks, minioMetaTmpBucket, tempUploadIDPath, xlMeta, xl.writeQuorum, xl.readQuorum)
if err != nil { if err != nil {
return "", toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath) return "", toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath)
} }
@ -501,7 +499,7 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st
defer xl.deleteObject(minioMetaTmpBucket, tempUploadIDPath) defer xl.deleteObject(minioMetaTmpBucket, tempUploadIDPath)
// Attempt to rename temp upload object to actual upload path object // Attempt to rename temp upload object to actual upload path object
rErr := renameObject(xl.storageDisks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum) _, rErr := renameObject(disks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum)
if rErr != nil { if rErr != nil {
return "", toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath) return "", toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath)
} }
@ -657,7 +655,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
allowEmpty := true allowEmpty := true
// Erasure code data and write across all disks. // Erasure code data and write across all disks.
sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaTmpBucket, tmpPartPath, teeReader, allowEmpty, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, bitRotAlgo, xl.writeQuorum) onlineDisks, sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaTmpBucket, tmpPartPath, teeReader, allowEmpty, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, bitRotAlgo, xl.writeQuorum)
if err != nil { if err != nil {
return PartInfo{}, toObjectErr(err, bucket, object) return PartInfo{}, toObjectErr(err, bucket, object)
} }
@ -702,7 +700,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Rename temporary part file to its final location. // Rename temporary part file to its final location.
partPath := path.Join(uploadIDPath, partSuffix) partPath := path.Join(uploadIDPath, partSuffix)
err = renamePart(onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, xl.writeQuorum) onlineDisks, err = renamePart(onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, xl.writeQuorum)
if err != nil { if err != nil {
return PartInfo{}, toObjectErr(err, minioMetaMultipartBucket, partPath) return PartInfo{}, toObjectErr(err, minioMetaMultipartBucket, partPath)
} }
@ -746,10 +744,11 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
tempXLMetaPath := newUUID tempXLMetaPath := newUUID
// Writes a unique `xl.json` each disk carrying new checksum related information. // Writes a unique `xl.json` each disk carrying new checksum related information.
if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempXLMetaPath, partsMetadata, xl.writeQuorum); err != nil { if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempXLMetaPath, partsMetadata, xl.writeQuorum); err != nil {
return PartInfo{}, toObjectErr(err, minioMetaTmpBucket, tempXLMetaPath) return PartInfo{}, toObjectErr(err, minioMetaTmpBucket, tempXLMetaPath)
} }
rErr := commitXLMetadata(onlineDisks, minioMetaTmpBucket, tempXLMetaPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum) var rErr error
onlineDisks, rErr = commitXLMetadata(onlineDisks, minioMetaTmpBucket, tempXLMetaPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum)
if rErr != nil { if rErr != nil {
return PartInfo{}, toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath) return PartInfo{}, toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath)
} }
@ -986,11 +985,12 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
} }
// Write unique `xl.json` for each disk. // Write unique `xl.json` for each disk.
if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, partsMetadata, xl.writeQuorum); err != nil { if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, partsMetadata, xl.writeQuorum); err != nil {
return ObjectInfo{}, toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath) return ObjectInfo{}, toObjectErr(err, minioMetaTmpBucket, tempUploadIDPath)
} }
rErr := commitXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum) var rErr error
onlineDisks, rErr = commitXLMetadata(onlineDisks, minioMetaTmpBucket, tempUploadIDPath, minioMetaMultipartBucket, uploadIDPath, xl.writeQuorum)
if rErr != nil { if rErr != nil {
return ObjectInfo{}, toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath) return ObjectInfo{}, toObjectErr(rErr, minioMetaMultipartBucket, uploadIDPath)
} }
@ -1018,7 +1018,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
// NOTE: Do not use online disks slice here. // NOTE: Do not use online disks slice here.
// The reason is that existing object should be purged // The reason is that existing object should be purged
// regardless of `xl.json` status and rolled back in case of errors. // regardless of `xl.json` status and rolled back in case of errors.
err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, xl.writeQuorum) _, err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, xl.writeQuorum)
if err != nil { if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object) return ObjectInfo{}, toObjectErr(err, bucket, object)
} }
@ -1038,7 +1038,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
} }
// Rename the multipart object to final location. // Rename the multipart object to final location.
if err = renameObject(onlineDisks, minioMetaMultipartBucket, uploadIDPath, bucket, object, xl.writeQuorum); err != nil { if onlineDisks, err = renameObject(onlineDisks, minioMetaMultipartBucket, uploadIDPath, bucket, object, xl.writeQuorum); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object) return ObjectInfo{}, toObjectErr(err, bucket, object)
} }

@ -94,11 +94,11 @@ func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string
tempObj := mustGetUUID() tempObj := mustGetUUID()
// Write unique `xl.json` for each disk. // Write unique `xl.json` for each disk.
if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum); err != nil { if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum); err != nil {
return ObjectInfo{}, toObjectErr(err, srcBucket, srcObject) return ObjectInfo{}, toObjectErr(err, srcBucket, srcObject)
} }
// Rename atomically `xl.json` from tmp location to destination for each disk. // Rename atomically `xl.json` from tmp location to destination for each disk.
if err = renameXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, srcBucket, srcObject, xl.writeQuorum); err != nil { if onlineDisks, err = renameXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, srcBucket, srcObject, xl.writeQuorum); err != nil {
return ObjectInfo{}, toObjectErr(err, srcBucket, srcObject) return ObjectInfo{}, toObjectErr(err, srcBucket, srcObject)
} }
return xlMeta.ToObjectInfo(srcBucket, srcObject), nil return xlMeta.ToObjectInfo(srcBucket, srcObject), nil
@ -374,7 +374,7 @@ func undoRename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry str
// rename - common function that renamePart and renameObject use to rename // rename - common function that renamePart and renameObject use to rename
// the respective underlying storage layer representations. // the respective underlying storage layer representations.
func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, quorum int) error { func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, isDir bool, quorum int) ([]StorageAPI, error) {
// Initialize sync waitgroup. // Initialize sync waitgroup.
var wg = &sync.WaitGroup{} var wg = &sync.WaitGroup{}
@ -398,8 +398,6 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string,
err := disk.RenameFile(srcBucket, srcEntry, dstBucket, dstEntry) err := disk.RenameFile(srcBucket, srcEntry, dstBucket, dstEntry)
if err != nil && err != errFileNotFound { if err != nil && err != errFileNotFound {
errs[index] = traceError(err) errs[index] = traceError(err)
// Ignore disk which returned an error.
disks[index] = nil
} }
}(index, disk) }(index, disk)
} }
@ -414,14 +412,14 @@ func rename(disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string,
// Undo all the partial rename operations. // Undo all the partial rename operations.
undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isDir, errs) undoRename(disks, srcBucket, srcEntry, dstBucket, dstEntry, isDir, errs)
} }
return err return evalDisks(disks, errs), err
} }
// renamePart - renames a part of the source object to the destination // renamePart - renames a part of the source object to the destination
// across all disks in parallel. Additionally if we have errors and do // across all disks in parallel. Additionally if we have errors and do
// not have a readQuorum partially renamed files are renamed back to // not have a readQuorum partially renamed files are renamed back to
// its proper location. // its proper location.
func renamePart(disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart string, quorum int) error { func renamePart(disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart string, quorum int) ([]StorageAPI, error) {
isDir := false isDir := false
return rename(disks, srcBucket, srcPart, dstBucket, dstPart, isDir, quorum) return rename(disks, srcBucket, srcPart, dstBucket, dstPart, isDir, quorum)
} }
@ -430,7 +428,7 @@ func renamePart(disks []StorageAPI, srcBucket, srcPart, dstBucket, dstPart strin
// across all disks in parallel. Additionally if we have errors and do // across all disks in parallel. Additionally if we have errors and do
// not have a readQuorum partially renamed files are renamed back to // not have a readQuorum partially renamed files are renamed back to
// its proper location. // its proper location.
func renameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, quorum int) error { func renameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject string, quorum int) ([]StorageAPI, error) {
isDir := true isDir := true
return rename(disks, srcBucket, srcObject, dstBucket, dstObject, isDir, quorum) return rename(disks, srcBucket, srcObject, dstBucket, dstObject, isDir, quorum)
} }
@ -573,8 +571,12 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// when size == -1 because in this case, we are not able to predict how many parts we will have. // when size == -1 because in this case, we are not able to predict how many parts we will have.
allowEmptyPart := partIdx == 1 allowEmptyPart := partIdx == 1
var partSizeWritten int64
var checkSums []string
var erasureErr error
// Erasure code data and write across all disks. // Erasure code data and write across all disks.
partSizeWritten, checkSums, erasureErr := erasureCreateFile(onlineDisks, minioMetaTmpBucket, tempErasureObj, partReader, allowEmptyPart, partsMetadata[0].Erasure.BlockSize, partsMetadata[0].Erasure.DataBlocks, partsMetadata[0].Erasure.ParityBlocks, bitRotAlgo, xl.writeQuorum) onlineDisks, partSizeWritten, checkSums, erasureErr = erasureCreateFile(onlineDisks, minioMetaTmpBucket, tempErasureObj, partReader, allowEmptyPart, partsMetadata[0].Erasure.BlockSize, partsMetadata[0].Erasure.DataBlocks, partsMetadata[0].Erasure.ParityBlocks, bitRotAlgo, xl.writeQuorum)
if erasureErr != nil { if erasureErr != nil {
return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj) return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj)
} }
@ -674,7 +676,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// NOTE: Do not use online disks slice here. // NOTE: Do not use online disks slice here.
// The reason is that existing object should be purged // The reason is that existing object should be purged
// regardless of `xl.json` status and rolled back in case of errors. // regardless of `xl.json` status and rolled back in case of errors.
err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, xl.writeQuorum) _, err = renameObject(xl.storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, xl.writeQuorum)
if err != nil { if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object) return ObjectInfo{}, toObjectErr(err, bucket, object)
} }
@ -689,12 +691,12 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
} }
// Write unique `xl.json` for each disk. // Write unique `xl.json` for each disk.
if err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum); err != nil { if onlineDisks, err = writeUniqueXLMetadata(onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, xl.writeQuorum); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object) return ObjectInfo{}, toObjectErr(err, bucket, object)
} }
// Rename the successfully written temporary object to final location. // Rename the successfully written temporary object to final location.
err = renameObject(onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, xl.writeQuorum) onlineDisks, err = renameObject(onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, xl.writeQuorum)
if err != nil { if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object) return ObjectInfo{}, toObjectErr(err, bucket, object)
} }

@ -357,6 +357,24 @@ func shuffleDisks(disks []StorageAPI, distribution []int) (shuffledDisks []Stora
return shuffledDisks return shuffledDisks
} }
// evalDisks - returns a new slice of disks where nil is set if
// the correspond error in errs slice is not nil
func evalDisks(disks []StorageAPI, errs []error) []StorageAPI {
if len(errs) != len(disks) {
errorIf(errors.New("unexpected disks/errors slice length"), "unable to evaluate internal disks")
return nil
}
newDisks := make([]StorageAPI, len(disks))
for index := range errs {
if errs[index] == nil {
newDisks[index] = disks[index]
} else {
newDisks[index] = nil
}
}
return newDisks
}
// Errors specifically generated by getPartSizeFromIdx function. // Errors specifically generated by getPartSizeFromIdx function.
var ( var (
errPartSizeZero = errors.New("Part size cannot be zero") errPartSizeZero = errors.New("Part size cannot be zero")

@ -18,6 +18,7 @@ package cmd
import ( import (
"encoding/json" "encoding/json"
"errors"
"reflect" "reflect"
"strconv" "strconv"
"testing" "testing"
@ -429,3 +430,61 @@ func testShuffleDisks(t *testing.T, xl *xlObjects) {
t.Errorf("shuffleDisks returned incorrect order.") t.Errorf("shuffleDisks returned incorrect order.")
} }
} }
// TestEvalDisks tests the behavior of evalDisks
func TestEvalDisks(t *testing.T) {
nDisks := 16
disks, err := getRandomDisks(nDisks)
if err != nil {
t.Fatal(err)
}
objLayer, _, err := initObjectLayer(mustGetNewEndpointList(disks...))
if err != nil {
removeRoots(disks)
t.Fatal(err)
}
defer removeRoots(disks)
xl := objLayer.(*xlObjects)
testShuffleDisks(t, xl)
}
func testEvalDisks(t *testing.T, xl *xlObjects) {
disks := xl.storageDisks
diskErr := errors.New("some disk error")
errs := []error{
diskErr, nil, nil, nil,
nil, diskErr, nil, nil,
diskErr, nil, nil, nil,
nil, nil, nil, diskErr,
}
// Test normal setup with some disks
// returning errors
newDisks := evalDisks(disks, errs)
if newDisks[0] != nil ||
newDisks[1] != disks[1] ||
newDisks[2] != disks[2] ||
newDisks[3] != disks[3] ||
newDisks[4] != disks[4] ||
newDisks[5] != nil ||
newDisks[6] != disks[6] ||
newDisks[7] != disks[7] ||
newDisks[8] != nil ||
newDisks[9] != disks[9] ||
newDisks[10] != disks[10] ||
newDisks[11] != disks[11] ||
newDisks[12] != disks[12] ||
newDisks[13] != disks[13] ||
newDisks[14] != disks[14] ||
newDisks[15] != nil {
t.Errorf("evalDisks returned incorrect new disk set.")
}
// Test when number of errs doesn't match with number of disks
errs = []error{nil, nil, nil, nil}
newDisks = evalDisks(disks, errs)
if newDisks != nil {
t.Errorf("evalDisks returned no nil slice")
}
}

Loading…
Cancel
Save