From b090c7112ee431f1e37e6602b9b4fa94aa0f5745 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Fri, 15 Jul 2016 03:29:01 +0530 Subject: [PATCH] Refactor of xl.PutObjectPart and erasureCreateFile. (#2193) * XL: Refactor of xl.PutObjectPart and erasureCreateFile. * GetCheckSum and AddCheckSum methods for xlMetaV1 * Simple unit test case for erasureCreateFile() --- erasure-createfile.go | 60 +++++++++++----------------------- erasure-readfile.go | 27 +++++++++++++--- erasure-readfile_test.go | 3 +- erasure_test.go | 70 ++++++++++++++++++++++++++++++++++++++++ xl-v1-healing.go | 9 +++--- xl-v1-metadata.go | 30 ++++++++++++++--- xl-v1-multipart.go | 57 +++++++++++--------------------- xl-v1-object.go | 37 ++++----------------- 8 files changed, 168 insertions(+), 125 deletions(-) diff --git a/erasure-createfile.go b/erasure-createfile.go index 78cc581bf..8f0319b2d 100644 --- a/erasure-createfile.go +++ b/erasure-createfile.go @@ -28,22 +28,20 @@ import ( // erasureCreateFile - writes an entire stream by erasure coding to // all the disks, writes also calculate individual block's checksum // for future bit-rot protection. -func erasureCreateFile(disks []StorageAPI, volume string, path string, partName string, data io.Reader, eInfos []erasureInfo, writeQuorum int) (newEInfos []erasureInfo, size int64, err error) { - // Just pick one eInfo. - eInfo := pickValidErasureInfo(eInfos) - +func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader, blockSize int64, dataBlocks int, parityBlocks int, writeQuorum int) (size int64, checkSums []string, err error) { // Allocated blockSized buffer for reading. - buf := make([]byte, eInfo.BlockSize) + buf := make([]byte, blockSize) + hashWriters := newHashWriters(len(disks)) // Read until io.EOF, erasure codes data and writes to all disks. for { var blocks [][]byte - n, rErr := io.ReadFull(data, buf) + n, rErr := io.ReadFull(reader, buf) // FIXME: this is a bug in Golang, n == 0 and err == // io.ErrUnexpectedEOF for io.ReadFull function. if n == 0 && rErr == io.ErrUnexpectedEOF { - return nil, 0, rErr + return 0, nil, rErr } if rErr == io.EOF { // We have reached EOF on the first byte read, io.Reader @@ -51,56 +49,38 @@ func erasureCreateFile(disks []StorageAPI, volume string, path string, partName // data. Will create a 0byte file instead. if size == 0 { blocks = make([][]byte, len(disks)) - rErr = appendFile(disks, volume, path, blocks, eInfo.Distribution, hashWriters, writeQuorum) + rErr = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum) if rErr != nil { - return nil, 0, rErr + return 0, nil, rErr } } // else we have reached EOF after few reads, no need to // add an additional 0bytes at the end. break } if rErr != nil && rErr != io.ErrUnexpectedEOF { - return nil, 0, rErr + return 0, nil, rErr } if n > 0 { // Returns encoded blocks. var enErr error - blocks, enErr = encodeData(buf[0:n], eInfo.DataBlocks, eInfo.ParityBlocks) + blocks, enErr = encodeData(buf[0:n], dataBlocks, parityBlocks) if enErr != nil { - return nil, 0, enErr + return 0, nil, enErr } // Write to all disks. - if err = appendFile(disks, volume, path, blocks, eInfo.Distribution, hashWriters, writeQuorum); err != nil { - return nil, 0, err + if err = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum); err != nil { + return 0, nil, err } size += int64(n) } } - // Save the checksums. 
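// A rough sketch (not from this patch) of the checksum scheme the refactor
// switches to: one hash writer per disk, fed with every block appended to that
// disk, and hex-encoded at the end in plain disk order, replacing the
// distribution-indexed checkSumInfo bookkeeping removed here. crypto/sha256
// stands in for the blake2b writers created by newHashWriters; the function
// name and shards input are hypothetical. Imports: crypto/sha256, encoding/hex, hash.
func exampleDiskCheckSums(shards [][]byte) []string {
	hashWriters := make([]hash.Hash, len(shards))
	for i := range hashWriters {
		hashWriters[i] = sha256.New()
	}
	// Every block appended to disk i is also written to hashWriters[i].
	for i, shard := range shards {
		hashWriters[i].Write(shard)
	}
	// Hex-encode the per-disk sums; no distribution lookup is needed anymore.
	checkSums := make([]string, len(shards))
	for i := range checkSums {
		checkSums[i] = hex.EncodeToString(hashWriters[i].Sum(nil))
	}
	return checkSums
}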
-	checkSums := make([]checkSumInfo, len(disks))
-	for index := range disks {
-		blockIndex := eInfo.Distribution[index] - 1
-		checkSums[blockIndex] = checkSumInfo{
-			Name:      partName,
-			Algorithm: "blake2b",
-			Hash:      hex.EncodeToString(hashWriters[blockIndex].Sum(nil)),
-		}
+	checkSums = make([]string, len(disks))
+	for i := range checkSums {
+		checkSums[i] = hex.EncodeToString(hashWriters[i].Sum(nil))
 	}
-
-	// Erasure info update for checksum for each disks.
-	newEInfos = make([]erasureInfo, len(disks))
-	for index, eInfo := range eInfos {
-		if eInfo.IsValid() {
-			blockIndex := eInfo.Distribution[index] - 1
-			newEInfos[index] = eInfo
-			newEInfos[index].Checksum = append(newEInfos[index].Checksum, checkSums[blockIndex])
-		}
-	}
-
-	// Return newEInfos.
-	return newEInfos, size, nil
+	return size, checkSums, nil
 }
 
 // encodeData - encodes incoming data buffer into
@@ -128,7 +108,7 @@ func encodeData(dataBuffer []byte, dataBlocks, parityBlocks int) ([][]byte, erro
 }
 
 // appendFile - append data buffer at path.
-func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, distribution []int, hashWriters []hash.Hash, writeQuorum int) (err error) {
+func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hashWriters []hash.Hash, writeQuorum int) (err error) {
 	var wg = &sync.WaitGroup{}
 	var wErrs = make([]error, len(disks))
 	// Write encoded data to quorum disks in parallel.
@@ -140,16 +120,14 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, dist
 		// Write encoded data in routine.
 		go func(index int, disk StorageAPI) {
 			defer wg.Done()
-			// Pick the block from the distribution.
-			blockIndex := distribution[index] - 1
-			wErr := disk.AppendFile(volume, path, enBlocks[blockIndex])
+			wErr := disk.AppendFile(volume, path, enBlocks[index])
 			if wErr != nil {
 				wErrs[index] = wErr
 				return
 			}
 
 			// Calculate hash for each blocks.
-			hashWriters[blockIndex].Write(enBlocks[blockIndex])
+			hashWriters[index].Write(enBlocks[index])
 
 			// Successfully wrote.
 			wErrs[index] = nil
diff --git a/erasure-readfile.go b/erasure-readfile.go
index 7cfe02880..e79dc0149 100644
--- a/erasure-readfile.go
+++ b/erasure-readfile.go
@@ -65,18 +65,36 @@ func isSuccessDataBlocks(enBlocks [][]byte, dataBlocks int) bool {
 	return successDataBlocksCount >= dataBlocks
 }
 
+// Return ordered partsMetadata depending on the distribution.
+func getOrderedPartsMetadata(distribution []int, partsMetadata []xlMetaV1) (orderedPartsMetadata []xlMetaV1) {
+	orderedPartsMetadata = make([]xlMetaV1, len(partsMetadata))
+	for index := range partsMetadata {
+		blockIndex := distribution[index]
+		orderedPartsMetadata[blockIndex-1] = partsMetadata[index]
+	}
+	return orderedPartsMetadata
+}
+
 // getOrderedDisks - get ordered disks from erasure distribution.
 // returns ordered slice of disks from their actual distribution.
-func getOrderedDisks(distribution []int, disks []StorageAPI, blockCheckSums []checkSumInfo) (orderedDisks []StorageAPI, orderedBlockCheckSums []checkSumInfo) {
+func getOrderedDisks(distribution []int, disks []StorageAPI) (orderedDisks []StorageAPI) {
 	orderedDisks = make([]StorageAPI, len(disks))
-	orderedBlockCheckSums = make([]checkSumInfo, len(disks))
 
 	// From disks gets ordered disks.
 	for index := range disks {
 		blockIndex := distribution[index]
 		orderedDisks[blockIndex-1] = disks[index]
+	}
+	return orderedDisks
+}
+
+// Return ordered CheckSums depending on the distribution.
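// Illustration only (generic helper, not in the patch): getOrderedDisks,
// getOrderedPartsMetadata and getOrderedCheckSums all rely on the same
// one-line mapping, element i of the input belongs at slot distribution[i]-1
// of the output, so data blocks always end up first and parity blocks last.
func reorderByDistribution(distribution []int, items []string) []string {
	ordered := make([]string, len(items))
	for i := range items {
		ordered[distribution[i]-1] = items[i]
	}
	return ordered
}

// For example, reorderByDistribution([]int{3, 1, 2}, []string{"a", "b", "c"})
// returns []string{"b", "c", "a"}.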
+func getOrderedCheckSums(distribution []int, blockCheckSums []checkSumInfo) (orderedBlockCheckSums []checkSumInfo) { + orderedBlockCheckSums = make([]checkSumInfo, len(blockCheckSums)) + for index := range blockCheckSums { + blockIndex := distribution[index] orderedBlockCheckSums[blockIndex-1] = blockCheckSums[index] } - return orderedDisks, orderedBlockCheckSums + return orderedBlockCheckSums } // Return readable disks slice from which we can read parallelly. @@ -188,7 +206,8 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s // []orderedDisks will have first eInfo.DataBlocks disks as data // disks and rest will be parity. - orderedDisks, orderedBlockCheckSums := getOrderedDisks(eInfo.Distribution, disks, blockCheckSums) + orderedDisks := getOrderedDisks(eInfo.Distribution, disks) + orderedBlockCheckSums := getOrderedCheckSums(eInfo.Distribution, blockCheckSums) // bitRotVerify verifies if the file on a particular disk doesn't have bitrot // by verifying the hash of the contents of the file. diff --git a/erasure-readfile_test.go b/erasure-readfile_test.go index f40673654..5fc3bbe6d 100644 --- a/erasure-readfile_test.go +++ b/erasure-readfile_test.go @@ -109,9 +109,8 @@ func testGetReadDisks(t *testing.T, xl xlObjects) { // actual distribution. func testGetOrderedDisks(t *testing.T, xl xlObjects) { disks := xl.storageDisks - blockCheckSums := make([]checkSumInfo, len(disks)) distribution := []int{16, 14, 12, 10, 8, 6, 4, 2, 1, 3, 5, 7, 9, 11, 13, 15} - orderedDisks, _ := getOrderedDisks(distribution, disks, blockCheckSums) + orderedDisks := getOrderedDisks(distribution, disks) // From the "distribution" above you can notice that: // 1st data block is in the 9th disk (i.e distribution index 8) // 2nd data block is in the 8th disk (i.e distribution index 7) and so on. diff --git a/erasure_test.go b/erasure_test.go index 14a782c4a..f0a0384f5 100644 --- a/erasure_test.go +++ b/erasure_test.go @@ -18,6 +18,9 @@ package main import ( "bytes" + "crypto/rand" + "io/ioutil" + "os" "testing" ) @@ -183,3 +186,70 @@ func TestErasureDecode(t *testing.T) { } } } + +// Simulates a faulty disk for AppendFile() +type AppendDiskDown struct { + *posix +} + +func (a AppendDiskDown) AppendFile(volume string, path string, buf []byte) error { + return errFaultyDisk +} + +// Test erasureCreateFile() +// TODO: +// * check when more disks are down. +// * verify written content by using erasureReadFile. +func TestErasureCreateFile(t *testing.T) { + // Initialize environment needed for the test. + dataBlocks := 7 + parityBlocks := 7 + blockSize := int64(blockSizeV1) + diskPaths := make([]string, dataBlocks+parityBlocks) + disks := make([]StorageAPI, len(diskPaths)) + + for i := range diskPaths { + var err error + diskPaths[i], err = ioutil.TempDir(os.TempDir(), "minio-") + if err != nil { + t.Fatal("Unable to create tmp dir", err) + } + defer removeAll(diskPaths[i]) + disks[i], err = newPosix(diskPaths[i]) + if err != nil { + t.Fatal(err) + } + err = disks[i].MakeVol("testbucket") + if err != nil { + t.Fatal(err) + } + } + + // Prepare a slice of 1MB with random data. + data := make([]byte, 1*1024*1024) + _, err := rand.Read(data) + if err != nil { + t.Fatal(err) + } + // Test when all disks are up. 
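// A hypothetical variation on the AppendDiskDown helper above (not part of the
// patch; it would live at file scope): a disk that starts failing only after a
// given number of successful appends, useful for exercising partial-write
// behaviour in future tests.
type flakyAppendDisk struct {
	*posix
	okCalls int // number of appends that still succeed
}

func (f *flakyAppendDisk) AppendFile(volume string, path string, buf []byte) error {
	if f.okCalls <= 0 {
		return errFaultyDisk
	}
	f.okCalls--
	return f.posix.AppendFile(volume, path, buf)
}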
+ size, _, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1) + if err != nil { + t.Fatal(err) + } + if size != int64(len(data)) { + t.Errorf("erasureCreateFile returned %d, expected %d", size, len(data)) + } + + // Two disks down. + disks[4] = AppendDiskDown{disks[4].(*posix)} + disks[5] = AppendDiskDown{disks[5].(*posix)} + + // Test when two disks are down. + size, _, err = erasureCreateFile(disks, "testbucket", "testobject2", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1) + if err != nil { + t.Fatal(err) + } + if size != int64(len(data)) { + t.Errorf("erasureCreateFile returned %d, expected %d", size, len(data)) + } +} diff --git a/xl-v1-healing.go b/xl-v1-healing.go index f0deda2c0..332d45db2 100644 --- a/xl-v1-healing.go +++ b/xl-v1-healing.go @@ -131,10 +131,9 @@ func (xl xlObjects) shouldHeal(onlineDisks []StorageAPI) (heal bool) { // Returns slice of online disks needed. // - slice returing readable disks. -// - xlMetaV1 -// - bool value indicating if healing is needed. -func (xl xlObjects) listOnlineDisks(partsMetadata []xlMetaV1, errs []error) (onlineDisks []StorageAPI, modTime time.Time) { - onlineDisks = make([]StorageAPI, len(xl.storageDisks)) +// - modTime of the Object +func listOnlineDisks(disks []StorageAPI, partsMetadata []xlMetaV1, errs []error) (onlineDisks []StorageAPI, modTime time.Time) { + onlineDisks = make([]StorageAPI, len(disks)) // List all the file commit ids from parts metadata. modTimes := listObjectModtimes(partsMetadata, errs) @@ -145,7 +144,7 @@ func (xl xlObjects) listOnlineDisks(partsMetadata []xlMetaV1, errs []error) (onl // Create a new online disks slice, which have common uuid. for index, t := range modTimes { if t == modTime { - onlineDisks[index] = xl.storageDisks[index] + onlineDisks[index] = disks[index] } else { onlineDisks[index] = nil } diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go index 9ab490f7f..8717e5b3f 100644 --- a/xl-v1-metadata.go +++ b/xl-v1-metadata.go @@ -163,6 +163,27 @@ func (m *xlMetaV1) AddObjectPart(partNumber int, partName string, partETag strin sort.Sort(byObjectPartNumber(m.Parts)) } +// AddCheckSum - add checksum of a part. +func (m *xlMetaV1) AddCheckSum(partName, algorithm, checkSum string) { + for i, sum := range m.Erasure.Checksum { + if sum.Name == partName { + m.Erasure.Checksum[i] = checkSumInfo{partName, "blake2b", checkSum} + return + } + } + m.Erasure.Checksum = append(m.Erasure.Checksum, checkSumInfo{partName, "blake2b", checkSum}) +} + +// GetCheckSum - get checksum of a part. +func (m *xlMetaV1) GetCheckSum(partName string) (checkSum, algorithm string, err error) { + for _, sum := range m.Erasure.Checksum { + if sum.Name == partName { + return sum.Hash, sum.Algorithm, nil + } + } + return "", "", errUnexpected +} + // ObjectToPartOffset - translate offset of an object to offset of its individual part. func (m xlMetaV1) ObjectToPartOffset(offset int64) (partIndex int, partOffset int64, err error) { if offset == 0 { @@ -187,10 +208,11 @@ func (m xlMetaV1) ObjectToPartOffset(offset int64) (partIndex int, partOffset in // pickValidXLMeta - picks one valid xlMeta content and returns from a // slice of xlmeta content. If no value is found this function panics // and dies. -func pickValidXLMeta(xlMetas []xlMetaV1) xlMetaV1 { - for _, xlMeta := range xlMetas { - if xlMeta.IsValid() { - return xlMeta +func pickValidXLMeta(metaArr []xlMetaV1, modTime time.Time) xlMetaV1 { + // Pick latest valid metadata. 
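// A minimal sketch (hypothetical helper, not in this patch) of how the new
// GetCheckSum accessor can back a bitrot check once a part's content hash has
// been recomputed; computedHex is assumed to be the freshly calculated hex
// digest. Only fmt is needed besides the types above.
func verifyPartCheckSum(xlMeta xlMetaV1, partName, computedHex string) error {
	expected, algo, err := xlMeta.GetCheckSum(partName)
	if err != nil {
		return err // no checksum recorded for this part
	}
	if expected != computedHex {
		return fmt.Errorf("bitrot detected for %s (%s checksum mismatch)", partName, algo)
	}
	return nil
}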
+ for _, meta := range metaArr { + if meta.IsValid() && meta.Stat.ModTime == modTime { + return meta } } panic("Unable to look for valid XL metadata content") diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index b5497e850..616b8d25c 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -354,10 +354,13 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s nsMutex.RUnlock(minioMetaBucket, uploadIDPath) // List all online disks. - onlineDisks, _ := xl.listOnlineDisks(partsMetadata, errs) + onlineDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs) // Pick one from the first valid metadata. - xlMeta := pickValidXLMeta(partsMetadata) + xlMeta := pickValidXLMeta(partsMetadata, modTime) + + onlineDisks = getOrderedDisks(xlMeta.Erasure.Distribution, onlineDisks) + partsMetadata = getOrderedPartsMetadata(xlMeta.Erasure.Distribution, partsMetadata) // Need a unique name for the part being written in minioMetaBucket to // accommodate concurrent PutObjectPart requests @@ -379,17 +382,8 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Construct a tee reader for md5sum. teeReader := io.TeeReader(data, md5Writer) - // Collect all the previous erasure infos across the disk. - var eInfos []erasureInfo - for index := range onlineDisks { - eInfos = append(eInfos, partsMetadata[index].Erasure) - } - // Erasure code data and write across all disks. - newEInfos, sizeWritten, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, partSuffix, teeReader, eInfos, xl.writeQuorum) - if err != nil { - return "", toObjectErr(err, minioMetaBucket, tmpPartPath) - } + sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, teeReader, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, xl.writeQuorum) // For size == -1, perhaps client is sending in chunked encoding // set the size as size that was actually written. @@ -421,7 +415,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s nsMutex.Lock(minioMetaBucket, uploadIDPath) defer nsMutex.Unlock(minioMetaBucket, uploadIDPath) - // Validates if upload ID exists again. + // Validate again if upload ID still exists. if !xl.isUploadIDExists(bucket, object, uploadID) { return "", InvalidUploadID{UploadID: uploadID} } @@ -433,34 +427,17 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s return "", toObjectErr(err, minioMetaBucket, partPath) } - // Read metadata (again) associated with the object from all disks. + // Read metadata again because it might be updated with parallel upload of another part. partsMetadata, errs = readAllXLMetadata(onlineDisks, minioMetaBucket, uploadIDPath) if !isDiskQuorum(errs, xl.writeQuorum) { return "", toObjectErr(errXLWriteQuorum, bucket, object) } - var updatedEInfos []erasureInfo - for index := range partsMetadata { - updatedEInfos = append(updatedEInfos, partsMetadata[index].Erasure) - } - - for index, eInfo := range newEInfos { - if eInfo.IsValid() { - // Use a map to find union of checksums of parts that - // we concurrently written and committed before this - // part. N B For a different, concurrent upload of the - // same part, the last written content remains. - finalChecksums := unionChecksumInfos(newEInfos[index].Checksum, updatedEInfos[index].Checksum, partSuffix) - updatedEInfos[index] = eInfo - updatedEInfos[index].Checksum = finalChecksums - } - } + // Get current highest version based on re-read partsMetadata. 
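// Conceptual sketch (simplified stand-in, not the real helper): listOnlineDisks
// derives the "current version" from the mod times found in the xl.json copies
// and keeps only the disks that agree on it. The consensus step reduces to
// something like the following; the real code may break ties differently.
// Import: time.
func mostCommonModTime(modTimes []time.Time) time.Time {
	counts := make(map[time.Time]int)
	var common time.Time
	best := 0
	for _, t := range modTimes {
		counts[t]++
		if counts[t] > best {
			best = counts[t]
			common = t
		}
	}
	return common
}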
+ onlineDisks, modTime = listOnlineDisks(onlineDisks, partsMetadata, errs) // Pick one from the first valid metadata. - xlMeta = pickValidXLMeta(partsMetadata) - - // Get current highest version based on re-read partsMetadata. - onlineDisks, _ = xl.listOnlineDisks(partsMetadata, errs) + xlMeta = pickValidXLMeta(partsMetadata, modTime) // Once part is successfully committed, proceed with updating XL metadata. xlMeta.Stat.ModTime = time.Now().UTC() @@ -468,10 +445,12 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s // Add the current part. xlMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size) - // Update `xl.json` content for each disks. - for index := range partsMetadata { + for index, disk := range onlineDisks { + if disk == nil { + continue + } partsMetadata[index].Parts = xlMeta.Parts - partsMetadata[index].Erasure = updatedEInfos[index] + partsMetadata[index].AddCheckSum(partSuffix, "blake2b", checkSums[index]) } // Write all the checksum metadata. @@ -630,11 +609,13 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload return "", toObjectErr(errXLWriteQuorum, bucket, object) } + _, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs) + // Calculate full object size. var objectSize int64 // Pick one from the first valid metadata. - xlMeta := pickValidXLMeta(partsMetadata) + xlMeta := pickValidXLMeta(partsMetadata, modTime) // Save current xl meta for validation. var currentXLMeta = xlMeta diff --git a/xl-v1-object.go b/xl-v1-object.go index fb0698a83..ff7660fa1 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -76,7 +76,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i } // List all online disks. - onlineDisks, modTime := xl.listOnlineDisks(metaArr, errs) + onlineDisks, modTime := listOnlineDisks(xl.storageDisks, metaArr, errs) // Pick latest valid metadata. var xlMeta xlMetaV1 @@ -381,28 +381,6 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. minioMetaTmpBucket := path.Join(minioMetaBucket, tmpMetaPrefix) tempObj := uniqueID - nsMutex.RLock(bucket, object) - // Read metadata associated with the object from all disks. - partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object) - nsMutex.RUnlock(bucket, object) - - // Do we have write quroum?. - if !isDiskQuorum(errs, xl.writeQuorum) { - return "", toObjectErr(errXLWriteQuorum, bucket, object) - } - - // errFileNotFound is handled specially since it's OK for the object to - // not exists in the namespace yet. - if errCount, reducedErr := reduceErrs(errs); reducedErr != nil && reducedErr != errFileNotFound { - if errCount < xl.writeQuorum { - return "", toObjectErr(errXLWriteQuorum, bucket, object) - } - return "", toObjectErr(reducedErr, bucket, object) - } - - // List all online disks. - onlineDisks, _ := xl.listOnlineDisks(partsMetadata, errs) - var mw io.Writer // Initialize md5 writer. md5Writer := md5.New() @@ -447,14 +425,10 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. // Initialize xl meta. xlMeta := newXLMetaV1(object, xl.dataBlocks, xl.parityBlocks) - // Collect all the previous erasure infos across the disk. - var eInfos []erasureInfo - for range onlineDisks { - eInfos = append(eInfos, xlMeta.Erasure) - } + onlineDisks := getOrderedDisks(xlMeta.Erasure.Distribution, xl.storageDisks) - // Erasure code and write across all disks. 
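// Generic stand-in (not the actual isDiskQuorum implementation) for the quorum
// checks used in the hunks above: a write is treated as committed once at
// least writeQuorum disks reported no error.
func haveQuorum(errs []error, quorum int) bool {
	ok := 0
	for _, err := range errs {
		if err == nil {
			ok++
		}
	}
	return ok >= quorum
}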
- newEInfos, sizeWritten, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, "part.1", teeReader, eInfos, xl.writeQuorum) + // Erasure code data and write across all disks. + sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, teeReader, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, xl.writeQuorum) if err != nil { return "", toObjectErr(err, minioMetaBucket, tempErasureObj) } @@ -531,10 +505,11 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. // Add the final part. xlMeta.AddObjectPart(1, "part.1", newMD5Hex, xlMeta.Stat.Size) + partsMetadata := make([]xlMetaV1, len(xl.storageDisks)) // Update `xl.json` content on each disks. for index := range partsMetadata { partsMetadata[index] = xlMeta - partsMetadata[index].Erasure = newEInfos[index] + partsMetadata[index].AddCheckSum("part.1", "blake2b", checkSums[index]) } // Write unique `xl.json` for each disk.
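// Illustration of what the loop above records (values made up; field names as
// in checkSumInfo used earlier in this patch): every disk gets its own copy of
// the metadata plus a per-part checksum entry, and the Hash differs per disk
// because each disk stores different erasure coded blocks.
var _ = checkSumInfo{
	Name:      "part.1",
	Algorithm: "blake2b",
	Hash:      "53df2f...c9b1", // truncated, illustrative hex digest
}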