From 45b3d3e21f2b7b2a760798b4b87140198bb16313 Mon Sep 17 00:00:00 2001 From: Bala FA Date: Wed, 20 Apr 2016 22:44:30 +0530 Subject: [PATCH] xl: add quorum support for create file --- xl-v1-createfile.go | 84 ++++++++++++++++++++++++++++++++++----------- xl-v1-readfile.go | 7 ++-- 2 files changed, 67 insertions(+), 24 deletions(-) diff --git a/xl-v1-createfile.go b/xl-v1-createfile.go index d6b34a742..ed5767218 100644 --- a/xl-v1-createfile.go +++ b/xl-v1-createfile.go @@ -51,34 +51,67 @@ func closeAndRemoveWriters(writers ...io.WriteCloser) { // WriteErasure reads predefined blocks, encodes them and writes to // configured storage disks. func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) { - var writers = make([]io.WriteCloser, len(xl.storageDisks)) - var sha512Writers = make([]hash.Hash, len(xl.storageDisks)) - var metadataWriters = make([]io.WriteCloser, len(xl.storageDisks)) + xl.lockNS(volume, path, false) + defer xl.unlockNS(volume, path, false) + + // get available quorum for existing file path + _, higherVersion := xl.getQuorumDisks(volume, path) + // increment to have next higher version + higherVersion++ + + quorumDisks := make([]quorumDisk, len(xl.storageDisks)) + writers := make([]io.WriteCloser, len(xl.storageDisks)) + sha512Writers := make([]hash.Hash, len(xl.storageDisks)) + + metadataFilePath := slashpath.Join(path, metadataFile) + metadataWriters := make([]io.WriteCloser, len(xl.storageDisks)) // Save additional erasureMetadata. modTime := time.Now().UTC() - // Initialize storage disks, get all the writers and corresponding - // metadata writers. + createFileError := 0 + maxIndex := 0 for index, disk := range xl.storageDisks { - var err error erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index)) - writers[index], err = disk.CreateFile(volume, erasurePart) + writer, err := disk.CreateFile(volume, erasurePart) if err != nil { + createFileError++ + + // we can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum + // otherwise return failure + if createFileError <= len(xl.storageDisks)-xl.writeQuorum { + continue + } + // Remove previous temp writers for any failure. xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) reader.CloseWithError(err) return } - metadataFilePath := slashpath.Join(path, metadataFile) - metadataWriters[index], err = disk.CreateFile(volume, metadataFilePath) + + // create meta data file + var metadataWriter io.WriteCloser + metadataWriter, err = disk.CreateFile(volume, metadataFilePath) if err != nil { + createFileError++ + + // we can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum + // otherwise return failure + if createFileError <= len(xl.storageDisks)-xl.writeQuorum { + continue + } + // Remove previous temp writers for any failure. xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) reader.CloseWithError(err) return } - sha512Writers[index] = fastSha512.New() + + writers[maxIndex] = writer + metadataWriters[maxIndex] = metadataWriter + sha512Writers[maxIndex] = fastSha512.New() + quorumDisks[maxIndex] = quorumDisk{disk, index} + maxIndex++ } // Allocate 4MiB block size buffer for reading. @@ -118,16 +151,19 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) { reader.CloseWithError(err) return } - // Loop through and write encoded data to all the disks. - for index, encodedData := range blocks { - _, err = writers[index].Write(encodedData) + + // Loop through and write encoded data to quorum disks. + for i := 0; i < maxIndex; i++ { + encodedData := blocks[quorumDisks[i].index] + + _, err = writers[i].Write(encodedData) if err != nil { // Remove all temp writers upon error. xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) reader.CloseWithError(err) return } - sha512Writers[index].Write(encodedData) + sha512Writers[i].Write(encodedData) } // Update total written. totalSize += int64(n) @@ -141,15 +177,23 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) { metadata["format.minor"] = "0" metadata["format.patch"] = "0" metadata["file.size"] = strconv.FormatInt(totalSize, 10) + if len(xl.storageDisks) > len(writers) { + // save file.version only if we wrote to less disks than all disks + metadata["file.version"] = strconv.FormatInt(higherVersion, 10) + } metadata["file.modTime"] = modTime.Format(timeFormatAMZ) metadata["file.xl.blockSize"] = strconv.Itoa(erasureBlockSize) metadata["file.xl.dataBlocks"] = strconv.Itoa(xl.DataBlocks) metadata["file.xl.parityBlocks"] = strconv.Itoa(xl.ParityBlocks) // Write all the metadata. - for index, metadataWriter := range metadataWriters { + // below case is not handled here + // Case: when storageDisks is 16 and write quorumDisks is 13, + // meta data write failure up to 2 can be considered. + // currently we fail for any meta data writes + for i := 0; i < maxIndex; i++ { // Save sha512 checksum of each encoded blocks. - metadata["file.xl.block512Sum"] = hex.EncodeToString(sha512Writers[index].Sum(nil)) + metadata["file.xl.block512Sum"] = hex.EncodeToString(sha512Writers[i].Sum(nil)) // Marshal metadata into json strings. metadataBytes, err := json.Marshal(metadata) @@ -161,7 +205,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) { } // Write metadata to disk. - _, err = metadataWriter.Write(metadataBytes) + _, err = metadataWriters[i].Write(metadataBytes) if err != nil { xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) reader.CloseWithError(err) @@ -170,10 +214,10 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) { } // Close all writers and metadata writers in routines. - for index := range xl.storageDisks { + for i := 0; i < maxIndex; i++ { // Safely wrote, now rename to its actual location. - writers[index].Close() - metadataWriters[index].Close() + writers[i].Close() + metadataWriters[i].Close() } // Close the pipe reader and return. diff --git a/xl-v1-readfile.go b/xl-v1-readfile.go index c397cdb8a..bfbb8d9a7 100644 --- a/xl-v1-readfile.go +++ b/xl-v1-readfile.go @@ -82,10 +82,9 @@ type quorumDisk struct { index int } -// getReadFileQuorumDisks - get the current quorum disks. -func (xl XL) getReadFileQuorumDisks(volume, path string) (quorumDisks []quorumDisk) { +// getQuorumDisks - get the current quorum disks. +func (xl XL) getQuorumDisks(volume, path string) (quorumDisks []quorumDisk, higherVersion int64) { diskVersionMap := xl.getMetaFileVersionMap(volume, path) - higherVersion := int64(0) for diskIndex, formatVersion := range diskVersionMap { if formatVersion > higherVersion { higherVersion = formatVersion @@ -142,7 +141,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) defer xl.unlockNS(volume, path, readLock) // Check read quorum. - quorumDisks := xl.getReadFileQuorumDisks(volume, path) + quorumDisks, _ := xl.getQuorumDisks(volume, path) if len(quorumDisks) < xl.readQuorum { return nil, errReadQuorum }