XL: Implement strided erasure distribution. (#1772)

Strided erasure distribution generates a new randomized block
distribution for each Put operation. The distribution is captured
inside `xl.json` so that subsequent Get operations can read the
blocks back in their original order.
Branch: master
Author: Harshavardhana, committed by Harshavardhana
Parent: 6dc8323684
Commit: d65101a8c8
Changed files:
  erasure-createfile.go   6 changes
  erasure-readfile.go     5 changes
  erasure.go              6 changes
  tree-walk-xl.go         2 changes
  xl-v1-metadata.go       4 changes
  xl-v1-multipart.go      5 changes
  xl-v1-object.go        16 changes
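
Before the per-file diffs, a minimal self-contained sketch of the idea in Go (the `erasureInfo` type and its JSON field names below are illustrative stand-ins, not the real `xl.json` schema): Put generates a fresh 1-indexed Fisher-Yates permutation and persists it next to the erasure parameters, so Get can replay exactly the same layout.

    package main

    import (
        "encoding/json"
        "fmt"
        "math/rand"
        "time"
    )

    // erasureInfo is a hypothetical stand-in for the erasure section
    // of xl.json; the real schema may differ.
    type erasureInfo struct {
        DataBlocks   int   `json:"data"`
        ParityBlocks int   `json:"parity"`
        Distribution []int `json:"distribution"`
    }

    func main() {
        rand.Seed(time.Now().UTC().UnixNano())

        info := erasureInfo{DataBlocks: 4, ParityBlocks: 4}
        n := info.DataBlocks + info.ParityBlocks

        // 1-indexed identity permutation, shuffled with Fisher-Yates,
        // mirroring randErasureDistribution further down in this commit.
        info.Distribution = make([]int, n)
        for i := range info.Distribution {
            info.Distribution[i] = i + 1
        }
        for i := 0; i < n; i++ {
            r := i + rand.Intn(n-i)
            info.Distribution[r], info.Distribution[i] = info.Distribution[i], info.Distribution[r]
        }

        buf, _ := json.Marshal(info)
        fmt.Println(string(buf)) // e.g. {"data":4,"parity":4,"distribution":[3,1,8,...]}
    }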

erasure-createfile.go
@@ -32,8 +32,7 @@ func (e erasure) cleanupCreateFileOps(volume, path string, writers []io.WriteClo
     }
 }
 
-// WriteErasure reads predefined blocks, encodes them and writes to
-// configured storage disks.
+// WriteErasure reads predefined blocks, encodes them and writes to configured storage disks.
 func (e erasure) writeErasure(volume, path string, reader *io.PipeReader, wcloser *waitCloser) {
     // Release the block writer upon function return.
     defer wcloser.release()
@@ -119,7 +118,8 @@ func (e erasure) writeErasure(volume, path string, reader *io.PipeReader, wclose
         // Write encoded data in routine.
         go func(index int, writer io.Writer) {
             defer wg.Done()
-            encodedData := dataBlocks[index]
+            // Pick the block from the distribution.
+            encodedData := dataBlocks[e.distribution[index]-1]
             _, wErr := writers[index].Write(encodedData)
             if wErr != nil {
                 wErrs[index] = wErr
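
The new lookup `dataBlocks[e.distribution[index]-1]` means the writer at disk slot `index` no longer receives block `index`, but whichever logical block the 1-indexed table assigns to that slot. A self-contained toy version of just that mapping (the distribution value is hypothetical):

    package main

    import "fmt"

    func main() {
        // Hypothetical 1-indexed distribution, as stored in xl.json.
        distribution := []int{3, 1, 4, 2}
        dataBlocks := [][]byte{
            []byte("block-0"), []byte("block-1"),
            []byte("block-2"), []byte("block-3"),
        }
        // Disk slot `index` receives logical block distribution[index]-1,
        // the same lookup as dataBlocks[e.distribution[index]-1] above.
        for index := range distribution {
            fmt.Printf("disk %d <- %s\n", index, dataBlocks[distribution[index]-1])
        }
        // disk 0 <- block-2
        // disk 1 <- block-0
        // disk 2 <- block-3
        // disk 3 <- block-1
    }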

erasure-readfile.go
@@ -82,13 +82,14 @@ func (e erasure) ReadFile(volume, path string, startOffset int64, totalSize int6
     enBlocks := make([][]byte, len(e.storageDisks))
     // Read all the readers.
     for index, reader := range readers {
+        blockIndex := e.distribution[index] - 1
         // Initialize shard slice and fill the data from each parts.
-        enBlocks[index] = make([]byte, curEncBlockSize)
+        enBlocks[blockIndex] = make([]byte, curEncBlockSize)
         if reader == nil {
             continue
         }
         // Read the necessary blocks.
-        _, rErr := io.ReadFull(reader, enBlocks[index])
+        _, rErr := io.ReadFull(reader, enBlocks[blockIndex])
         if rErr != nil && rErr != io.ErrUnexpectedEOF {
             readers[index].Close()
             readers[index] = nil
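
On the read side the same table is applied in the other direction: the shard read from disk slot `index` is stored at logical position `distribution[index]-1`, so the decoder sees blocks in their original order again. A toy round trip under the same hypothetical table as above:

    package main

    import "fmt"

    func main() {
        distribution := []int{3, 1, 4, 2} // same table as at write time
        // What each disk slot holds after the write-side mapping.
        onDisk := []string{"block-2", "block-0", "block-3", "block-1"}

        // Shard from slot `index` goes back to enBlocks[distribution[index]-1],
        // exactly the blockIndex computation in the hunk above.
        enBlocks := make([]string, len(distribution))
        for index, shard := range onDisk {
            enBlocks[distribution[index]-1] = shard
        }
        fmt.Println(enBlocks) // [block-0 block-1 block-2 block-3]
    }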

erasure.go
@@ -24,10 +24,11 @@ type erasure struct {
     DataBlocks   int
     ParityBlocks int
     storageDisks []StorageAPI
+    distribution []int
 }
 
 // newErasure instantiate a new erasure.
-func newErasure(disks []StorageAPI) *erasure {
+func newErasure(disks []StorageAPI, distribution []int) *erasure {
     // Initialize E.
     e := &erasure{}
@@ -46,6 +47,9 @@ func newErasure(disks []StorageAPI) *erasure {
     // Save all the initialized storage disks.
     e.storageDisks = disks
 
+    // Save the distribution.
+    e.distribution = distribution
+
     // Return successfully initialized.
     return e
 }
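
This constructor change is the plumbing that makes both of the above possible: the caller decides where the table comes from (a fresh shuffle on Put, the persisted one on Get), and the erasure layer merely records it. A trimmed sketch of the pattern, with stand-in types:

    package main

    // StorageAPI stands in for the real storage interface.
    type StorageAPI interface{}

    // erasure mirrors the (trimmed) struct from this commit.
    type erasure struct {
        storageDisks []StorageAPI
        distribution []int
    }

    // newErasure takes the distribution from the caller rather than
    // inventing one, so reads and writes can share the same table.
    func newErasure(disks []StorageAPI, distribution []int) *erasure {
        return &erasure{storageDisks: disks, distribution: distribution}
    }

    func main() {
        e := newErasure(make([]StorageAPI, 8), []int{3, 1, 4, 2, 8, 6, 5, 7})
        _ = e.distribution
    }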

tree-walk-xl.go
@@ -82,7 +82,7 @@ func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string)
 // getRandomDisk - gives a random disk at any point in time from the
 // available pool of disks.
 func (xl xlObjects) getRandomDisk() (disk StorageAPI) {
-    rand.Seed(time.Now().UTC().UnixNano())
+    rand.Seed(time.Now().UTC().UnixNano()) // Seed with current time.
     randIndex := rand.Intn(len(xl.storageDisks) - 1)
     disk = xl.storageDisks[randIndex] // Pick a random disk.
     return disk
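
One aside on this hunk: `rand.Intn(n)` already returns values in `[0, n)`, so `rand.Intn(len(xl.storageDisks) - 1)` can never select the last disk. A sketch of an inclusive pick, should that be the intent:

    package main

    import (
        "fmt"
        "math/rand"
        "time"
    )

    func main() {
        disks := []string{"disk0", "disk1", "disk2", "disk3"}
        rand.Seed(time.Now().UTC().UnixNano()) // Seed with current time.
        // rand.Intn(len(disks)) covers [0, len(disks)), so every disk,
        // including the last one, is reachable.
        randIndex := rand.Intn(len(disks))
        fmt.Println("picked:", disks[randIndex])
    }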

xl-v1-metadata.go
@@ -20,6 +20,7 @@ import (
     "bytes"
     "encoding/json"
     "io"
+    "math/rand"
     "path"
     "sort"
     "sync"
@@ -254,16 +255,15 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro
 // randErasureDistribution - uses Knuth Fisher-Yates shuffle algorithm.
 func randErasureDistribution(numBlocks int) []int {
+    rand.Seed(time.Now().UTC().UnixNano()) // Seed with current time.
     distribution := make([]int, numBlocks)
     for i := 0; i < numBlocks; i++ {
         distribution[i] = i + 1
     }
-    /*
     for i := 0; i < numBlocks; i++ {
         // Choose index uniformly in [i, numBlocks-1]
         r := i + rand.Intn(numBlocks-i)
         distribution[r], distribution[i] = distribution[i], distribution[r]
     }
-    */
     return distribution
 }
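
With the `/* ... */` markers removed, the shuffle actually runs. Picking `r` uniformly from `[i, numBlocks-1]` at each step is the textbook Fisher-Yates invariant, so every permutation of `1..numBlocks` is equally likely (given a uniform source). The function as it reads after this commit, wrapped so it can be run standalone:

    package main

    import (
        "fmt"
        "math/rand"
        "time"
    )

    // randErasureDistribution - uses Knuth Fisher-Yates shuffle algorithm.
    func randErasureDistribution(numBlocks int) []int {
        rand.Seed(time.Now().UTC().UnixNano()) // Seed with current time.
        distribution := make([]int, numBlocks)
        for i := 0; i < numBlocks; i++ {
            distribution[i] = i + 1
        }
        for i := 0; i < numBlocks; i++ {
            // Choose index uniformly in [i, numBlocks-1].
            r := i + rand.Intn(numBlocks-i)
            distribution[r], distribution[i] = distribution[i], distribution[r]
        }
        return distribution
    }

    func main() {
        fmt.Println(randErasureDistribution(8)) // e.g. [5 1 8 3 7 2 6 4]
    }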

xl-v1-multipart.go
@@ -136,11 +136,14 @@ func (xl xlObjects) putObjectPartCommon(bucket string, object string, uploadID s
     if err != nil {
         return "", toObjectErr(err, bucket, object)
     }
+
     // Increment version only if we have online disks less than configured storage disks.
     if diskCount(onlineDisks) < len(xl.storageDisks) {
         higherVersion++
     }
-    erasure := newErasure(onlineDisks) // Initialize a new erasure with online disks
+
+    // Initialize a new erasure with online disks and new distribution.
+    erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution)
     partSuffix := fmt.Sprintf("object%d", partID)
     tmpPartPath := path.Join(tmpMetaPrefix, bucket, object, uploadID, partSuffix)
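
Worth noting: every part of a multipart upload goes through the same `xlMeta.Erasure.Distribution` (presumably loaded from the upload's metadata), so all parts of one object share a single layout. A toy illustration with a hypothetical table:

    package main

    import "fmt"

    func main() {
        // One table per upload, reused for every part, so part N of an
        // object lands on the disks in the same shuffled order.
        distribution := []int{2, 4, 1, 3}
        for partID := 1; partID <= 3; partID++ {
            partSuffix := fmt.Sprintf("object%d", partID)
            fmt.Println(partSuffix, "written with distribution", distribution)
        }
    }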

xl-v1-object.go
@@ -41,16 +41,20 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
     if err != nil {
         return nil, toObjectErr(err, bucket, object)
     }
+
     // List all online disks.
     onlineDisks, _, err := xl.listOnlineDisks(bucket, object)
     if err != nil {
         return nil, toObjectErr(err, bucket, object)
     }
+
     // For zero byte files, return a null reader.
     if xlMeta.Stat.Size == 0 {
         return nullReadCloser{}, nil
     }
-    erasure := newErasure(onlineDisks) // Initialize a new erasure with online disks
+
+    // Initialize a new erasure with online disks, with previous block distribution.
+    erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution)
 
     // Get part index offset.
     partIndex, partOffset, err := xlMeta.objectToPartOffset(startOffset)
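
The comment "with previous block distribution" is the crux of the read path: Get must reuse the table persisted in `xl.json` at Put time, since a freshly shuffled table would reassemble the shards in the wrong order. A self-contained demonstration with hypothetical tables:

    package main

    import "fmt"

    // place maps logical blocks onto disk slots with a 1-indexed table.
    func place(blocks []string, dist []int) []string {
        onDisk := make([]string, len(blocks))
        for index := range onDisk {
            onDisk[index] = blocks[dist[index]-1]
        }
        return onDisk
    }

    // gather applies a table on the read side, restoring logical order
    // only if it is the same table that place used.
    func gather(onDisk []string, dist []int) []string {
        blocks := make([]string, len(onDisk))
        for index, shard := range onDisk {
            blocks[dist[index]-1] = shard
        }
        return blocks
    }

    func main() {
        putDist := []int{3, 1, 4, 2}   // persisted in xl.json at Put time
        freshDist := []int{2, 3, 1, 4} // what a new shuffle might yield

        onDisk := place([]string{"b0", "b1", "b2", "b3"}, putDist)
        fmt.Println(gather(onDisk, putDist))   // [b0 b1 b2 b3] -- correct
        fmt.Println(gather(onDisk, freshDist)) // [b3 b2 b0 b1] -- scrambled
    }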
@@ -208,16 +212,22 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
     tempErasureObj := path.Join(tmpMetaPrefix, bucket, object, "object1")
     tempObj := path.Join(tmpMetaPrefix, bucket, object)
 
+    // Initialize xl meta.
+    xlMeta := newXLMetaV1(xl.dataBlocks, xl.parityBlocks)
+
     // List all online disks.
     onlineDisks, higherVersion, err := xl.listOnlineDisks(bucket, object)
     if err != nil {
         return "", toObjectErr(err, bucket, object)
     }
+
     // Increment version only if we have online disks less than configured storage disks.
     if diskCount(onlineDisks) < len(xl.storageDisks) {
         higherVersion++
     }
-    erasure := newErasure(onlineDisks) // Initialize a new erasure with online disks
+
+    // Initialize a new erasure with online disks and new distribution.
+    erasure := newErasure(onlineDisks, xlMeta.Erasure.Distribution)
     fileWriter, err := erasure.CreateFile(minioMetaBucket, tempErasureObj)
     if err != nil {
         return "", toObjectErr(err, bucket, object)
@ -301,7 +311,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
return "", toObjectErr(err, bucket, object) return "", toObjectErr(err, bucket, object)
} }
xlMeta := newXLMetaV1(xl.dataBlocks, xl.parityBlocks) // Fill all the necessary metadata.
xlMeta.Meta = metadata xlMeta.Meta = metadata
xlMeta.Stat.Size = size xlMeta.Stat.Size = size
xlMeta.Stat.ModTime = modTime xlMeta.Stat.ModTime = modTime
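
Moving `newXLMetaV1` to the top of PutObject is what enables the new `newErasure(onlineDisks, xlMeta.Erasure.Distribution)` call: the distribution must exist before `CreateFile` writes the first block, and only the filling-in of metadata stays at the end. A trimmed, hypothetical sketch of what this implies `newXLMetaV1` does (the real constructor surely carries more fields):

    package main

    import (
        "math/rand"
        "time"
    )

    // Hypothetical, trimmed shapes; the real xlMetaV1 has more fields.
    type erasureInfo struct {
        DataBlocks   int
        ParityBlocks int
        Distribution []int
    }

    type xlMetaV1 struct {
        Erasure erasureInfo
    }

    // newXLMetaV1, as this commit implies it behaves: the distribution
    // is decided here, before any block is written, so CreateFile and
    // the final metadata agree on a single layout.
    func newXLMetaV1(dataBlocks, parityBlocks int) xlMetaV1 {
        rand.Seed(time.Now().UTC().UnixNano())
        n := dataBlocks + parityBlocks
        dist := make([]int, n)
        for i := range dist {
            dist[i] = i + 1
        }
        for i := 0; i < n; i++ {
            r := i + rand.Intn(n-i)
            dist[r], dist[i] = dist[i], dist[r]
        }
        return xlMetaV1{Erasure: erasureInfo{dataBlocks, parityBlocks, dist}}
    }

    func main() {
        meta := newXLMetaV1(4, 4)
        _ = meta.Erasure.Distribution // handed to newErasure before CreateFile
    }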
