Fix xl disksWithAllParts to use the proper checksum algorithm (#4509)

master
Daniel Lind 8 years ago committed by Harshavardhana
parent 11c4223f2c
commit dce76d9307
Changed files:
  1. cmd/xl-v1-healing-common.go (20 changed lines)
  2. cmd/xl-v1-healing-common_test.go (112 changed lines)
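What the patch changes, in short: the old code built the hasher from partsMetadata[index].Erasure.Algorithm and compared the result against the per-part hash stored in xl.json, so every part was effectively verified with the blake2b default regardless of which algorithm its checksum entry actually recorded. The new code fetches the part's own CheckSumInfo entry and hashes with the algorithm recorded there. The sketch below is a minimal, self-contained illustration of that idea only; checkSumInfo, newHash, partIsHealthy and the algorithm strings ("sha256", "blake2b") are simplified stand-ins, not the actual minio types or constants.

	// Standalone sketch of the fixed verification logic (illustrative only:
	// checkSumInfo and the algorithm names are simplified stand-ins for the
	// xl.json metadata types, not the actual minio code).
	package main

	import (
		"bytes"
		"crypto/sha256"
		"encoding/hex"
		"fmt"
		"hash"

		"golang.org/x/crypto/blake2b"
	)

	// checkSumInfo mimics the per-part checksum entry stored in xl.json:
	// each part records its name, the hash algorithm used, and the hex digest.
	type checkSumInfo struct {
		Name      string
		Algorithm string
		Hash      string
	}

	// newHash returns the hasher matching the algorithm recorded for the part,
	// instead of assuming one algorithm for every part.
	func newHash(algorithm string) hash.Hash {
		switch algorithm {
		case "sha256":
			return sha256.New()
		default:
			// fall back to blake2b, the default assumed in this sketch
			h, _ := blake2b.New512(nil)
			return h
		}
	}

	// partIsHealthy recomputes the part's digest with the algorithm named in its
	// own checksum entry and compares it to the stored hash; a mismatch marks the
	// disk as outdated (it needs healing).
	func partIsHealthy(partData []byte, info checkSumInfo) bool {
		h := newHash(info.Algorithm)
		h.Write(partData)
		return hex.EncodeToString(h.Sum(nil)) == info.Hash
	}

	func main() {
		data := bytes.Repeat([]byte("a"), 1024)
		h, _ := blake2b.New512(nil)
		h.Write(data)
		info := checkSumInfo{Name: "part.1", Algorithm: "blake2b", Hash: hex.EncodeToString(h.Sum(nil))}
		fmt.Println(partIsHealthy(data, info)) // true: digest recomputed with the recorded algorithm
	}

With the per-part lookup, a mismatch now reflects the algorithm actually recorded for that part, which is what the new TestDisksWithAllParts exercises by flipping selected checksum entries to HashSha256 and expecting those disks to be filtered out.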

cmd/xl-v1-healing-common.go

@@ -252,21 +252,22 @@ func xlHealStat(xl xlObjects, partsMetadata []xlMetaV1, errs []error) HealObject
 // calculating blake2b checksum.
 func disksWithAllParts(onlineDisks []StorageAPI, partsMetadata []xlMetaV1, errs []error, bucket, object string) ([]StorageAPI, []error, error) {
 	availableDisks := make([]StorageAPI, len(onlineDisks))
-	for index, onlineDisk := range onlineDisks {
+	for diskIndex, onlineDisk := range onlineDisks {
 		if onlineDisk == nil {
 			continue
 		}
 		// disk has a valid xl.json but may not have all the
 		// parts. This is considered an outdated disk, since
 		// it needs healing too.
-		for _, part := range partsMetadata[index].Parts {
+		for _, part := range partsMetadata[diskIndex].Parts {
 			// compute blake2b sum of part.
 			partPath := filepath.Join(object, part.Name)
-			hash := newHash(partsMetadata[index].Erasure.Algorithm)
+			checkSumInfo := partsMetadata[diskIndex].Erasure.GetCheckSumInfo(part.Name)
+			hash := newHash(checkSumInfo.Algorithm)
 			blakeBytes, hErr := hashSum(onlineDisk, bucket, partPath, hash)
 			if hErr == errFileNotFound {
-				errs[index] = errFileNotFound
-				availableDisks[index] = nil
+				errs[diskIndex] = errFileNotFound
+				availableDisks[diskIndex] = nil
 				break
 			}

@@ -274,17 +275,16 @@ func disksWithAllParts(onlineDisks []StorageAPI, partsMetadata []xlMetaV1, errs
 				return nil, nil, traceError(hErr)
 			}

-			partChecksum := partsMetadata[index].Erasure.GetCheckSumInfo(part.Name).Hash
 			blakeSum := hex.EncodeToString(blakeBytes)
 			// if blake2b sum doesn't match for a part
 			// then this disk is outdated and needs
 			// healing.
-			if blakeSum != partChecksum {
-				errs[index] = errFileNotFound
-				availableDisks[index] = nil
+			if blakeSum != checkSumInfo.Hash {
+				errs[diskIndex] = errFileNotFound
+				availableDisks[diskIndex] = nil
 				break
 			}
-			availableDisks[index] = onlineDisk
+			availableDisks[diskIndex] = onlineDisk
 		}
 	}

cmd/xl-v1-healing-common_test.go

@@ -330,3 +330,115 @@ func TestListOnlineDisks(t *testing.T) {
 		}
 	}
 }
+
+func TestDisksWithAllParts(t *testing.T) {
+	rootPath, err := newTestConfig(globalMinioDefaultRegion)
+	if err != nil {
+		t.Fatalf("Failed to initialize config - %v", err)
+	}
+	defer removeAll(rootPath)
+
+	obj, disks, err := prepareXL()
+	if err != nil {
+		t.Fatalf("Prepare XL backend failed - %v", err)
+	}
+	defer removeRoots(disks)
+
+	bucket := "bucket"
+	object := "object"
+	// make data with more than one part
+	partCount := 3
+	data := bytes.Repeat([]byte("a"), int(globalPutPartSize)*partCount)
+	xlDisks := obj.(*xlObjects).storageDisks
+
+	err = obj.MakeBucketWithLocation("bucket", "")
+	if err != nil {
+		t.Fatalf("Failed to make a bucket %v", err)
+	}
+
+	_, err = obj.PutObject(bucket, object, int64(len(data)),
+		bytes.NewReader(data), nil, "")
+	if err != nil {
+		t.Fatalf("Failed to putObject %v", err)
+	}
+
+	partsMetadata, errs := readAllXLMetadata(xlDisks, bucket, object)
+	if err != nil {
+		t.Fatalf("Failed to read xl meta data %v", err)
+	}
+
+	// Replace the default blake2b erasure algorithm to HashSha256 and test that
+	// disks are excluded
+	diskFailures := make(map[int]string)
+	// key = disk index, value = part name with hash mismatch
+	diskFailures[0] = "part.3"
+	diskFailures[3] = "part.1"
+	diskFailures[15] = "part.2"
+
+	for diskIndex, partName := range diskFailures {
+		for index, info := range partsMetadata[diskIndex].Erasure.Checksum {
+			if info.Name == partName {
+				partsMetadata[diskIndex].Erasure.Checksum[index].Algorithm = HashSha256
+			}
+		}
+	}
+
+	errs = make([]error, len(xlDisks))
+	filteredDisks, errs, err :=
+		disksWithAllParts(xlDisks, partsMetadata, errs, bucket, object)
+	if err != nil {
+		t.Errorf("Unexpected error: %s", err)
+	}
+
+	if len(filteredDisks) != len(xlDisks) {
+		t.Errorf("Unexpected number of disks: %d", len(filteredDisks))
+	}
+
+	for diskIndex, disk := range filteredDisks {
+		if _, ok := diskFailures[diskIndex]; ok {
+			if disk != nil {
+				t.Errorf("Disk not filtered as expected, disk: %d", diskIndex)
+			}
+			if errs[diskIndex] == nil {
+				t.Errorf("Expected error not received, diskIndex: %d", diskIndex)
+			}
+		} else {
+			if disk == nil {
+				t.Errorf("Disk erroneously filtered, diskIndex: %d", diskIndex)
+			}
+			if errs[diskIndex] != nil {
+				t.Errorf("Unexpected error, %s, diskIndex: %d", errs[diskIndex], diskIndex)
+			}
+		}
+	}
+
+	// Test that all disks are returned without any failures with unmodified
+	// meta data
+	partsMetadata, errs = readAllXLMetadata(xlDisks, bucket, object)
+	if err != nil {
+		t.Fatalf("Failed to read xl meta data %v", err)
+	}
+
+	filteredDisks, errs, err =
+		disksWithAllParts(xlDisks, partsMetadata, errs, bucket, object)
+	if err != nil {
+		t.Errorf("Unexpected error: %s", err)
+	}
+
+	if len(filteredDisks) != len(xlDisks) {
+		t.Errorf("Unexpected number of disks: %d", len(filteredDisks))
+	}
+
+	for diskIndex, disk := range filteredDisks {
+		if errs[diskIndex] != nil {
+			t.Errorf("Unexpected error %s", errs[diskIndex])
+		}
+		if disk == nil {
+			t.Errorf("Disk erroneously filtered, diskIndex: %d", diskIndex)
+		}
+	}
+}
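To run just the new test locally (a suggestion, assuming the usual Go toolchain and the minio source laid out as the project expected at the time):

	go test -v -run TestDisksWithAllParts ./cmd

The test builds a temporary multi-disk XL backend via prepareXL, writes a multi-part object, and then tampers with selected checksum entries to force mismatches on specific disks.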
