From dce76d9307ef12749a10a5171fc67e0af09e3a1f Mon Sep 17 00:00:00 2001
From: Daniel Lind
Date: Thu, 15 Jun 2017 02:13:02 +0200
Subject: [PATCH] Fix xl.diskWithAllParts to proper checksum algorithm (#4509)

---
 cmd/xl-v1-healing-common.go      |  20 +++---
 cmd/xl-v1-healing-common_test.go | 112 +++++++++++++++++++++++++++++++
 2 files changed, 122 insertions(+), 10 deletions(-)

diff --git a/cmd/xl-v1-healing-common.go b/cmd/xl-v1-healing-common.go
index f463ec6c1..54beedcc9 100644
--- a/cmd/xl-v1-healing-common.go
+++ b/cmd/xl-v1-healing-common.go
@@ -252,21 +252,22 @@ func xlHealStat(xl xlObjects, partsMetadata []xlMetaV1, errs []error) HealObject
 // calculating blake2b checksum.
 func disksWithAllParts(onlineDisks []StorageAPI, partsMetadata []xlMetaV1, errs []error, bucket,
 	object string) ([]StorageAPI, []error, error) {
 	availableDisks := make([]StorageAPI, len(onlineDisks))
-	for index, onlineDisk := range onlineDisks {
+	for diskIndex, onlineDisk := range onlineDisks {
 		if onlineDisk == nil {
 			continue
 		}
 		// disk has a valid xl.json but may not have all the
 		// parts. This is considered an outdated disk, since
 		// it needs healing too.
-		for _, part := range partsMetadata[index].Parts {
+		for _, part := range partsMetadata[diskIndex].Parts {
 			// compute blake2b sum of part.
 			partPath := filepath.Join(object, part.Name)
-			hash := newHash(partsMetadata[index].Erasure.Algorithm)
+			checkSumInfo := partsMetadata[diskIndex].Erasure.GetCheckSumInfo(part.Name)
+			hash := newHash(checkSumInfo.Algorithm)
 			blakeBytes, hErr := hashSum(onlineDisk, bucket, partPath, hash)
 			if hErr == errFileNotFound {
-				errs[index] = errFileNotFound
-				availableDisks[index] = nil
+				errs[diskIndex] = errFileNotFound
+				availableDisks[diskIndex] = nil
 				break
 			}
 
@@ -274,17 +275,16 @@ func disksWithAllParts(onlineDisks []StorageAPI, partsMetadata []xlMetaV1, errs
 				return nil, nil, traceError(hErr)
 			}
 
-			partChecksum := partsMetadata[index].Erasure.GetCheckSumInfo(part.Name).Hash
 			blakeSum := hex.EncodeToString(blakeBytes)
 			// if blake2b sum doesn't match for a part
 			// then this disk is outdated and needs
 			// healing.
-			if blakeSum != partChecksum {
-				errs[index] = errFileNotFound
-				availableDisks[index] = nil
+			if blakeSum != checkSumInfo.Hash {
+				errs[diskIndex] = errFileNotFound
+				availableDisks[diskIndex] = nil
 				break
 			}
-			availableDisks[index] = onlineDisk
+			availableDisks[diskIndex] = onlineDisk
 		}
 	}
 
diff --git a/cmd/xl-v1-healing-common_test.go b/cmd/xl-v1-healing-common_test.go
index d9705395c..351d3a028 100644
--- a/cmd/xl-v1-healing-common_test.go
+++ b/cmd/xl-v1-healing-common_test.go
@@ -330,3 +330,115 @@ func TestListOnlineDisks(t *testing.T) {
 		}
 	}
 }
+
+func TestDisksWithAllParts(t *testing.T) {
+	rootPath, err := newTestConfig(globalMinioDefaultRegion)
+	if err != nil {
+		t.Fatalf("Failed to initialize config - %v", err)
+	}
+	defer removeAll(rootPath)
+
+	obj, disks, err := prepareXL()
+	if err != nil {
+		t.Fatalf("Prepare XL backend failed - %v", err)
+	}
+	defer removeRoots(disks)
+
+	bucket := "bucket"
+	object := "object"
+	// make data with more than one part
+	partCount := 3
+	data := bytes.Repeat([]byte("a"), int(globalPutPartSize)*partCount)
+	xlDisks := obj.(*xlObjects).storageDisks
+
+	err = obj.MakeBucketWithLocation("bucket", "")
+	if err != nil {
+		t.Fatalf("Failed to make a bucket %v", err)
+	}
+
+	_, err = obj.PutObject(bucket, object, int64(len(data)),
+		bytes.NewReader(data), nil, "")
+	if err != nil {
+		t.Fatalf("Failed to putObject %v", err)
+	}
+
+	partsMetadata, errs := readAllXLMetadata(xlDisks, bucket, object)
+	if err != nil {
+		t.Fatalf("Failed to read xl meta data %v", err)
+	}
+
+	// Replace the default blake2b erasure algorithm to HashSha256 and test that
+	// disks are excluded
+	diskFailures := make(map[int]string)
+	// key = disk index, value = part name with hash mismatch
+	diskFailures[0] = "part.3"
+	diskFailures[3] = "part.1"
+	diskFailures[15] = "part.2"
+
+	for diskIndex, partName := range diskFailures {
+		for index, info := range partsMetadata[diskIndex].Erasure.Checksum {
+			if info.Name == partName {
+				partsMetadata[diskIndex].Erasure.Checksum[index].Algorithm = HashSha256
+			}
+		}
+	}
+
+	errs = make([]error, len(xlDisks))
+	filteredDisks, errs, err :=
+		disksWithAllParts(xlDisks, partsMetadata, errs, bucket, object)
+	if err != nil {
+		t.Errorf("Unexpected error: %s", err)
+	}
+
+	if len(filteredDisks) != len(xlDisks) {
+		t.Errorf("Unexpected number of disks: %d", len(filteredDisks))
+	}
+
+	for diskIndex, disk := range filteredDisks {
+
+		if _, ok := diskFailures[diskIndex]; ok {
+			if disk != nil {
+				t.Errorf("Disk not filtered as expected, disk: %d", diskIndex)
+			}
+			if errs[diskIndex] == nil {
+				t.Errorf("Expected error not received, diskIndex: %d", diskIndex)
+			}
+		} else {
+			if disk == nil {
+				t.Errorf("Disk erroneously filtered, diskIndex: %d", diskIndex)
+			}
+			if errs[diskIndex] != nil {
+				t.Errorf("Unexpected error, %s, diskIndex: %d", errs[diskIndex], diskIndex)
+			}
+
+		}
+	}
+
+	// Test that all disks are returned without any failures with unmodified
+	// meta data
+	partsMetadata, errs = readAllXLMetadata(xlDisks, bucket, object)
+	if err != nil {
+		t.Fatalf("Failed to read xl meta data %v", err)
+	}
+
+	filteredDisks, errs, err =
+		disksWithAllParts(xlDisks, partsMetadata, errs, bucket, object)
+	if err != nil {
+		t.Errorf("Unexpected error: %s", err)
+	}
+
+	if len(filteredDisks) != len(xlDisks) {
+		t.Errorf("Unexpected number of disks: %d", len(filteredDisks))
+	}
+
+	for diskIndex, disk := range filteredDisks {
+		if errs[diskIndex] != nil {
+			t.Errorf("Unexpected error %s", errs[diskIndex])
+		}
+
+		if disk == nil {
+			t.Errorf("Disk erroneously filtered, diskIndex: %d", diskIndex)
+		}
+	}
+
+}
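
Note on the change, with an illustrative sketch: before this patch, disksWithAllParts hashed every part with the single erasure-level algorithm (partsMetadata[index].Erasure.Algorithm); the patch instead looks up each part's own checksum entry via Erasure.GetCheckSumInfo(part.Name) and verifies with that part's recorded algorithm, so a part written with, say, sha256 is no longer re-hashed with blake2b and spuriously marked for healing. The standalone Go program below is a minimal sketch of that idea, not MinIO source: checkSumInfo, newHash, and verifyPart here are simplified hypothetical stand-ins for the xl.json structures and helpers, and golang.org/x/crypto/blake2b stands in for whatever blake2b implementation the codebase used at the time.

// Illustrative sketch only, not MinIO source: simplified stand-ins for the
// per-part checksum lookup that this patch introduces.
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"hash"

	"golang.org/x/crypto/blake2b"
)

// checkSumInfo mirrors, in simplified hypothetical form, a per-part
// checksum entry in xl.json.
type checkSumInfo struct {
	Name      string // part name, e.g. "part.1"
	Algorithm string // "blake2b" or "sha256"
	Hash      string // hex-encoded digest recorded at write time
}

// newHash returns a hasher for the recorded algorithm, defaulting to
// blake2b as the historical default in the healing code.
func newHash(algorithm string) hash.Hash {
	if algorithm == "sha256" {
		return sha256.New()
	}
	h, _ := blake2b.New512(nil) // error is always nil for a nil key
	return h
}

// verifyPart re-hashes the part contents with the part's own algorithm and
// compares against the recorded hex digest, as the patched loop does.
func verifyPart(info checkSumInfo, contents []byte) bool {
	h := newHash(info.Algorithm)
	h.Write(contents)
	return hex.EncodeToString(h.Sum(nil)) == info.Hash
}

func main() {
	contents := []byte("part contents")
	sum := sha256.Sum256(contents)
	info := checkSumInfo{
		Name:      "part.1",
		Algorithm: "sha256",
		Hash:      hex.EncodeToString(sum[:]),
	}
	// Before the fix, a blake2b hasher would have been used here regardless
	// of info.Algorithm, so this check would fail and the disk holding the
	// part would wrongly be treated as outdated.
	fmt.Println(verifyPart(info, contents)) // true
}

TestDisksWithAllParts in the patch exercises the same behavior end to end: it rewrites the Algorithm field of selected parts' checksum entries to HashSha256 and asserts that exactly those disks are filtered out, then re-reads unmodified metadata and asserts that no disk is filtered.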