diff --git a/xl-v1-createfile.go b/xl-v1-createfile.go
index ed5767218..01126c936 100644
--- a/xl-v1-createfile.go
+++ b/xl-v1-createfile.go
@@ -48,18 +48,82 @@ func closeAndRemoveWriters(writers ...io.WriteCloser) {
 	}
 }
 
+type quorumDisk struct {
+	disk  StorageAPI
+	index int
+}
+
+// getQuorumDisks - get the current quorum disks.
+func (xl XL) getQuorumDisks(volume, path string) (quorumDisks []quorumDisk, higherVersion int64) {
+	fileQuorumVersionMap := xl.getFileQuorumVersionMap(volume, path)
+	for diskIndex, formatVersion := range fileQuorumVersionMap {
+		if formatVersion > higherVersion {
+			higherVersion = formatVersion
+			quorumDisks = []quorumDisk{{
+				disk:  xl.storageDisks[diskIndex],
+				index: diskIndex,
+			}}
+
+		} else if formatVersion == higherVersion {
+			quorumDisks = append(quorumDisks, quorumDisk{
+				disk:  xl.storageDisks[diskIndex],
+				index: diskIndex,
+			})
+
+		}
+	}
+	return quorumDisks, higherVersion
+}
+
+func (xl XL) getFileQuorumVersionMap(volume, path string) map[int]int64 {
+	metadataFilePath := slashpath.Join(path, metadataFile)
+	// Set offset to 0 to read entire file.
+	offset := int64(0)
+	metadata := make(map[string]string)
+
+	// Allocate disk index format map - do not use maps directly
+	// without allocating.
+	fileQuorumVersionMap := make(map[int]int64)
+
+	// TODO - all errors should be logged here.
+
+	// Read metadata from all disks.
+	for index, disk := range xl.storageDisks {
+		fileQuorumVersionMap[index] = -1
+
+		metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
+		if err != nil {
+			continue
+
+		} else if err = json.NewDecoder(metadataReader).Decode(&metadata); err != nil {
+			continue
+
+		} else if _, ok := metadata["file.version"]; !ok {
+			fileQuorumVersionMap[index] = 0
+
+		}
+		// Convert string to integer.
+		fileVersion, err := strconv.ParseInt(metadata["file.version"], 10, 64)
+		if err != nil {
+			continue
+
+		}
+		fileQuorumVersionMap[index] = fileVersion
+	}
+	return fileQuorumVersionMap
+}
+
 // WriteErasure reads predefined blocks, encodes them and writes to
 // configured storage disks.
 func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
 	xl.lockNS(volume, path, false)
 	defer xl.unlockNS(volume, path, false)
 
-	// get available quorum for existing file path
+	// Get available quorum for existing file path.
 	_, higherVersion := xl.getQuorumDisks(volume, path)
-	// increment to have next higher version
+	// Increment to have next higher version.
 	higherVersion++
 
-	quorumDisks := make([]quorumDisk, len(xl.storageDisks))
 	writers := make([]io.WriteCloser, len(xl.storageDisks))
 	sha512Writers := make([]hash.Hash, len(xl.storageDisks))
 
@@ -70,15 +134,14 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
 	modTime := time.Now().UTC()
 
 	createFileError := 0
-	maxIndex := 0
 	for index, disk := range xl.storageDisks {
 		erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
 		writer, err := disk.CreateFile(volume, erasurePart)
 		if err != nil {
 			createFileError++
 
-			// we can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum
-			// otherwise return failure
+			// We can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum,
+			// otherwise return failure.
 			if createFileError <= len(xl.storageDisks)-xl.writeQuorum {
 				continue
 			}
@@ -95,8 +158,8 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
 		if err != nil {
 			createFileError++
 
-			// we can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum
-			// otherwise return failure
+			// We can safely allow CreateFile errors up to
+			// len(xl.storageDisks) - xl.writeQuorum, otherwise return failure.
 			if createFileError <= len(xl.storageDisks)-xl.writeQuorum {
 				continue
 			}
@@ -107,19 +170,17 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
 			return
 		}
 
-		writers[maxIndex] = writer
-		metadataWriters[maxIndex] = metadataWriter
-		sha512Writers[maxIndex] = fastSha512.New()
-		quorumDisks[maxIndex] = quorumDisk{disk, index}
-		maxIndex++
+		writers[index] = writer
+		metadataWriters[index] = metadataWriter
+		sha512Writers[index] = fastSha512.New()
 	}
 
 	// Allocate 4MiB block size buffer for reading.
-	buffer := make([]byte, erasureBlockSize)
+	dataBuffer := make([]byte, erasureBlockSize)
 	var totalSize int64 // Saves total incoming stream size.
 	for {
 		// Read up to allocated block size.
-		n, err := io.ReadFull(reader, buffer)
+		n, err := io.ReadFull(reader, dataBuffer)
 		if err != nil {
 			// Any unexpected errors, close the pipe reader with error.
 			if err != io.ErrUnexpectedEOF && err != io.EOF {
@@ -135,16 +196,17 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
 		}
 		if n > 0 {
 			// Split the input buffer into data and parity blocks.
-			var blocks [][]byte
-			blocks, err = xl.ReedSolomon.Split(buffer[0:n])
+			var dataBlocks [][]byte
+			dataBlocks, err = xl.ReedSolomon.Split(dataBuffer[0:n])
 			if err != nil {
 				// Remove all temp writers.
 				xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
 				reader.CloseWithError(err)
 				return
 			}
+
+			// Encode parity blocks using data blocks.
-			err = xl.ReedSolomon.Encode(blocks)
+			err = xl.ReedSolomon.Encode(dataBlocks)
 			if err != nil {
 				// Remove all temp writers upon error.
 				xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
@@ -153,18 +215,23 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
 			}
 
 			// Loop through and write encoded data to quorum disks.
-			for i := 0; i < maxIndex; i++ {
-				encodedData := blocks[quorumDisks[i].index]
-
-				_, err = writers[i].Write(encodedData)
+			for index, writer := range writers {
+				if writer == nil {
+					continue
+				}
+				encodedData := dataBlocks[index]
+				_, err = writers[index].Write(encodedData)
 				if err != nil {
 					// Remove all temp writers upon error.
 					xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
 					reader.CloseWithError(err)
 					return
 				}
-				sha512Writers[i].Write(encodedData)
+				if sha512Writers[index] != nil {
+					sha512Writers[index].Write(encodedData)
+				}
 			}
+
+			// Update total written.
 			totalSize += int64(n)
 		}
 
@@ -178,7 +245,8 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
 	metadata["format.patch"] = "0"
 	metadata["file.size"] = strconv.FormatInt(totalSize, 10)
 	if len(xl.storageDisks) > len(writers) {
-		// save file.version only if we wrote to less disks than all disks
+		// Save file.version only if we wrote to fewer disks than all
+		// storage disks.
 		metadata["file.version"] = strconv.FormatInt(higherVersion, 10)
 	}
 	metadata["file.modTime"] = modTime.Format(timeFormatAMZ)
@@ -191,9 +259,14 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
 	// Case: when storageDisks is 16 and write quorumDisks is 13,
 	// meta data write failure up to 2 can be considered.
 	// currently we fail for any meta data writes
-	for i := 0; i < maxIndex; i++ {
-		// Save sha512 checksum of each encoded blocks.
-		metadata["file.xl.block512Sum"] = hex.EncodeToString(sha512Writers[i].Sum(nil))
+	for index, metadataWriter := range metadataWriters {
+		if metadataWriter == nil {
+			continue
+		}
+		if sha512Writers[index] != nil {
+			// Save sha512 checksum of each encoded block.
+			metadata["file.xl.block512Sum"] = hex.EncodeToString(sha512Writers[index].Sum(nil))
+		}
 
 		// Marshal metadata into json strings.
 		metadataBytes, err := json.Marshal(metadata)
@@ -205,7 +278,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
 		}
 
 		// Write metadata to disk.
-		_, err = metadataWriters[i].Write(metadataBytes)
+		_, err = metadataWriter.Write(metadataBytes)
 		if err != nil {
 			xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
 			reader.CloseWithError(err)
@@ -214,10 +287,18 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
 	}
 
 	// Close all writers and metadata writers in routines.
-	for i := 0; i < maxIndex; i++ {
+	for index, writer := range writers {
+		if writer == nil {
+			continue
+		}
+		// Safely wrote, now rename to its actual location.
+		writer.Close()
+
+		if metadataWriters[index] == nil {
+			continue
+		}
 		// Safely wrote, now rename to its actual location.
-		writers[i].Close()
-		metadataWriters[i].Close()
+		metadataWriters[index].Close()
 	}
 
 	// Close the pipe reader and return.
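The version-quorum selection that getQuorumDisks implements above can be read in isolation. The sketch below is illustrative only and not part of the patch: disks are reduced to plain indices, and the input mirrors what getFileQuorumVersionMap produces per disk (-1 for an unreadable parts.json, 0 when "file.version" is absent, the parsed value otherwise).

package main

import "fmt"

// pickQuorum returns the indices of the disks carrying the highest
// file version, together with that version.
func pickQuorum(versions []int64) (quorum []int, higher int64) {
	for index, version := range versions {
		if version > higher {
			// A newer version wins; restart the quorum list.
			higher = version
			quorum = []int{index}
		} else if version == higher {
			// Same version; this disk joins the quorum.
			quorum = append(quorum, index)
		}
	}
	return quorum, higher
}

func main() {
	// Disk 2 was unreadable (-1); disks 0 and 3 are one version ahead.
	quorum, version := pickQuorum([]int64{3, 2, -1, 3})
	fmt.Println(quorum, version) // Output: [0 3] 3
}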
diff --git a/xl-v1-healfile.go b/xl-v1-healfile.go
index 782cb441a..7951bea1f 100644
--- a/xl-v1-healfile.go
+++ b/xl-v1-healfile.go
@@ -26,11 +26,11 @@ import (
 )
 
 func (xl XL) selfHeal(volume string, path string) error {
-	totalShards := xl.DataBlocks + xl.ParityBlocks
-	needsSelfHeal := make([]bool, totalShards)
+	totalBlocks := xl.DataBlocks + xl.ParityBlocks
+	needsSelfHeal := make([]bool, totalBlocks)
 	var metadata = make(map[string]string)
-	var readers = make([]io.Reader, totalShards)
-	var writers = make([]io.WriteCloser, totalShards)
+	var readers = make([]io.Reader, totalBlocks)
+	var writers = make([]io.WriteCloser, totalBlocks)
 	for index, disk := range xl.storageDisks {
 		metadataFile := slashpath.Join(path, metadataFile)
 
@@ -108,59 +108,59 @@ func (xl XL) selfHeal(volume string, path string) error {
 		} else {
 			curBlockSize = int(totalLeft)
 		}
-		// Calculate the current shard size.
-		curShardSize := getEncodedBlockLen(curBlockSize, xl.DataBlocks)
-		enShards := make([][]byte, totalShards)
+		// Calculate the current block size.
+		curBlockSize = getEncodedBlockLen(curBlockSize, xl.DataBlocks)
+		enBlocks := make([][]byte, totalBlocks)
 
 		// Loop through all readers and read.
 		for index, reader := range readers {
-			// Initialize shard slice and fill the data from each parts.
+			// Initialize block slice and fill the data from each part.
 			// ReedSolomon.Verify() expects that slice is not nil even if the particular
 			// part needs healing.
-			enShards[index] = make([]byte, curShardSize)
+			enBlocks[index] = make([]byte, curBlockSize)
 			if needsSelfHeal[index] {
 				// Skip reading if the part needs healing.
 				continue
 			}
-			_, e := io.ReadFull(reader, enShards[index])
+			_, e := io.ReadFull(reader, enBlocks[index])
 			if e != nil && e != io.ErrUnexpectedEOF {
-				enShards[index] = nil
+				enBlocks[index] = nil
 			}
 		}
 
 		// Check blocks if they are all zero in length.
-		if checkBlockSize(enShards) == 0 {
+		if checkBlockSize(enBlocks) == 0 {
 			err = errors.New("Data likely corrupted, all blocks are zero in length.")
 			return err
 		}
 
-		// Verify the shards.
-		ok, e := xl.ReedSolomon.Verify(enShards)
+		// Verify the blocks.
+		ok, e := xl.ReedSolomon.Verify(enBlocks)
 		if e != nil {
 			closeAndRemoveWriters(writers...)
 			return e
 		}
 
-		// Verification failed, shards require reconstruction.
+		// Verification failed, blocks require reconstruction.
 		if !ok {
 			for index, shNeeded := range needsSelfHeal {
 				if shNeeded {
 					// Reconstructs() reconstructs the parts if the array is nil.
-					enShards[index] = nil
+					enBlocks[index] = nil
 				}
 			}
-			e = xl.ReedSolomon.Reconstruct(enShards)
+			e = xl.ReedSolomon.Reconstruct(enBlocks)
 			if e != nil {
 				closeAndRemoveWriters(writers...)
 				return e
 			}
-			// Verify reconstructed shards again.
-			ok, e = xl.ReedSolomon.Verify(enShards)
+			// Verify reconstructed blocks again.
+			ok, e = xl.ReedSolomon.Verify(enBlocks)
 			if e != nil {
 				closeAndRemoveWriters(writers...)
 				return e
 			}
 			if !ok {
-				// Shards cannot be reconstructed, corrupted data.
+				// Blocks cannot be reconstructed, corrupted data.
 				e = errors.New("Verification failed after reconstruction, data likely corrupted.")
@@ -170,7 +170,7 @@ func (xl XL) selfHeal(volume string, path string) error {
 			if !shNeeded {
 				continue
 			}
-			_, e := writers[index].Write(enShards[index])
+			_, e := writers[index].Write(enBlocks[index])
 			if e != nil {
 				closeAndRemoveWriters(writers...)
 				return e
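The verify-then-reconstruct cycle in selfHeal maps onto the Split/Encode/Verify/Reconstruct method set of github.com/klauspost/reedsolomon, which appears to back the XL type's ReedSolomon field (the call signatures above match it). A minimal sketch of that cycle, assuming that package, with made-up shard counts and payload:

package main

import (
	"fmt"

	"github.com/klauspost/reedsolomon"
)

func main() {
	// 4 data + 2 parity blocks; XL deployments typically run larger layouts.
	enc, err := reedsolomon.New(4, 2)
	if err != nil {
		panic(err)
	}

	// Split allocates data and parity blocks; Encode fills in the parity.
	blocks, err := enc.Split([]byte("the quick brown fox jumps over the lazy dog"))
	if err != nil {
		panic(err)
	}
	if err = enc.Encode(blocks); err != nil {
		panic(err)
	}

	// Simulate a part that needs healing. Reconstruct expects the
	// missing block to be nil, exactly as selfHeal arranges it.
	blocks[1] = nil
	if err = enc.Reconstruct(blocks); err != nil {
		panic(err)
	}

	// Verify once more after reconstruction, mirroring the second
	// Verify call in selfHeal.
	ok, err := enc.Verify(blocks)
	fmt.Println(ok, err) // Output: true <nil>
}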
diff --git a/xl-v1-readfile.go b/xl-v1-readfile.go
index bfbb8d9a7..d23b765de 100644
--- a/xl-v1-readfile.go
+++ b/xl-v1-readfile.go
@@ -17,7 +17,6 @@
 package main
 
 import (
-	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
@@ -44,85 +43,66 @@ func getEncodedBlockLen(inputLen, dataBlocks int) (curBlockSize int) {
 	return
 }
 
-func (xl XL) getMetaFileVersionMap(volume, path string) (diskFileVersionMap map[int]int64) {
-	metadataFilePath := slashpath.Join(path, metadataFile)
-	// Set offset to 0 to read entire file.
-	offset := int64(0)
-	metadata := make(map[string]string)
-
-	// Allocate disk index format map - do not use maps directly without allocating.
-	diskFileVersionMap = make(map[int]int64)
-
-	// TODO - all errors should be logged here.
-
-	// Read meta data from all disks
-	for index, disk := range xl.storageDisks {
-		diskFileVersionMap[index] = -1
-
-		metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
-		if err != nil {
-			continue
-		} else if err = json.NewDecoder(metadataReader).Decode(&metadata); err != nil {
-			continue
-		} else if _, ok := metadata["file.version"]; !ok {
-			diskFileVersionMap[index] = 0
-		}
-		// Convert string to integer.
-		fileVersion, err := strconv.ParseInt(metadata["file.version"], 10, 64)
-		if err != nil {
-			continue
+// Returns slice of disks needed for ReadFile operation:
+// - slice returning readable disks.
+// - file size.
+// - error if any.
+func (xl XL) getReadableDisks(volume, path string) ([]StorageAPI, int64, error) {
+	partsMetadata, errs := xl.getPartsMetadata(volume, path)
+	highestVersion := int64(0)
+	versions := make([]int64, len(xl.storageDisks))
+	quorumDisks := make([]StorageAPI, len(xl.storageDisks))
+	fileSize := int64(0)
+	for index, metadata := range partsMetadata {
+		if errs[index] == nil {
+			if versionStr, ok := metadata["file.version"]; ok {
+				// Convert string to integer.
+				version, err := strconv.ParseInt(versionStr, 10, 64)
+				if err != nil {
+					// Unexpected, return error.
+					return nil, 0, err
+				}
+				if version > highestVersion {
+					highestVersion = version
+				}
+				versions[index] = version
+			} else {
+				versions[index] = 0
+			}
+		} else {
+			versions[index] = -1
 		}
-		diskFileVersionMap[index] = fileVersion
 	}
-	return diskFileVersionMap
-}
 
-type quorumDisk struct {
-	disk  StorageAPI
-	index int
-}
-
-// getQuorumDisks - get the current quorum disks.
-func (xl XL) getQuorumDisks(volume, path string) (quorumDisks []quorumDisk, higherVersion int64) {
-	diskVersionMap := xl.getMetaFileVersionMap(volume, path)
-	for diskIndex, formatVersion := range diskVersionMap {
-		if formatVersion > higherVersion {
-			higherVersion = formatVersion
-			quorumDisks = []quorumDisk{{
-				disk:  xl.storageDisks[diskIndex],
-				index: diskIndex,
-			}}
-		} else if formatVersion == higherVersion {
-			quorumDisks = append(quorumDisks, quorumDisk{
-				disk:  xl.storageDisks[diskIndex],
-				index: diskIndex,
-			})
+	quorumCount := 0
+	for index, version := range versions {
+		if version == highestVersion {
+			quorumDisks[index] = xl.storageDisks[index]
+			quorumCount++
+		} else {
+			quorumDisks[index] = nil
 		}
 	}
-	return
-}
-
-// getFileSize - extract file size from metadata.
-func (xl XL) getFileSize(volume, path string, disk StorageAPI) (size int64, err error) {
-	metadataFilePath := slashpath.Join(path, metadataFile)
-	// set offset to 0 to read entire file
-	offset := int64(0)
-	metadata := make(map[string]string)
-
-	metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
-	if err != nil {
-		return 0, err
-	}
-
-	if err = json.NewDecoder(metadataReader).Decode(&metadata); err != nil {
-		return 0, err
+	if quorumCount < xl.readQuorum {
+		return nil, 0, errReadQuorum
 	}
 
-	if _, ok := metadata["file.size"]; !ok {
-		return 0, errors.New("missing 'file.size' in meta data")
+	for index, disk := range quorumDisks {
+		if disk == nil {
+			continue
+		}
+		if sizeStr, ok := partsMetadata[index]["file.size"]; ok {
+			var err error
+			fileSize, err = strconv.ParseInt(sizeStr, 10, 64)
+			if err != nil {
+				return nil, 0, err
+			}
+			break
+		} else {
+			return nil, 0, errors.New("Missing 'file.size' in metadata.")
+		}
 	}
-
-	return strconv.ParseInt(metadata["file.size"], 10, 64)
+	return quorumDisks, fileSize, nil
 }
 
 // ReadFile - read file
@@ -140,43 +120,23 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
 	xl.lockNS(volume, path, readLock)
 	defer xl.unlockNS(volume, path, readLock)
 
-	// Check read quorum.
-	quorumDisks, _ := xl.getQuorumDisks(volume, path)
-	if len(quorumDisks) < xl.readQuorum {
-		return nil, errReadQuorum
-	}
-
-	// Get file size.
-	fileSize, err := xl.getFileSize(volume, path, quorumDisks[0].disk)
+	quorumDisks, fileSize, err := xl.getReadableDisks(volume, path)
 	if err != nil {
 		return nil, err
 	}
 
-	totalBlocks := xl.DataBlocks + xl.ParityBlocks // Total blocks.
-	readers := make([]io.ReadCloser, len(quorumDisks))
-	readFileError := 0
-	for _, quorumDisk := range quorumDisks {
-		erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", quorumDisk.index))
-		var erasuredPartReader io.ReadCloser
-		if erasuredPartReader, err = quorumDisk.disk.ReadFile(volume, erasurePart, offset); err != nil {
-			// We can safely allow ReadFile errors up to len(quorumDisks) - xl.readQuorum
-			// otherwise return failure
-			if readFileError < len(quorumDisks)-xl.readQuorum {
-				// Set the reader to 'nil' to be able to reconstruct later.
-				readers[quorumDisk.index] = nil
-				readFileError++
-				continue
-			}
-			// Control reaches here we do not have quorum
-			// anymore. Close all the readers.
-			for _, reader := range readers {
-				if reader != nil {
-					reader.Close()
-				}
-			}
-			return nil, errReadQuorum
+	readers := make([]io.ReadCloser, len(xl.storageDisks))
+	for index, disk := range quorumDisks {
+		if disk == nil {
+			continue
+		}
+		erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
+		// If disk.ReadFile fails, leave the reader as nil; falling below
+		// read quorum is caught later when ReedSolomon.Reconstruct() fails.
+		var reader io.ReadCloser
+		if reader, err = disk.ReadFile(volume, erasurePart, offset); err == nil {
+			readers[index] = reader
 		}
-		readers[quorumDisk.index] = erasuredPartReader
 	}
 
 	// Initialize pipe.
@@ -194,19 +154,17 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
 			}
 			// Calculate the current encoded block size.
 			curEncBlockSize := getEncodedBlockLen(curBlockSize, xl.DataBlocks)
-			enBlocks := make([][]byte, totalBlocks)
+			enBlocks := make([][]byte, len(xl.storageDisks))
 			// Loop through all readers and read.
 			for index, reader := range readers {
+				// Initialize block slice and fill the data from each part.
+				enBlocks[index] = make([]byte, curEncBlockSize)
 				if reader == nil {
-					// One of files missing, save it for reconstruction.
-					enBlocks[index] = nil
 					continue
 				}
-				// Initialize shard slice and fill the data from each parts.
-				enBlocks[index] = make([]byte, curEncBlockSize)
 				_, err = io.ReadFull(reader, enBlocks[index])
 				if err != nil && err != io.ErrUnexpectedEOF {
-					enBlocks[index] = nil
+					readers[index] = nil
 				}
 			}
 
@@ -229,6 +187,12 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
 
 			// Verification failed, blocks require reconstruction.
 			if !ok {
+				for index, reader := range readers {
+					if reader == nil {
+						// Reconstruct expects missing blocks to be nil.
+						enBlocks[index] = nil
+					}
+				}
 				err = xl.ReedSolomon.Reconstruct(enBlocks)
 				if err != nil {
 					pipeWriter.CloseWithError(err)
@@ -264,6 +228,9 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
 
 		// Cleanly close all the underlying data readers.
 		for _, reader := range readers {
+			if reader == nil {
+				continue
+			}
 			reader.Close()
 		}
 	}()
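The core of getReadableDisks above is a three-step reduction: derive a version per disk, keep only the disks at the highest version, then take "file.size" from the first surviving disk, failing early when the survivors fall below read quorum. A trimmed-down model of that logic follows; readableDisks and its map-slice input are hypothetical stand-ins, with a nil map marking a failed metadata read.

package main

import (
	"errors"
	"fmt"
	"strconv"
)

// readableDisks models getReadableDisks: per-disk metadata in, a
// readable-disk mask, the file size, and an error out.
func readableDisks(parts []map[string]string, readQuorum int) ([]bool, int64, error) {
	versions := make([]int64, len(parts))
	highest := int64(0)
	for i, m := range parts {
		switch {
		case m == nil:
			versions[i] = -1 // parts.json was unreadable.
		case m["file.version"] == "":
			versions[i] = 0 // File was never versioned.
		default:
			v, err := strconv.ParseInt(m["file.version"], 10, 64)
			if err != nil {
				return nil, 0, err
			}
			versions[i] = v
		}
		if versions[i] > highest {
			highest = versions[i]
		}
	}

	// Keep only disks carrying the highest version.
	readable := make([]bool, len(parts))
	count := 0
	for i, v := range versions {
		if v == highest {
			readable[i] = true
			count++
		}
	}
	if count < readQuorum {
		return nil, 0, errors.New("read quorum unavailable")
	}

	// Pull file.size from the first readable disk.
	for i, ok := range readable {
		if !ok {
			continue
		}
		size, err := strconv.ParseInt(parts[i]["file.size"], 10, 64)
		if err != nil {
			return nil, 0, err
		}
		return readable, size, nil
	}
	return nil, 0, errors.New("missing 'file.size' in metadata")
}

func main() {
	parts := []map[string]string{
		{"file.version": "2", "file.size": "1048576"},
		nil, // This disk's parts.json could not be read.
		{"file.version": "2", "file.size": "1048576"},
	}
	readable, size, err := readableDisks(parts, 2)
	fmt.Println(readable, size, err) // Output: [true false true] 1048576 <nil>
}

The parallel-slice shape here (one slot per disk, nil meaning absent) is the same convention the patch adopts for readers, writers, and enBlocks, which is what lets later stages skip or reconstruct per index.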
diff --git a/xl-v1-utils.go b/xl-v1-utils.go
new file mode 100644
index 000000000..69d7e987c
--- /dev/null
+++ b/xl-v1-utils.go
@@ -0,0 +1,76 @@
+package main
+
+import (
+	"encoding/json"
+	"errors"
+	slashpath "path"
+	"path/filepath"
+)
+
+// Get parts.json metadata as a map slice.
+// Returns error slice indicating the failed metadata reads.
+func (xl XL) getPartsMetadata(volume, path string) ([]map[string]string, []error) {
+	errs := make([]error, len(xl.storageDisks))
+	metadataArray := make([]map[string]string, len(xl.storageDisks))
+	metadataFilePath := slashpath.Join(path, metadataFile)
+	for index, disk := range xl.storageDisks {
+		metadata := make(map[string]string)
+		offset := int64(0)
+		metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
+		if err != nil {
+			errs[index] = err
+			continue
+		}
+		defer metadataReader.Close()
+
+		decoder := json.NewDecoder(metadataReader)
+		if err = decoder.Decode(&metadata); err != nil {
+			// Unable to parse parts.json, set error.
+			errs[index] = err
+			continue
+		}
+		metadataArray[index] = metadata
+	}
+	return metadataArray, errs
+}
+
+// Writes/updates `parts.json` for the given file. updateParts flags
+// the disks whose `parts.json` needs to be updated.
+//
+// Returns a collection of errors, indexed in accordance with the
+// input updateParts order.
+func (xl XL) setPartsMetadata(volume, path string, metadata map[string]string, updateParts []bool) []error {
+	metadataFilePath := filepath.Join(path, metadataFile)
+	errs := make([]error, len(xl.storageDisks))
+
+	for index := range updateParts {
+		errs[index] = errors.New("metadata not updated")
+	}
+
+	metadataBytes, err := json.Marshal(metadata)
+	if err != nil {
+		for index := range updateParts {
+			errs[index] = err
+		}
+		return errs
+	}
+
+	for index, shouldUpdate := range updateParts {
+		if !shouldUpdate {
+			continue
+		}
+		writer, err := xl.storageDisks[index].CreateFile(volume, metadataFilePath)
+		errs[index] = err
+		if err != nil {
+			continue
+		}
+		_, err = writer.Write(metadataBytes)
+		if err != nil {
+			errs[index] = err
+			safeCloseAndRemove(writer)
+			continue
+		}
+		writer.Close()
+	}
+	return errs
+}
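For reference, the parts.json these helpers read and write is a flat string-to-string JSON object. A minimal round-trip over the keys writeErasure populates; the values below are invented for illustration:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Keys taken from writeErasure above; values are made up.
	metadata := map[string]string{
		"format.patch":        "0",
		"file.version":        "2",
		"file.size":           "4194304",
		"file.modTime":        "2006-01-02T15:04:05.000Z",
		"file.xl.block512Sum": "deadbeef",
	}

	// setPartsMetadata marshals the map like this on the write side...
	buf, err := json.Marshal(metadata)
	if err != nil {
		panic(err)
	}

	// ...and getPartsMetadata decodes it back on the read side.
	decoded := make(map[string]string)
	if err = json.Unmarshal(buf, &decoded); err != nil {
		panic(err)
	}
	fmt.Println(decoded["file.size"]) // Output: 4194304
}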