xl: GetCheckSumInfo() shouldn't fail if hash not available. (#2984)

In a multipart upload scenario disks going down and coming backup
can lead to certain parts missing on the disk/server which was
going down. This is a valid case since these blocks can be
missing and should be healed through heal operation. But we are
not supposed to fail prematurely since we have enough data on
the other disks as well within read-quorum.

This fix relaxes previous assumption, fixes a major corruption
issue reproduced by @vadmeste.

Fixes #2976
master
Harshavardhana 8 years ago committed by GitHub
parent 6e748cb1cf
commit 39331b6b4e
  1. 5
      cmd/xl-v1-healing.go
  2. 8
      cmd/xl-v1-metadata.go
  3. 8
      cmd/xl-v1-multipart.go
  4. 5
      cmd/xl-v1-object.go
  5. 2
      cmd/xl-v1-utils_test.go

@ -171,10 +171,7 @@ func (xl xlObjects) HealObject(bucket, object string) error {
partName := latestMeta.Parts[partIndex].Name partName := latestMeta.Parts[partIndex].Name
partSize := latestMeta.Parts[partIndex].Size partSize := latestMeta.Parts[partIndex].Size
erasure := latestMeta.Erasure erasure := latestMeta.Erasure
sumInfo, err := latestMeta.Erasure.GetCheckSumInfo(partName) sumInfo := latestMeta.Erasure.GetCheckSumInfo(partName)
if err != nil {
return err
}
// Heal the part file. // Heal the part file.
checkSums, err := erasureHealFile(latestDisks, outDatedDisks, checkSums, err := erasureHealFile(latestDisks, outDatedDisks,
bucket, pathJoin(object, partName), bucket, pathJoin(object, partName),

@ -81,14 +81,14 @@ func (e *erasureInfo) AddCheckSumInfo(ckSumInfo checkSumInfo) {
} }
// GetCheckSumInfo - get checksum of a part. // GetCheckSumInfo - get checksum of a part.
func (e erasureInfo) GetCheckSumInfo(partName string) (ckSum checkSumInfo, err error) { func (e erasureInfo) GetCheckSumInfo(partName string) (ckSum checkSumInfo) {
// Return the checksum. // Return the checksum.
for _, sum := range e.Checksum { for _, sum := range e.Checksum {
if sum.Name == partName { if sum.Name == partName {
return sum, nil return sum
} }
} }
return checkSumInfo{}, traceError(errUnexpected) return checkSumInfo{Algorithm: bitRotAlgo}
} }
// statInfo - carries stat information of the object. // statInfo - carries stat information of the object.
@ -197,7 +197,7 @@ func (m xlMetaV1) ObjectToPartOffset(offset int64) (partIndex int, partOffset in
func pickValidXLMeta(metaArr []xlMetaV1, modTime time.Time) xlMetaV1 { func pickValidXLMeta(metaArr []xlMetaV1, modTime time.Time) xlMetaV1 {
// Pick latest valid metadata. // Pick latest valid metadata.
for _, meta := range metaArr { for _, meta := range metaArr {
if meta.IsValid() && meta.Stat.ModTime == modTime { if meta.IsValid() && meta.Stat.ModTime.Equal(modTime) {
return meta return meta
} }
} }

@ -386,8 +386,6 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
tmpSuffix := getUUID() tmpSuffix := getUUID()
tmpPartPath := path.Join(tmpMetaPrefix, tmpSuffix) tmpPartPath := path.Join(tmpMetaPrefix, tmpSuffix)
lreader := data
// Initialize md5 writer. // Initialize md5 writer.
md5Writer := md5.New() md5Writer := md5.New()
@ -401,12 +399,16 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
mw := io.MultiWriter(writers...) mw := io.MultiWriter(writers...)
var lreader io.Reader
// Limit the reader to its provided size > 0. // Limit the reader to its provided size > 0.
if size > 0 { if size > 0 {
// This is done so that we can avoid erroneous clients sending // This is done so that we can avoid erroneous clients sending
// more data than the set content size. // more data than the set content size.
lreader = io.LimitReader(data, size) lreader = io.LimitReader(data, size)
} // else we read till EOF. } else {
// else we read till EOF.
lreader = data
}
// Construct a tee reader for md5sum. // Construct a tee reader for md5sum.
teeReader := io.TeeReader(lreader, mw) teeReader := io.TeeReader(lreader, mw)

@ -187,10 +187,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
checkSums[index] = "" checkSums[index] = ""
continue continue
} }
ckSumInfo, err := metaArr[index].Erasure.GetCheckSumInfo(partName) ckSumInfo := metaArr[index].Erasure.GetCheckSumInfo(partName)
if err != nil { // FIXME - relook at returning error here.
return toObjectErr(err, bucket, object)
}
checkSums[index] = ckSumInfo.Hash checkSums[index] = ckSumInfo.Hash
// Set checksum algo only once, while it is possible to have // Set checksum algo only once, while it is possible to have
// different algos per block because of our `xl.json`. // different algos per block because of our `xl.json`.

@ -209,7 +209,7 @@ func compareXLMetaV1(t *testing.T, unMarshalXLMeta, gjsonXLMeta xlMetaV1) {
if unMarshalXLMeta.Stat.Size != gjsonXLMeta.Stat.Size { if unMarshalXLMeta.Stat.Size != gjsonXLMeta.Stat.Size {
t.Errorf("Expected the stat size to be %v, but got %v.", unMarshalXLMeta.Stat.Size, gjsonXLMeta.Stat.Size) t.Errorf("Expected the stat size to be %v, but got %v.", unMarshalXLMeta.Stat.Size, gjsonXLMeta.Stat.Size)
} }
if unMarshalXLMeta.Stat.ModTime != gjsonXLMeta.Stat.ModTime { if !unMarshalXLMeta.Stat.ModTime.Equal(gjsonXLMeta.Stat.ModTime) {
t.Errorf("Expected the modTime to be \"%v\", but got \"%v\".", unMarshalXLMeta.Stat.ModTime, gjsonXLMeta.Stat.ModTime) t.Errorf("Expected the modTime to be \"%v\", but got \"%v\".", unMarshalXLMeta.Stat.ModTime, gjsonXLMeta.Stat.ModTime)
} }
if unMarshalXLMeta.Erasure.Algorithm != gjsonXLMeta.Erasure.Algorithm { if unMarshalXLMeta.Erasure.Algorithm != gjsonXLMeta.Erasure.Algorithm {

Loading…
Cancel
Save