XL: pickValidXLMeta should return error instead of panic'ing (#3277)

master
Krishnan Parthasarathi 8 years ago committed by Harshavardhana
parent 0b9f0d14a1
commit eed9ab0464
  1. 7
      cmd/xl-v1-healing.go
  2. 9
      cmd/xl-v1-metadata.go
  3. 60
      cmd/xl-v1-metadata_test.go
  4. 15
      cmd/xl-v1-multipart.go
  5. 5
      cmd/xl-v1-object.go

@ -236,8 +236,11 @@ func healObject(storageDisks []StorageAPI, bucket string, object string) error {
latestDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
// List of disks having outdated version of the object or missing object.
outDatedDisks := outDatedDisks(storageDisks, partsMetadata, errs)
// Latest xlMetaV1 for reference.
latestMeta := pickValidXLMeta(partsMetadata, modTime)
// Latest xlMetaV1 for reference. If a valid metadata is not present, it is as good as object not found.
latestMeta, pErr := pickValidXLMeta(partsMetadata, modTime)
if pErr != nil {
return pErr
}
for index, disk := range outDatedDisks {
// Before healing outdated disks, we need to remove xl.json

@ -18,7 +18,7 @@ package cmd
import (
"encoding/json"
"fmt"
"errors"
"path"
"sort"
"sync"
@ -195,15 +195,14 @@ func (m xlMetaV1) ObjectToPartOffset(offset int64) (partIndex int, partOffset in
// pickValidXLMeta - picks one valid xlMeta content and returns from a
// slice of xlmeta content. If no value is found this function panics
// and dies.
func pickValidXLMeta(metaArr []xlMetaV1, modTime time.Time) xlMetaV1 {
func pickValidXLMeta(metaArr []xlMetaV1, modTime time.Time) (xlMetaV1, error) {
// Pick latest valid metadata.
for _, meta := range metaArr {
if meta.IsValid() && meta.Stat.ModTime.Equal(modTime) {
return meta
return meta, nil
}
}
pmsg := fmt.Sprintf("Unable to look for valid XL metadata content - %v %s", metaArr, modTime)
panic(pmsg)
return xlMetaV1{}, traceError(errors.New("No valid xl.json present"))
}
// list of all errors that can be ignored in a metadata operation.

@ -17,8 +17,10 @@
package cmd
import (
"errors"
"strconv"
"testing"
"time"
)
const MiB = 1024 * 1024
@ -149,3 +151,61 @@ func TestObjectToPartOffset(t *testing.T) {
}
}
}
// Helper function to check if two xlMetaV1 values are similar.
func isXLMetaSimilar(m, n xlMetaV1) bool {
if m.Version != n.Version {
return false
}
if m.Format != n.Format {
return false
}
if len(m.Parts) != len(n.Parts) {
return false
}
return true
}
func TestPickValidXLMeta(t *testing.T) {
obj := "object"
x1 := newXLMetaV1(obj, 4, 4)
now := time.Now().UTC()
x1.Stat.ModTime = now
invalidX1 := x1
invalidX1.Version = "invalid-version"
xs := []xlMetaV1{x1, x1, x1, x1}
invalidXS := []xlMetaV1{invalidX1, invalidX1, invalidX1, invalidX1}
testCases := []struct {
metaArr []xlMetaV1
modTime time.Time
xlMeta xlMetaV1
expectedErr error
}{
{
metaArr: xs,
modTime: now,
xlMeta: x1,
expectedErr: nil,
},
{
metaArr: invalidXS,
modTime: now,
xlMeta: invalidX1,
expectedErr: errors.New("No valid xl.json present"),
},
}
for i, test := range testCases {
xlMeta, err := pickValidXLMeta(test.metaArr, test.modTime)
if test.expectedErr != nil {
if errorCause(err).Error() != test.expectedErr.Error() {
t.Errorf("Test %d: Expected to fail with %v but received %v",
i+1, test.expectedErr, err)
}
} else {
if !isXLMetaSimilar(xlMeta, test.xlMeta) {
t.Errorf("Test %d: Expected %v but received %v",
i+1, test.xlMeta, xlMeta)
}
}
}
}

@ -381,7 +381,10 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
onlineDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)
// Pick one from the first valid metadata.
xlMeta := pickValidXLMeta(partsMetadata, modTime)
xlMeta, err := pickValidXLMeta(partsMetadata, modTime)
if err != nil {
return "", err
}
onlineDisks = getOrderedDisks(xlMeta.Erasure.Distribution, onlineDisks)
_ = getOrderedPartsMetadata(xlMeta.Erasure.Distribution, partsMetadata)
@ -493,7 +496,10 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
onlineDisks, modTime = listOnlineDisks(onlineDisks, partsMetadata, errs)
// Pick one from the first valid metadata.
xlMeta = pickValidXLMeta(partsMetadata, modTime)
xlMeta, err = pickValidXLMeta(partsMetadata, modTime)
if err != nil {
return "", err
}
// Once part is successfully committed, proceed with updating XL metadata.
xlMeta.Stat.ModTime = time.Now().UTC()
@ -684,7 +690,10 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
var objectSize int64
// Pick one from the first valid metadata.
xlMeta := pickValidXLMeta(partsMetadata, modTime)
xlMeta, err := pickValidXLMeta(partsMetadata, modTime)
if err != nil {
return "", err
}
// Order online disks in accordance with distribution order.
onlineDisks = getOrderedDisks(xlMeta.Erasure.Distribution, onlineDisks)

@ -87,7 +87,10 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
onlineDisks, modTime := listOnlineDisks(xl.storageDisks, metaArr, errs)
// Pick latest valid metadata.
xlMeta := pickValidXLMeta(metaArr, modTime)
xlMeta, err := pickValidXLMeta(metaArr, modTime)
if err != nil {
return err
}
// Reorder online disks based on erasure distribution order.
onlineDisks = getOrderedDisks(xlMeta.Erasure.Distribution, onlineDisks)

Loading…
Cancel
Save