From 18728a0b595153e6c391d87f57ec693de4cdf167 Mon Sep 17 00:00:00 2001
From: Krishna Srinivas
Date: Wed, 20 Jul 2016 14:00:30 +0530
Subject: [PATCH] XL/erasure-read: refactor erasure read and add tests (#2232)

---
 erasure-readfile.go      | 70 ++++++++++++++++++++-----------------
 erasure-readfile_test.go | 74 +++++++++++++++++++++++++++++++++++++---
 erasure-utils.go         | 36 ++++++++-----------
 erasure-utils_test.go    | 53 ++++++++++++++++++++++++++++
 4 files changed, 175 insertions(+), 58 deletions(-)
 create mode 100644 erasure-utils_test.go

diff --git a/erasure-readfile.go b/erasure-readfile.go
index fce4d4f8e..05e389baf 100644
--- a/erasure-readfile.go
+++ b/erasure-readfile.go
@@ -166,6 +166,11 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
 		return 0, errUnexpected
 	}
 
+	// Can't request more data than what is available.
+	if offset+length > totalLength {
+		return 0, errUnexpected
+	}
+
 	// bitRotVerify verifies if the file on a particular disk doesn't have bitrot
 	// by verifying the hash of the contents of the file.
 	bitRotVerify := func() func(diskIndex int) bool {
@@ -188,39 +193,33 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
 	// Total bytes written to writer
 	bytesWritten := int64(0)
 
-	// chunkSize is roughly BlockSize/DataBlocks.
-	// chunkSize is calculated such that chunkSize*DataBlocks accommodates BlockSize bytes.
-	// So chunkSize*DataBlocks can be slightly larger than BlockSize if BlockSize is not divisible by
-	// DataBlocks. The extra space will have 0-padding.
-	chunkSize := getEncodedBlockLen(blockSize, dataBlocks)
+	// chunkSize is the amount of data that needs to be read from each disk at a time.
+	chunkSize := getChunkSize(blockSize, dataBlocks)
 
-	// Get start and end block, also bytes to be skipped based on the input offset.
-	startBlock, endBlock, bytesToSkip := getBlockInfo(offset, totalLength, blockSize)
+	startBlock := offset / blockSize
+	endBlock := (offset + length) / blockSize
+
+	// curChunkSize = chunk size for the current block in the for loop below.
+	// curBlockSize = block size for the current block in the for loop below.
+	// curChunkSize and curBlockSize can change for the last block if totalLength%blockSize != 0
+	curChunkSize := chunkSize
+	curBlockSize := blockSize
 
 	// For each block, read chunk from each disk. If we are able to read all the data disks then we don't
 	// need to read parity disks. If one of the data disk is missing we need to read DataBlocks+1 number
 	// of disks. Once read, we Reconstruct() missing data if needed and write it to the given writer.
-	for block := startBlock; bytesWritten < length; block++ {
+	for block := startBlock; block <= endBlock; block++ {
 		// Each element of enBlocks holds curChunkSize'd amount of data read from its corresponding disk.
 		enBlocks := make([][]byte, len(disks))
 
-		// enBlocks data can have 0-padding hence we need to figure the exact number
-		// of bytes we want to read from enBlocks.
-		blockSize := blockSize
-
-		// curChunkSize is chunkSize until end block.
-		curChunkSize := chunkSize
-
-		// We have endBlock, verify if we need to have padding.
-		if block == endBlock && (totalLength%blockSize != 0) {
-			// If this is the last block and size of the block is < BlockSize.
-			curChunkSize = getEncodedBlockLen(totalLength%blockSize, dataBlocks)
-
-			// For the last block, the block size can be less than BlockSize.
-			blockSize = totalLength % blockSize
+		if ((offset + bytesWritten) / blockSize) == (totalLength / blockSize) {
+			// This is the last block for which curBlockSize and curChunkSize can change.
+			// For example, if totalLength is 15MB and blockSize is 10MB, curBlockSize for
+			// the last block should be 5MB.
+			curBlockSize = totalLength % blockSize
+			curChunkSize = getChunkSize(curBlockSize, dataBlocks)
 		}
 
-		// Block offset.
 		// NOTE: That for the offset calculation we have to use chunkSize and
 		// not curChunkSize. If we use curChunkSize for offset calculation
 		// then it can result in wrong offset for the last block.
@@ -261,30 +260,37 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
 			}
 		}
 
-		var outSize, outOffset int64
+		// Offset in enBlocks from where data should be read.
+		enBlocksOffset := int64(0)
 
-		// Total data to be read.
-		outSize = blockSize
+		// Total data to be read from enBlocks.
+		enBlocksLength := curBlockSize
 
-		// If this is start block, skip unwanted bytes.
+		// If this is the start block then enBlocksOffset might not be 0.
 		if block == startBlock {
-			outOffset = bytesToSkip
-			outSize -= bytesToSkip
+			enBlocksOffset = offset % blockSize
+			enBlocksLength -= enBlocksOffset
 		}
 
-		if length-bytesWritten < outSize {
+		remaining := length - bytesWritten
+		if remaining < enBlocksLength {
 			// We should not send more data than what was requested.
-			outSize = length - bytesWritten
+			enBlocksLength = remaining
 		}
 
 		// Write data blocks.
-		n, err := writeDataBlocks(writer, enBlocks, dataBlocks, outOffset, outSize)
+		n, err := writeDataBlocks(writer, enBlocks, dataBlocks, enBlocksOffset, enBlocksLength)
 		if err != nil {
 			return bytesWritten, err
 		}
 
 		// Update total bytes written.
 		bytesWritten += n
+
+		if bytesWritten == length {
+			// Done writing all the requested data.
+			break
+		}
 	}
 
 	// Success.
diff --git a/erasure-readfile_test.go b/erasure-readfile_test.go
index 875f32f6c..0b77f465b 100644
--- a/erasure-readfile_test.go
+++ b/erasure-readfile_test.go
@@ -1,5 +1,5 @@
 /*
- * Minio Cloud Storage, (C) 2015, 2016 Minio, Inc.
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,8 +18,9 @@ package main
 
 import (
 	"bytes"
-	"crypto/rand"
+	"math/rand"
 	"testing"
+	"time"
 )
 
 import "reflect"
@@ -308,7 +309,7 @@ func TestErasureReadFileOffsetLength(t *testing.T) {
 
 	disks := setup.disks
 
-	// Prepare a slice of 1MB with random data.
+	// Prepare a slice of 5MB with random data.
 	data := make([]byte, 5*1024*1024)
 	length := int64(len(data))
 	_, err = rand.Read(data)
@@ -330,6 +331,8 @@
 	}{
 		// Full file.
 		{0, length},
+		// Read nothing.
+		{length, 0},
 		// 2nd block.
 		{blockSize, blockSize},
 		// Test cases for random offsets and lengths.
@@ -338,7 +341,7 @@
 		{blockSize + 1, blockSize - 1},
 		{blockSize + 1, blockSize},
 		{blockSize + 1, blockSize + 1},
-		{blockSize*2 - 1, blockSize*3 + 1},
+		{blockSize*2 - 1, blockSize + 1},
 		{length - 1, 1},
 		{length - blockSize, blockSize},
 		{length - blockSize - 1, blockSize},
@@ -359,3 +362,66 @@
 		}
 	}
 }
+
+// Test erasureReadFile with random offsets and lengths.
+// This test is skipped as it takes a long time to run, hence it should be run
+// explicitly after commenting out t.SkipNow().
+func TestErasureReadFileRandomOffsetLength(t *testing.T) {
+	// Comment the following line to run this test.
+	t.SkipNow()
+	// Initialize environment needed for the test.
+	dataBlocks := 7
+	parityBlocks := 7
+	blockSize := int64(1 * 1024 * 1024)
+	setup, err := newErasureTestSetup(dataBlocks, parityBlocks, blockSize)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	defer setup.Remove()
+
+	disks := setup.disks
+
+	// Prepare a slice of 5MB with random data.
+	data := make([]byte, 5*1024*1024)
+	length := int64(len(data))
+	_, err = rand.Read(data)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// 10000 iterations with random offsets and lengths.
+	iterations := 10000
+
+	// Create a test file to read from.
+	size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if size != length {
+		t.Errorf("erasureCreateFile returned %d, expected %d", size, length)
+	}
+
+	// To generate random offset/length.
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+
+	buf := &bytes.Buffer{}
+
+	// Verify erasureReadFile() for random offsets and lengths.
+	for i := 0; i < iterations; i++ {
+		offset := r.Int63n(length)
+		readLen := r.Int63n(length - offset)
+
+		expected := data[offset : offset+readLen]
+
+		size, err = erasureReadFile(buf, disks, "testbucket", "testobject", offset, readLen, length, blockSize, dataBlocks, parityBlocks, checkSums)
+		if err != nil {
+			t.Fatal(err, offset, readLen)
+		}
+		got := buf.Bytes()
+		if !bytes.Equal(expected, got) {
+			t.Fatalf("read data is different from what was expected, offset=%d length=%d", offset, readLen)
+		}
+		buf.Reset()
+	}
+}
diff --git a/erasure-utils.go b/erasure-utils.go
index ca7da6eeb..c3641313d 100644
--- a/erasure-utils.go
+++ b/erasure-utils.go
@@ -73,9 +73,9 @@ func getDataBlockLen(enBlocks [][]byte, dataBlocks int) int {
 
 // Writes all the data blocks from encoded blocks until requested
 // outSize length. Provides a way to skip bytes until the offset.
-func writeDataBlocks(dst io.Writer, enBlocks [][]byte, dataBlocks int, outOffset int64, outSize int64) (int64, error) {
+func writeDataBlocks(dst io.Writer, enBlocks [][]byte, dataBlocks int, offset int64, length int64) (int64, error) {
 	// Offset and out size cannot be negative.
-	if outOffset < 0 || outSize < 0 {
+	if offset < 0 || length < 0 {
 		return 0, errUnexpected
 	}
 
@@ -85,12 +85,12 @@ func writeDataBlocks(dst io.Writer, enBlocks [][]byte, dataBlocks int, outOffset
 	}
 
 	// Do we have enough data?
-	if int64(getDataBlockLen(enBlocks, dataBlocks)) < outSize {
+	if int64(getDataBlockLen(enBlocks, dataBlocks)) < length {
 		return 0, reedsolomon.ErrShortData
 	}
 
 	// Counter to decrement total left to write.
-	write := outSize
+	write := length
 
 	// Counter to increment total written.
 	totalWritten := int64(0)
@@ -98,17 +98,17 @@ func writeDataBlocks(dst io.Writer, enBlocks [][]byte, dataBlocks int, outOffset
 	// Write all data blocks to dst.
 	for _, block := range enBlocks[:dataBlocks] {
 		// Skip blocks until we have reached our offset.
-		if outOffset >= int64(len(block)) {
+		if offset >= int64(len(block)) {
 			// Decrement offset.
-			outOffset -= int64(len(block))
+			offset -= int64(len(block))
 			continue
 		} else {
 			// Skip until offset.
-			block = block[outOffset:]
+			block = block[offset:]
 
 			// Reset the offset for next iteration to read everything
 			// from subsequent blocks.
-			outOffset = 0
+			offset = 0
 		}
 		// We have written all the blocks, write the last remaining block.
 		if write < int64(len(block)) {
@@ -136,20 +136,12 @@ func writeDataBlocks(dst io.Writer, enBlocks [][]byte, dataBlocks int, outOffset
 	return totalWritten, nil
 }
 
-// getBlockInfo - find start/end block and bytes to skip for given offset, length and block size.
-func getBlockInfo(offset, length, blockSize int64) (startBlock, endBlock, bytesToSkip int64) {
-	// Calculate start block for given offset and how many bytes to skip to get the offset.
-	startBlock = offset / blockSize
-	bytesToSkip = offset % blockSize
-	endBlock = length / blockSize
-	return
-}
-
-// calculate the blockSize based on input length and total number of
-// data blocks.
-func getEncodedBlockLen(inputLen int64, dataBlocks int) (curEncBlockSize int64) {
-	curEncBlockSize = (inputLen + int64(dataBlocks) - 1) / int64(dataBlocks)
-	return curEncBlockSize
+// chunkSize is roughly BlockSize/DataBlocks.
+// chunkSize is calculated such that chunkSize*DataBlocks accommodates BlockSize bytes.
+// So chunkSize*DataBlocks can be slightly larger than BlockSize if BlockSize is not divisible by
+// DataBlocks. The extra space will have 0-padding.
+func getChunkSize(blockSize int64, dataBlocks int) int64 {
+	return (blockSize + int64(dataBlocks) - 1) / int64(dataBlocks)
 }
 
 // copyN - copies from disk, volume, path to input writer until length
diff --git a/erasure-utils_test.go b/erasure-utils_test.go
new file mode 100644
index 000000000..d8eef0eff
--- /dev/null
+++ b/erasure-utils_test.go
@@ -0,0 +1,53 @@
+/*
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import "testing"
+
+// Test for getChunkSize()
+func TestGetChunkSize(t *testing.T) {
+	// Refer to comments on getChunkSize() for details.
+	testCases := []struct {
+		blockSize  int64
+		dataBlocks int
+		chunkSize  int64
+	}{
+		{
+			10,
+			10,
+			1,
+		},
+		{
+			10,
+			11,
+			1,
+		},
+		{
+			10,
+			9,
+			2,
+		},
+	}
+	// Verify getChunkSize() for the test cases.
+	for i, test := range testCases {
+		expected := test.chunkSize
+		got := getChunkSize(test.blockSize, test.dataBlocks)
+		if expected != got {
+			t.Errorf("Test %d : expected=%d got=%d", i+1, expected, got)
+		}
+	}
+}
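
For reference, a minimal standalone sketch of the arithmetic this patch introduces. getChunkSize mirrors the helper added to erasure-utils.go above; blockRange is a hypothetical name (not in the patch) wrapping the startBlock/endBlock/in-block-offset computation that erasureReadFile now performs inline:

package main

import "fmt"

// getChunkSize mirrors the helper added in erasure-utils.go: ceiling
// division, so that chunkSize*dataBlocks >= blockSize and the final
// chunk of each block carries zero padding.
func getChunkSize(blockSize int64, dataBlocks int) int64 {
	return (blockSize + int64(dataBlocks) - 1) / int64(dataBlocks)
}

// blockRange is a hypothetical helper (not part of the patch) showing how
// erasureReadFile derives the first and last block touched by a read of
// `length` bytes at `offset`, plus the bytes to skip inside the first block.
func blockRange(offset, length, blockSize int64) (startBlock, endBlock, startOffset int64) {
	startBlock = offset / blockSize
	endBlock = (offset + length) / blockSize
	startOffset = offset % blockSize
	return startBlock, endBlock, startOffset
}

func main() {
	// A 10MB block striped across 9 data disks: each disk stores a
	// ceil(10485760/9) chunk; 9*1165085-10485760 = 5 bytes of padding.
	fmt.Println(getChunkSize(10*1024*1024, 9)) // 1165085

	// A 4MB read starting 1MB into a file with 10MB blocks touches only
	// block 0 and skips its first 1MB.
	fmt.Println(blockRange(1*1024*1024, 4*1024*1024, 10*1024*1024)) // 0 0 1048576
}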