XL/Erasure: Make bit-rot verification based on xl.json algo. (#2299)

Currently `xl.json` saves algorithm information for bit-rot
verification. Since the bit-rot algo's can change in the
future make sure the erasureReadFile doesn't default to
a particular algo. Instead use the checkSumInfo.
master
Harshavardhana 8 years ago committed by GitHub
parent 65f71ce0c5
commit f503ac3db8
  1. 4
      erasure-createfile.go
  2. 13
      erasure-createfile_test.go
  3. 24
      erasure-readfile.go
  4. 18
      erasure-readfile_test.go
  5. 4
      erasure-utils.go
  6. 3
      erasure-utils_test.go
  7. 2
      object-api-putobject_test.go
  8. 48
      xl-v1-metadata.go
  9. 8
      xl-v1-multipart.go
  10. 23
      xl-v1-object.go
  11. 2
      xl-v1.go

@ -28,11 +28,11 @@ import (
// erasureCreateFile - writes an entire stream by erasure coding to // erasureCreateFile - writes an entire stream by erasure coding to
// all the disks, writes also calculate individual block's checksum // all the disks, writes also calculate individual block's checksum
// for future bit-rot protection. // for future bit-rot protection.
func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader, blockSize int64, dataBlocks int, parityBlocks int, writeQuorum int) (bytesWritten int64, checkSums []string, err error) { func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader, blockSize int64, dataBlocks int, parityBlocks int, algo string, writeQuorum int) (bytesWritten int64, checkSums []string, err error) {
// Allocated blockSized buffer for reading. // Allocated blockSized buffer for reading.
buf := make([]byte, blockSize) buf := make([]byte, blockSize)
hashWriters := newHashWriters(len(disks)) hashWriters := newHashWriters(len(disks), algo)
// Read until io.EOF, erasure codes data and writes to all disks. // Read until io.EOF, erasure codes data and writes to all disks.
for { for {

@ -19,8 +19,9 @@ package main
import ( import (
"bytes" "bytes"
"crypto/rand" "crypto/rand"
"github.com/klauspost/reedsolomon"
"testing" "testing"
"github.com/klauspost/reedsolomon"
) )
// Simulates a faulty disk for AppendFile() // Simulates a faulty disk for AppendFile()
@ -54,7 +55,7 @@ func TestErasureCreateFile(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
// Test when all disks are up. // Test when all disks are up.
size, _, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1) size, _, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -67,7 +68,7 @@ func TestErasureCreateFile(t *testing.T) {
disks[5] = AppendDiskDown{disks[5].(*posix)} disks[5] = AppendDiskDown{disks[5].(*posix)}
// Test when two disks are down. // Test when two disks are down.
size, _, err = erasureCreateFile(disks, "testbucket", "testobject2", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1) size, _, err = erasureCreateFile(disks, "testbucket", "testobject2", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -81,7 +82,7 @@ func TestErasureCreateFile(t *testing.T) {
disks[8] = AppendDiskDown{disks[8].(*posix)} disks[8] = AppendDiskDown{disks[8].(*posix)}
disks[9] = AppendDiskDown{disks[9].(*posix)} disks[9] = AppendDiskDown{disks[9].(*posix)}
size, _, err = erasureCreateFile(disks, "testbucket", "testobject3", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1) size, _, err = erasureCreateFile(disks, "testbucket", "testobject3", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -91,9 +92,9 @@ func TestErasureCreateFile(t *testing.T) {
// 1 more disk down. 7 disk down in total. Should return quorum error. // 1 more disk down. 7 disk down in total. Should return quorum error.
disks[10] = AppendDiskDown{disks[10].(*posix)} disks[10] = AppendDiskDown{disks[10].(*posix)}
size, _, err = erasureCreateFile(disks, "testbucket", "testobject4", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1) size, _, err = erasureCreateFile(disks, "testbucket", "testobject4", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
if err != errXLWriteQuorum { if err != errXLWriteQuorum {
t.Error("Expected errXLWriteQuorum error") t.Errorf("erasureCreateFile returned expected errXLWriteQuorum error, got %s", err)
} }
} }

@ -159,7 +159,7 @@ func parallelRead(volume, path string, readDisks []StorageAPI, orderedDisks []St
// are decoded into a data block. Data block is trimmed for given offset and length, // are decoded into a data block. Data block is trimmed for given offset and length,
// then written to given writer. This function also supports bit-rot detection by // then written to given writer. This function also supports bit-rot detection by
// verifying checksum of individual block's checksum. // verifying checksum of individual block's checksum.
func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, offset int64, length int64, totalLength int64, blockSize int64, dataBlocks int, parityBlocks int, checkSums []string, pool *bpool.BytePool) (int64, error) { func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, offset int64, length int64, totalLength int64, blockSize int64, dataBlocks int, parityBlocks int, checkSums []string, algo string, pool *bpool.BytePool) (int64, error) {
// Offset and length cannot be negative. // Offset and length cannot be negative.
if offset < 0 || length < 0 { if offset < 0 || length < 0 {
return 0, errUnexpected return 0, errUnexpected
@ -186,7 +186,7 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
return true return true
} }
// Is this a valid block? // Is this a valid block?
isValid := isValidBlock(disks[diskIndex], volume, path, checkSums[diskIndex]) isValid := isValidBlock(disks[diskIndex], volume, path, checkSums[diskIndex], algo)
verified[diskIndex] = isValid verified[diskIndex] = isValid
return isValid return isValid
} }
@ -300,31 +300,25 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
return bytesWritten, nil return bytesWritten, nil
} }
// PartObjectChecksum - returns the checksum for the part name from the checksum slice.
func (e erasureInfo) PartObjectChecksum(partName string) checkSumInfo {
for _, checksum := range e.Checksum {
if checksum.Name == partName {
return checksum
}
}
return checkSumInfo{}
}
// isValidBlock - calculates the checksum hash for the block and // isValidBlock - calculates the checksum hash for the block and
// validates if its correct returns true for valid cases, false otherwise. // validates if its correct returns true for valid cases, false otherwise.
func isValidBlock(disk StorageAPI, volume, path string, checksum string) (ok bool) { func isValidBlock(disk StorageAPI, volume, path, checkSum, checkSumAlgo string) (ok bool) {
// Disk is not available, not a valid block. // Disk is not available, not a valid block.
if disk == nil { if disk == nil {
return false return false
} }
// Checksum not available, not a valid block.
if checkSum == "" {
return false
}
// Read everything for a given block and calculate hash. // Read everything for a given block and calculate hash.
hashWriter := newHash("blake2b") hashWriter := newHash(checkSumAlgo)
hashBytes, err := hashSum(disk, volume, path, hashWriter) hashBytes, err := hashSum(disk, volume, path, hashWriter)
if err != nil { if err != nil {
errorIf(err, "Unable to calculate checksum %s/%s", volume, path) errorIf(err, "Unable to calculate checksum %s/%s", volume, path)
return false return false
} }
return hex.EncodeToString(hashBytes) == checksum return hex.EncodeToString(hashBytes) == checkSum
} }
// decodeData - decode encoded blocks. // decodeData - decode encoded blocks.

@ -243,7 +243,7 @@ func TestErasureReadFileDiskFail(t *testing.T) {
} }
// Create a test file to read from. // Create a test file to read from.
size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1) size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -257,7 +257,7 @@ func TestErasureReadFileDiskFail(t *testing.T) {
pool := bpool.NewBytePool(chunkSize, len(disks)) pool := bpool.NewBytePool(chunkSize, len(disks))
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums, pool) size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums, bitRotAlgo, pool)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -270,7 +270,7 @@ func TestErasureReadFileDiskFail(t *testing.T) {
disks[5] = ReadDiskDown{disks[5].(*posix)} disks[5] = ReadDiskDown{disks[5].(*posix)}
buf.Reset() buf.Reset()
size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums, pool) size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums, bitRotAlgo, pool)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -285,7 +285,7 @@ func TestErasureReadFileDiskFail(t *testing.T) {
disks[11] = ReadDiskDown{disks[11].(*posix)} disks[11] = ReadDiskDown{disks[11].(*posix)}
buf.Reset() buf.Reset()
size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums, pool) size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums, bitRotAlgo, pool)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -296,7 +296,7 @@ func TestErasureReadFileDiskFail(t *testing.T) {
// 1 more disk down. 7 disks down in total. Read should fail. // 1 more disk down. 7 disks down in total. Read should fail.
disks[12] = ReadDiskDown{disks[12].(*posix)} disks[12] = ReadDiskDown{disks[12].(*posix)}
buf.Reset() buf.Reset()
size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums, pool) size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums, bitRotAlgo, pool)
if err != errXLReadQuorum { if err != errXLReadQuorum {
t.Fatal("expected errXLReadQuorum error") t.Fatal("expected errXLReadQuorum error")
} }
@ -325,7 +325,7 @@ func TestErasureReadFileOffsetLength(t *testing.T) {
} }
// Create a test file to read from. // Create a test file to read from.
size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1) size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -361,7 +361,7 @@ func TestErasureReadFileOffsetLength(t *testing.T) {
for i, testCase := range testCases { for i, testCase := range testCases {
expected := data[testCase.offset:(testCase.offset + testCase.length)] expected := data[testCase.offset:(testCase.offset + testCase.length)]
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
size, err = erasureReadFile(buf, disks, "testbucket", "testobject", testCase.offset, testCase.length, length, blockSize, dataBlocks, parityBlocks, checkSums, pool) size, err = erasureReadFile(buf, disks, "testbucket", "testobject", testCase.offset, testCase.length, length, blockSize, dataBlocks, parityBlocks, checkSums, bitRotAlgo, pool)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
continue continue
@ -404,7 +404,7 @@ func TestErasureReadFileRandomOffsetLength(t *testing.T) {
iterations := 10000 iterations := 10000
// Create a test file to read from. // Create a test file to read from.
size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1) size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -429,7 +429,7 @@ func TestErasureReadFileRandomOffsetLength(t *testing.T) {
expected := data[offset : offset+readLen] expected := data[offset : offset+readLen]
size, err = erasureReadFile(buf, disks, "testbucket", "testobject", offset, readLen, length, blockSize, dataBlocks, parityBlocks, checkSums, pool) size, err = erasureReadFile(buf, disks, "testbucket", "testobject", offset, readLen, length, blockSize, dataBlocks, parityBlocks, checkSums, bitRotAlgo, pool)
if err != nil { if err != nil {
t.Fatal(err, offset, readLen) t.Fatal(err, offset, readLen)
} }

@ -27,10 +27,10 @@ import (
) )
// newHashWriters - inititialize a slice of hashes for the disk count. // newHashWriters - inititialize a slice of hashes for the disk count.
func newHashWriters(diskCount int) []hash.Hash { func newHashWriters(diskCount int, algo string) []hash.Hash {
hashWriters := make([]hash.Hash, diskCount) hashWriters := make([]hash.Hash, diskCount)
for index := range hashWriters { for index := range hashWriters {
hashWriters[index] = newHash("blake2b") hashWriters[index] = newHash(algo)
} }
return hashWriters return hashWriters
} }

@ -23,8 +23,7 @@ import (
// Test validates the number hash writers returned. // Test validates the number hash writers returned.
func TestNewHashWriters(t *testing.T) { func TestNewHashWriters(t *testing.T) {
diskNum := 8 diskNum := 8
hashWriters := newHashWriters(diskNum) hashWriters := newHashWriters(diskNum, bitRotAlgo)
if len(hashWriters) != diskNum { if len(hashWriters) != diskNum {
t.Errorf("Expected %d hashWriters, but instead got %d", diskNum, len(hashWriters)) t.Errorf("Expected %d hashWriters, but instead got %d", diskNum, len(hashWriters))
} }

@ -199,7 +199,7 @@ func testObjectAPIPutObjectDiskNotFOund(obj ObjectLayer, instanceType string, di
} }
// Take 8 disks down, one more we loose quorum on 16 disk node. // Take 8 disks down, one more we loose quorum on 16 disk node.
for _, disk := range disks[:8] { for _, disk := range disks[:7] {
removeAll(disk) removeAll(disk)
} }

@ -52,6 +52,11 @@ type checkSumInfo struct {
Hash string `json:"hash"` Hash string `json:"hash"`
} }
// Constant indicates current bit-rot algo used when creating objects.
const (
bitRotAlgo = "blake2b"
)
// erasureInfo - carries erasure coding related information, block // erasureInfo - carries erasure coding related information, block
// distribution and checksums. // distribution and checksums.
type erasureInfo struct { type erasureInfo struct {
@ -64,6 +69,28 @@ type erasureInfo struct {
Checksum []checkSumInfo `json:"checksum,omitempty"` Checksum []checkSumInfo `json:"checksum,omitempty"`
} }
// AddCheckSum - add checksum of a part.
func (e *erasureInfo) AddCheckSumInfo(ckSumInfo checkSumInfo) {
for i, sum := range e.Checksum {
if sum.Name == ckSumInfo.Name {
e.Checksum[i] = ckSumInfo
return
}
}
e.Checksum = append(e.Checksum, ckSumInfo)
}
// GetCheckSumInfo - get checksum of a part.
func (e erasureInfo) GetCheckSumInfo(partName string) (ckSum checkSumInfo, err error) {
// Return the checksum.
for _, sum := range e.Checksum {
if sum.Name == partName {
return sum, nil
}
}
return checkSumInfo{}, errUnexpected
}
// statInfo - carries stat information of the object. // statInfo - carries stat information of the object.
type statInfo struct { type statInfo struct {
Size int64 `json:"size"` // Size of the object `xl.json`. Size int64 `json:"size"` // Size of the object `xl.json`.
@ -144,27 +171,6 @@ func (m *xlMetaV1) AddObjectPart(partNumber int, partName string, partETag strin
sort.Sort(byObjectPartNumber(m.Parts)) sort.Sort(byObjectPartNumber(m.Parts))
} }
// AddCheckSum - add checksum of a part.
func (m *xlMetaV1) AddCheckSum(partName, algorithm, checkSum string) {
for i, sum := range m.Erasure.Checksum {
if sum.Name == partName {
m.Erasure.Checksum[i] = checkSumInfo{partName, "blake2b", checkSum}
return
}
}
m.Erasure.Checksum = append(m.Erasure.Checksum, checkSumInfo{partName, "blake2b", checkSum})
}
// GetCheckSum - get checksum of a part.
func (m xlMetaV1) GetCheckSum(partName string) (checkSum, algorithm string, err error) {
for _, sum := range m.Erasure.Checksum {
if sum.Name == partName {
return sum.Hash, sum.Algorithm, nil
}
}
return "", "", errUnexpected
}
// ObjectToPartOffset - translate offset of an object to offset of its individual part. // ObjectToPartOffset - translate offset of an object to offset of its individual part.
func (m xlMetaV1) ObjectToPartOffset(offset int64) (partIndex int, partOffset int64, err error) { func (m xlMetaV1) ObjectToPartOffset(offset int64) (partIndex int, partOffset int64, err error) {
if offset == 0 { if offset == 0 {

@ -384,7 +384,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
teeReader := io.TeeReader(data, md5Writer) teeReader := io.TeeReader(data, md5Writer)
// Erasure code data and write across all disks. // Erasure code data and write across all disks.
sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, teeReader, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, xl.writeQuorum) sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, teeReader, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, bitRotAlgo, xl.writeQuorum)
if err != nil { if err != nil {
return "", toObjectErr(err, bucket, object) return "", toObjectErr(err, bucket, object)
} }
@ -459,7 +459,11 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
continue continue
} }
partsMetadata[index].Parts = xlMeta.Parts partsMetadata[index].Parts = xlMeta.Parts
partsMetadata[index].AddCheckSum(partSuffix, "blake2b", checkSums[index]) partsMetadata[index].Erasure.AddCheckSumInfo(checkSumInfo{
Name: partSuffix,
Hash: checkSums[index],
Algorithm: bitRotAlgo,
})
} }
// Write all the checksum metadata. // Write all the checksum metadata.

@ -175,24 +175,33 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
// Get the checksums of the current part. // Get the checksums of the current part.
checkSums := make([]string, len(onlineDisks)) checkSums := make([]string, len(onlineDisks))
var ckSumAlgo string
for index, disk := range onlineDisks { for index, disk := range onlineDisks {
// Disk is not found skip the checksum. // Disk is not found skip the checksum.
if disk == nil { if disk == nil {
checkSums[index] = "" checkSums[index] = ""
continue continue
} }
checkSums[index], _, err = metaArr[index].GetCheckSum(partName) ckSumInfo, err := metaArr[index].Erasure.GetCheckSumInfo(partName)
if err != nil { // FIXME - relook at returning error here. if err != nil { // FIXME - relook at returning error here.
return toObjectErr(err, bucket, object) return toObjectErr(err, bucket, object)
} }
checkSums[index] = ckSumInfo.Hash
// Set checksum algo only once, while it is possible to have
// different algos per block because of our `xl.json`.
// It is not a requirement, set this only once for all the disks.
if ckSumAlgo != "" {
ckSumAlgo = ckSumInfo.Algorithm
}
} }
// Start reading the part name. // Start erasure decoding and writing to the client.
n, err := erasureReadFile(mw, onlineDisks, bucket, pathJoin(object, partName), partOffset, readSize, partSize, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, checkSums, pool) n, err := erasureReadFile(mw, onlineDisks, bucket, pathJoin(object, partName), partOffset, readSize, partSize, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, checkSums, ckSumAlgo, pool)
if err != nil { if err != nil {
return toObjectErr(err, bucket, object) return toObjectErr(err, bucket, object)
} }
// Track total bytes read from disk and written to the client.
totalBytesRead += n totalBytesRead += n
// partOffset will be valid only for the first part, hence reset it to 0 for // partOffset will be valid only for the first part, hence reset it to 0 for
@ -413,7 +422,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
onlineDisks := getOrderedDisks(xlMeta.Erasure.Distribution, xl.storageDisks) onlineDisks := getOrderedDisks(xlMeta.Erasure.Distribution, xl.storageDisks)
// Erasure code data and write across all disks. // Erasure code data and write across all disks.
sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, teeReader, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, xl.writeQuorum) sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, teeReader, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, bitRotAlgo, xl.writeQuorum)
if err != nil { if err != nil {
// Create file failed, delete temporary object. // Create file failed, delete temporary object.
xl.deleteObject(minioMetaTmpBucket, tempObj) xl.deleteObject(minioMetaTmpBucket, tempObj)
@ -508,7 +517,11 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// Update `xl.json` content on each disks. // Update `xl.json` content on each disks.
for index := range partsMetadata { for index := range partsMetadata {
partsMetadata[index] = xlMeta partsMetadata[index] = xlMeta
partsMetadata[index].AddCheckSum("part.1", "blake2b", checkSums[index]) partsMetadata[index].Erasure.AddCheckSumInfo(checkSumInfo{
Name: "part.1",
Hash: checkSums[index],
Algorithm: bitRotAlgo,
})
} }
// Write unique `xl.json` for each disk. // Write unique `xl.json` for each disk.

@ -196,7 +196,7 @@ func newXLObjects(disks, ignoredDisks []string) (ObjectLayer, error) {
// Figure out read and write quorum based on number of storage disks. // Figure out read and write quorum based on number of storage disks.
// READ and WRITE quorum is always set to (N/2) number of disks. // READ and WRITE quorum is always set to (N/2) number of disks.
xl.readQuorum = len(xl.storageDisks) / 2 xl.readQuorum = len(xl.storageDisks) / 2
xl.writeQuorum = len(xl.storageDisks) / 2 xl.writeQuorum = len(xl.storageDisks)/2 + 1
// Return successfully initialized object layer. // Return successfully initialized object layer.
return xl, nil return xl, nil

Loading…
Cancel
Save