xl: code refactor, cleanup ReadFile and CreateFile.

master
Krishna Srinivas 9 years ago committed by Harshavardhana
parent 45b3d3e21f
commit 5c33b68318
  1. 143
      xl-v1-createfile.go
  2. 42
      xl-v1-healfile.go
  3. 187
      xl-v1-readfile.go
  4. 76
      xl-v1-utils.go

@ -48,18 +48,82 @@ func closeAndRemoveWriters(writers ...io.WriteCloser) {
}
}
// quorumDisk pairs a storage disk with its position in xl.storageDisks,
// so quorum members can be mapped back to their original disk index.
type quorumDisk struct {
disk StorageAPI // Underlying storage disk participating in the quorum.
index int // Index of this disk within xl.storageDisks.
}
// getQuorumDisks - returns the set of disks that agree on the highest
// known file version for the given volume/path, along with that version.
// Disks with an older version (or unreadable metadata, version -1) are
// excluded from the returned quorum.
func (xl XL) getQuorumDisks(volume, path string) (quorumDisks []quorumDisk, higherVersion int64) {
	for diskIndex, formatVersion := range xl.getFileQuorumVersionMap(volume, path) {
		switch {
		case formatVersion > higherVersion:
			// Newer version found - the quorum restarts with this disk alone.
			higherVersion = formatVersion
			quorumDisks = []quorumDisk{{disk: xl.storageDisks[diskIndex], index: diskIndex}}
		case formatVersion == higherVersion:
			// Matches the current highest version - disk joins the quorum.
			quorumDisks = append(quorumDisks, quorumDisk{disk: xl.storageDisks[diskIndex], index: diskIndex})
		}
	}
	return quorumDisks, higherVersion
}
// getFileQuorumVersionMap - reads "file.version" from the metadata file on
// each storage disk and returns a map of disk index to version:
//   -1 : metadata unreadable or unparseable on that disk,
//    0 : metadata readable but carries no "file.version" key,
//   >0 : the parsed version number.
func (xl XL) getFileQuorumVersionMap(volume, path string) map[int]int64 {
	metadataFilePath := slashpath.Join(path, metadataFile)
	// Set offset to 0 to read entire file.
	offset := int64(0)
	// Allocate disk index format map - do not use maps directly
	// without allocating.
	fileQuorumVersionMap := make(map[int]int64)
	// TODO - all errors should be logged here.
	// Read metadata from all disks.
	for index, disk := range xl.storageDisks {
		fileQuorumVersionMap[index] = -1
		metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
		if err != nil {
			continue
		}
		// Allocate a fresh map for every disk: decoding into a shared map
		// merges keys, so a disk missing "file.version" could otherwise
		// inherit the value decoded from a previous disk.
		metadata := make(map[string]string)
		err = json.NewDecoder(metadataReader).Decode(&metadata)
		// Close promptly - a defer here would keep one reader open per
		// disk until the function returns.
		metadataReader.Close()
		if err != nil {
			continue
		}
		versionStr, ok := metadata["file.version"]
		if !ok {
			// Metadata exists but is unversioned - report version 0 and
			// skip the parse (the old fall-through parsed "" and relied on
			// the resulting error to keep the 0).
			fileQuorumVersionMap[index] = 0
			continue
		}
		// Convert string to integer.
		fileVersion, err := strconv.ParseInt(versionStr, 10, 64)
		if err != nil {
			continue
		}
		fileQuorumVersionMap[index] = fileVersion
	}
	return fileQuorumVersionMap
}
// WriteErasure reads predefined blocks, encodes them and writes to
// configured storage disks.
func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
xl.lockNS(volume, path, false)
defer xl.unlockNS(volume, path, false)
// get available quorum for existing file path
// Get available quorum for existing file path.
_, higherVersion := xl.getQuorumDisks(volume, path)
// increment to have next higher version
// Increment to have next higher version.
higherVersion++
quorumDisks := make([]quorumDisk, len(xl.storageDisks))
writers := make([]io.WriteCloser, len(xl.storageDisks))
sha512Writers := make([]hash.Hash, len(xl.storageDisks))
@ -70,15 +134,14 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
modTime := time.Now().UTC()
createFileError := 0
maxIndex := 0
for index, disk := range xl.storageDisks {
erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
writer, err := disk.CreateFile(volume, erasurePart)
if err != nil {
createFileError++
// we can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum
// otherwise return failure
// We can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum
// otherwise return failure.
if createFileError <= len(xl.storageDisks)-xl.writeQuorum {
continue
}
@ -95,8 +158,8 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
if err != nil {
createFileError++
// we can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum
// otherwise return failure
// We can safely allow CreateFile errors up to
// len(xl.storageDisks) - xl.writeQuorum otherwise return failure.
if createFileError <= len(xl.storageDisks)-xl.writeQuorum {
continue
}
@ -107,19 +170,17 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
return
}
writers[maxIndex] = writer
metadataWriters[maxIndex] = metadataWriter
sha512Writers[maxIndex] = fastSha512.New()
quorumDisks[maxIndex] = quorumDisk{disk, index}
maxIndex++
writers[index] = writer
metadataWriters[index] = metadataWriter
sha512Writers[index] = fastSha512.New()
}
// Allocate 4MiB block size buffer for reading.
buffer := make([]byte, erasureBlockSize)
dataBuffer := make([]byte, erasureBlockSize)
var totalSize int64 // Saves total incoming stream size.
for {
// Read up to allocated block size.
n, err := io.ReadFull(reader, buffer)
n, err := io.ReadFull(reader, dataBuffer)
if err != nil {
// Any unexpected errors, close the pipe reader with error.
if err != io.ErrUnexpectedEOF && err != io.EOF {
@ -135,16 +196,17 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
}
if n > 0 {
// Split the input buffer into data and parity blocks.
var blocks [][]byte
blocks, err = xl.ReedSolomon.Split(buffer[0:n])
var dataBlocks [][]byte
dataBlocks, err = xl.ReedSolomon.Split(dataBuffer[0:n])
if err != nil {
// Remove all temp writers.
xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
reader.CloseWithError(err)
return
}
// Encode parity blocks using data blocks.
err = xl.ReedSolomon.Encode(blocks)
err = xl.ReedSolomon.Encode(dataBlocks)
if err != nil {
// Remove all temp writers upon error.
xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
@ -153,18 +215,23 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
}
// Loop through and write encoded data to quorum disks.
for i := 0; i < maxIndex; i++ {
encodedData := blocks[quorumDisks[i].index]
_, err = writers[i].Write(encodedData)
for index, writer := range writers {
if writer == nil {
continue
}
encodedData := dataBlocks[index]
_, err = writers[index].Write(encodedData)
if err != nil {
// Remove all temp writers upon error.
xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
reader.CloseWithError(err)
return
}
sha512Writers[i].Write(encodedData)
if sha512Writers[index] != nil {
sha512Writers[index].Write(encodedData)
}
}
// Update total written.
totalSize += int64(n)
}
@ -178,7 +245,8 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
metadata["format.patch"] = "0"
metadata["file.size"] = strconv.FormatInt(totalSize, 10)
if len(xl.storageDisks) > len(writers) {
// save file.version only if we wrote to less disks than all disks
// Save file.version only if we wrote to less disks than all
// storage disks.
metadata["file.version"] = strconv.FormatInt(higherVersion, 10)
}
metadata["file.modTime"] = modTime.Format(timeFormatAMZ)
@ -191,9 +259,14 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
// Case: when storageDisks is 16 and write quorumDisks is 13,
// meta data write failure up to 2 can be considered.
// currently we fail for any meta data writes
for i := 0; i < maxIndex; i++ {
// Save sha512 checksum of each encoded blocks.
metadata["file.xl.block512Sum"] = hex.EncodeToString(sha512Writers[i].Sum(nil))
for index, metadataWriter := range metadataWriters {
if metadataWriter == nil {
continue
}
if sha512Writers[index] != nil {
// Save sha512 checksum of each encoded blocks.
metadata["file.xl.block512Sum"] = hex.EncodeToString(sha512Writers[index].Sum(nil))
}
// Marshal metadata into json strings.
metadataBytes, err := json.Marshal(metadata)
@ -205,7 +278,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
}
// Write metadata to disk.
_, err = metadataWriters[i].Write(metadataBytes)
_, err = metadataWriter.Write(metadataBytes)
if err != nil {
xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
reader.CloseWithError(err)
@ -214,10 +287,18 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
}
// Close all writers and metadata writers in routines.
for i := 0; i < maxIndex; i++ {
for index, writer := range writers {
if writer == nil {
continue
}
// Safely wrote, now rename to its actual location.
writer.Close()
if metadataWriters[index] == nil {
continue
}
// Safely wrote, now rename to its actual location.
writers[i].Close()
metadataWriters[i].Close()
metadataWriters[index].Close()
}
// Close the pipe reader and return.

@ -26,11 +26,11 @@ import (
)
func (xl XL) selfHeal(volume string, path string) error {
totalShards := xl.DataBlocks + xl.ParityBlocks
needsSelfHeal := make([]bool, totalShards)
totalBlocks := xl.DataBlocks + xl.ParityBlocks
needsSelfHeal := make([]bool, totalBlocks)
var metadata = make(map[string]string)
var readers = make([]io.Reader, totalShards)
var writers = make([]io.WriteCloser, totalShards)
var readers = make([]io.Reader, totalBlocks)
var writers = make([]io.WriteCloser, totalBlocks)
for index, disk := range xl.storageDisks {
metadataFile := slashpath.Join(path, metadataFile)
@ -108,59 +108,59 @@ func (xl XL) selfHeal(volume string, path string) error {
} else {
curBlockSize = int(totalLeft)
}
// Calculate the current shard size.
curShardSize := getEncodedBlockLen(curBlockSize, xl.DataBlocks)
enShards := make([][]byte, totalShards)
// Calculate the current block size.
curBlockSize = getEncodedBlockLen(curBlockSize, xl.DataBlocks)
enBlocks := make([][]byte, totalBlocks)
// Loop through all readers and read.
for index, reader := range readers {
// Initialize shard slice and fill the data from each parts.
// Initialize block slice and fill the data from each parts.
// ReedSolomon.Verify() expects that slice is not nil even if the particular
// part needs healing.
enShards[index] = make([]byte, curShardSize)
enBlocks[index] = make([]byte, curBlockSize)
if needsSelfHeal[index] {
// Skip reading if the part needs healing.
continue
}
_, e := io.ReadFull(reader, enShards[index])
_, e := io.ReadFull(reader, enBlocks[index])
if e != nil && e != io.ErrUnexpectedEOF {
enShards[index] = nil
enBlocks[index] = nil
}
}
// Check blocks if they are all zero in length.
if checkBlockSize(enShards) == 0 {
if checkBlockSize(enBlocks) == 0 {
err = errors.New("Data likely corrupted, all blocks are zero in length.")
return err
}
// Verify the shards.
ok, e := xl.ReedSolomon.Verify(enShards)
// Verify the blocks.
ok, e := xl.ReedSolomon.Verify(enBlocks)
if e != nil {
closeAndRemoveWriters(writers...)
return e
}
// Verification failed, shards require reconstruction.
// Verification failed, blocks require reconstruction.
if !ok {
for index, shNeeded := range needsSelfHeal {
if shNeeded {
// Reconstructs() reconstructs the parts if the array is nil.
enShards[index] = nil
enBlocks[index] = nil
}
}
e = xl.ReedSolomon.Reconstruct(enShards)
e = xl.ReedSolomon.Reconstruct(enBlocks)
if e != nil {
closeAndRemoveWriters(writers...)
return e
}
// Verify reconstructed shards again.
ok, e = xl.ReedSolomon.Verify(enShards)
// Verify reconstructed blocks again.
ok, e = xl.ReedSolomon.Verify(enBlocks)
if e != nil {
closeAndRemoveWriters(writers...)
return e
}
if !ok {
// Shards cannot be reconstructed, corrupted data.
// Blocks cannot be reconstructed, corrupted data.
e = errors.New("Verification failed after reconstruction, data likely corrupted.")
closeAndRemoveWriters(writers...)
return e
@ -170,7 +170,7 @@ func (xl XL) selfHeal(volume string, path string) error {
if !shNeeded {
continue
}
_, e := writers[index].Write(enShards[index])
_, e := writers[index].Write(enBlocks[index])
if e != nil {
closeAndRemoveWriters(writers...)
return e

@ -17,7 +17,6 @@
package main
import (
"encoding/json"
"errors"
"fmt"
"io"
@ -44,85 +43,66 @@ func getEncodedBlockLen(inputLen, dataBlocks int) (curBlockSize int) {
return
}
func (xl XL) getMetaFileVersionMap(volume, path string) (diskFileVersionMap map[int]int64) {
metadataFilePath := slashpath.Join(path, metadataFile)
// Set offset to 0 to read entire file.
offset := int64(0)
metadata := make(map[string]string)
// Allocate disk index format map - do not use maps directly without allocating.
diskFileVersionMap = make(map[int]int64)
// TODO - all errors should be logged here.
// Read meta data from all disks
for index, disk := range xl.storageDisks {
diskFileVersionMap[index] = -1
metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
if err != nil {
continue
} else if err = json.NewDecoder(metadataReader).Decode(&metadata); err != nil {
continue
} else if _, ok := metadata["file.version"]; !ok {
diskFileVersionMap[index] = 0
}
// Convert string to integer.
fileVersion, err := strconv.ParseInt(metadata["file.version"], 10, 64)
if err != nil {
continue
// Returns slice of disks needed for ReadFile operation:
// - slice returning readable disks.
// - file size
// - error if any.
func (xl XL) getReadableDisks(volume, path string) ([]StorageAPI, int64, error) {
partsMetadata, errs := xl.getPartsMetadata(volume, path)
highestVersion := int64(0)
versions := make([]int64, len(xl.storageDisks))
quorumDisks := make([]StorageAPI, len(xl.storageDisks))
fileSize := int64(0)
for index, metadata := range partsMetadata {
if errs[index] == nil {
if versionStr, ok := metadata["file.version"]; ok {
// Convert string to integer.
version, err := strconv.ParseInt(versionStr, 10, 64)
if err != nil {
// Unexpected, return error.
return nil, 0, err
}
if version > highestVersion {
highestVersion = version
}
versions[index] = version
} else {
versions[index] = 0
}
} else {
versions[index] = -1
}
diskFileVersionMap[index] = fileVersion
}
return diskFileVersionMap
}
type quorumDisk struct {
disk StorageAPI
index int
}
// getQuorumDisks - get the current quorum disks.
func (xl XL) getQuorumDisks(volume, path string) (quorumDisks []quorumDisk, higherVersion int64) {
diskVersionMap := xl.getMetaFileVersionMap(volume, path)
for diskIndex, formatVersion := range diskVersionMap {
if formatVersion > higherVersion {
higherVersion = formatVersion
quorumDisks = []quorumDisk{{
disk: xl.storageDisks[diskIndex],
index: diskIndex,
}}
} else if formatVersion == higherVersion {
quorumDisks = append(quorumDisks, quorumDisk{
disk: xl.storageDisks[diskIndex],
index: diskIndex,
})
quorumCount := 0
for index, version := range versions {
if version == highestVersion {
quorumDisks[index] = xl.storageDisks[index]
quorumCount++
} else {
quorumDisks[index] = nil
}
}
return
}
// getFileSize - extract file size from metadata.
func (xl XL) getFileSize(volume, path string, disk StorageAPI) (size int64, err error) {
metadataFilePath := slashpath.Join(path, metadataFile)
// set offset to 0 to read entire file
offset := int64(0)
metadata := make(map[string]string)
metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
if err != nil {
return 0, err
}
if err = json.NewDecoder(metadataReader).Decode(&metadata); err != nil {
return 0, err
if quorumCount < xl.readQuorum {
return nil, 0, errReadQuorum
}
if _, ok := metadata["file.size"]; !ok {
return 0, errors.New("missing 'file.size' in meta data")
for index, disk := range quorumDisks {
if disk == nil {
continue
}
if sizeStr, ok := partsMetadata[index]["file.size"]; ok {
var err error
fileSize, err = strconv.ParseInt(sizeStr, 10, 64)
if err != nil {
return nil, 0, err
}
break
} else {
return nil, 0, errors.New("Missing 'file.size' in meta data.")
}
}
return strconv.ParseInt(metadata["file.size"], 10, 64)
return quorumDisks, fileSize, nil
}
// ReadFile - read file
@ -140,43 +120,23 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
xl.lockNS(volume, path, readLock)
defer xl.unlockNS(volume, path, readLock)
// Check read quorum.
quorumDisks, _ := xl.getQuorumDisks(volume, path)
if len(quorumDisks) < xl.readQuorum {
return nil, errReadQuorum
}
// Get file size.
fileSize, err := xl.getFileSize(volume, path, quorumDisks[0].disk)
quorumDisks, fileSize, err := xl.getReadableDisks(volume, path)
if err != nil {
return nil, err
}
totalBlocks := xl.DataBlocks + xl.ParityBlocks // Total blocks.
readers := make([]io.ReadCloser, len(quorumDisks))
readFileError := 0
for _, quorumDisk := range quorumDisks {
erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", quorumDisk.index))
var erasuredPartReader io.ReadCloser
if erasuredPartReader, err = quorumDisk.disk.ReadFile(volume, erasurePart, offset); err != nil {
// We can safely allow ReadFile errors up to len(quorumDisks) - xl.readQuorum
// otherwise return failure
if readFileError < len(quorumDisks)-xl.readQuorum {
// Set the reader to 'nil' to be able to reconstruct later.
readers[quorumDisk.index] = nil
readFileError++
continue
}
// Control reaches here we do not have quorum
// anymore. Close all the readers.
for _, reader := range readers {
if reader != nil {
reader.Close()
}
}
return nil, errReadQuorum
readers := make([]io.ReadCloser, len(xl.storageDisks))
for index, disk := range quorumDisks {
if disk == nil {
continue
}
erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
// If disk.ReadFile returns error and we don't have read quorum it will be taken care as
// ReedSolomon.Reconstruct() will fail later.
var reader io.ReadCloser
if reader, err = disk.ReadFile(volume, erasurePart, offset); err == nil {
readers[index] = reader
}
readers[quorumDisk.index] = erasuredPartReader
}
// Initialize pipe.
@ -194,19 +154,17 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
}
// Calculate the current encoded block size.
curEncBlockSize := getEncodedBlockLen(curBlockSize, xl.DataBlocks)
enBlocks := make([][]byte, totalBlocks)
enBlocks := make([][]byte, len(xl.storageDisks))
// Loop through all readers and read.
for index, reader := range readers {
// Initialize shard slice and fill the data from each parts.
enBlocks[index] = make([]byte, curEncBlockSize)
if reader == nil {
// One of files missing, save it for reconstruction.
enBlocks[index] = nil
continue
}
// Initialize shard slice and fill the data from each parts.
enBlocks[index] = make([]byte, curEncBlockSize)
_, err = io.ReadFull(reader, enBlocks[index])
if err != nil && err != io.ErrUnexpectedEOF {
enBlocks[index] = nil
readers[index] = nil
}
}
@ -229,6 +187,12 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
// Verification failed, blocks require reconstruction.
if !ok {
for index, reader := range readers {
if reader == nil {
// Reconstruct expects missing blocks to be nil.
enBlocks[index] = nil
}
}
err = xl.ReedSolomon.Reconstruct(enBlocks)
if err != nil {
pipeWriter.CloseWithError(err)
@ -264,6 +228,9 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
// Cleanly close all the underlying data readers.
for _, reader := range readers {
if reader == nil {
continue
}
reader.Close()
}
}()

@ -0,0 +1,76 @@
package main
import (
"encoding/json"
"errors"
slashpath "path"
"path/filepath"
)
// Get parts.json metadata as a map slice.
// Returns error slice indicating the failed metadata reads.
// Both returned slices are indexed in accordance with xl.storageDisks;
// a nil map entry means the read for that disk failed (see errs).
func (xl XL) getPartsMetadata(volume, path string) ([]map[string]string, []error) {
	errs := make([]error, len(xl.storageDisks))
	metadataArray := make([]map[string]string, len(xl.storageDisks))
	metadataFilePath := slashpath.Join(path, metadataFile)
	for index, disk := range xl.storageDisks {
		metadata := make(map[string]string)
		// Read the metadata file from the beginning.
		offset := int64(0)
		metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
		if err != nil {
			errs[index] = err
			continue
		}
		err = json.NewDecoder(metadataReader).Decode(&metadata)
		// Close immediately: a defer inside this loop would hold every
		// reader open until the function returns, leaking one handle per
		// disk for the duration of the call.
		metadataReader.Close()
		if err != nil {
			// Unable to parse parts.json, set error.
			errs[index] = err
			continue
		}
		metadataArray[index] = metadata
	}
	return metadataArray, errs
}
// Writes/Updates `parts.json` for given file. updateParts carries
// index of disks where `parts.json` needs to be updated.
//
// Returns collection of errors, indexed in accordance with input
// updateParts order.
func (xl XL) setPartsMetadata(volume, path string, metadata map[string]string, updateParts []bool) []error {
	// Use slash-separated paths to stay consistent with the rest of the
	// XL layer (filepath.Join would emit '\' separators on Windows while
	// every read path here is built with slashpath.Join).
	metadataFilePath := slashpath.Join(path, metadataFile)
	errs := make([]error, len(xl.storageDisks))

	// Pre-fill every slot so disks that are skipped (shouldUpdate false)
	// report a sentinel error rather than a misleading nil success.
	for index := range updateParts {
		errs[index] = errors.New("metadata not updated")
	}

	metadataBytes, err := json.Marshal(metadata)
	if err != nil {
		// A marshal failure affects all requested disks equally.
		for index := range updateParts {
			errs[index] = err
		}
		return errs
	}

	for index, shouldUpdate := range updateParts {
		if !shouldUpdate {
			continue
		}
		writer, err := xl.storageDisks[index].CreateFile(volume, metadataFilePath)
		errs[index] = err
		if err != nil {
			continue
		}
		if _, err = writer.Write(metadataBytes); err != nil {
			errs[index] = err
			// Remove the partially written temporary file.
			safeCloseAndRemove(writer)
			continue
		}
		// Safely written - Close commits the file to its final location.
		writer.Close()
	}
	return errs
}
Loading…
Cancel
Save