fix: use meta.Erasure.Index for GetObject() to reconstruct object (#10764)

master
Krishna Srinivas 4 years ago committed by GitHub
parent 46275c6547
commit c49a80db41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      cmd/erasure-metadata-utils.go
  2. 3
      cmd/erasure-metadata.go
  3. 3
      cmd/erasure-metadata_test.go
  4. 5
      cmd/erasure-object.go

@ -153,6 +153,30 @@ func readVersionFromDisks(ctx context.Context, disks []StorageAPI, bucket, objec
return metadataArray, g.Wait() return metadataArray, g.Wait()
} }
// Return disks ordered by the meta.Erasure.Index information.
func shuffleDisksByIndex(disks []StorageAPI, metaArr []FileInfo) (shuffledDisks []StorageAPI) {
shuffledDisks = make([]StorageAPI, len(disks))
for i, meta := range metaArr {
if disks[i] == nil {
continue
}
shuffledDisks[meta.Erasure.Index-1] = disks[i]
}
return shuffledDisks
}
// Return FileInfo slice ordered by the meta.Erasure.Index information.
func shufflePartsMetadataByIndex(disks []StorageAPI, metaArr []FileInfo) []FileInfo {
newMetaArr := make([]FileInfo, len(disks))
for i, meta := range metaArr {
if disks[i] == nil {
continue
}
newMetaArr[meta.Erasure.Index-1] = metaArr[i]
}
return newMetaArr
}
// Return shuffled partsMetadata depending on distribution. // Return shuffled partsMetadata depending on distribution.
func shufflePartsMetadata(partsMetadata []FileInfo, distribution []int) (shuffledPartsMetadata []FileInfo) { func shufflePartsMetadata(partsMetadata []FileInfo, distribution []int) (shuffledPartsMetadata []FileInfo) {
if distribution == nil { if distribution == nil {

@ -91,7 +91,8 @@ func (fi FileInfo) IsValid() bool {
dataBlocks := fi.Erasure.DataBlocks dataBlocks := fi.Erasure.DataBlocks
parityBlocks := fi.Erasure.ParityBlocks parityBlocks := fi.Erasure.ParityBlocks
return ((dataBlocks >= parityBlocks) && return ((dataBlocks >= parityBlocks) &&
(dataBlocks != 0) && (parityBlocks != 0)) (dataBlocks != 0) && (parityBlocks != 0) &&
(fi.Erasure.Index > 0 && fi.Erasure.Distribution != nil))
} }
// ToObjectInfo - Converts metadata to object info. // ToObjectInfo - Converts metadata to object info.

@ -47,6 +47,7 @@ func TestAddObjectPart(t *testing.T) {
// Setup. // Setup.
fi := newFileInfo("test-object", 8, 8) fi := newFileInfo("test-object", 8, 8)
fi.Erasure.Index = 1
if !fi.IsValid() { if !fi.IsValid() {
t.Fatalf("unable to get xl meta") t.Fatalf("unable to get xl meta")
} }
@ -80,6 +81,7 @@ func TestObjectPartIndex(t *testing.T) {
// Setup. // Setup.
fi := newFileInfo("test-object", 8, 8) fi := newFileInfo("test-object", 8, 8)
fi.Erasure.Index = 1
if !fi.IsValid() { if !fi.IsValid() {
t.Fatalf("unable to get xl meta") t.Fatalf("unable to get xl meta")
} }
@ -108,6 +110,7 @@ func TestObjectPartIndex(t *testing.T) {
func TestObjectToPartOffset(t *testing.T) { func TestObjectToPartOffset(t *testing.T) {
// Setup. // Setup.
fi := newFileInfo("test-object", 8, 8) fi := newFileInfo("test-object", 8, 8)
fi.Erasure.Index = 1
if !fi.IsValid() { if !fi.IsValid() {
t.Fatalf("unable to get xl meta") t.Fatalf("unable to get xl meta")
} }

@ -215,11 +215,12 @@ func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, s
} }
func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions, fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI) error { func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions, fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI) error {
tmpDisks := onlineDisks
// Reorder online disks based on erasure distribution order. // Reorder online disks based on erasure distribution order.
onlineDisks = shuffleDisks(onlineDisks, fi.Erasure.Distribution) onlineDisks = shuffleDisksByIndex(tmpDisks, metaArr)
// Reorder parts metadata based on erasure distribution order. // Reorder parts metadata based on erasure distribution order.
metaArr = shufflePartsMetadata(metaArr, fi.Erasure.Distribution) metaArr = shufflePartsMetadataByIndex(tmpDisks, metaArr)
// For negative length read everything. // For negative length read everything.
if length < 0 { if length < 0 {

Loading…
Cancel
Save