diff --git a/erasure-readfile.go b/erasure-readfile.go index 05e389baf..ef0507794 100644 --- a/erasure-readfile.go +++ b/erasure-readfile.go @@ -17,13 +17,13 @@ package main import ( - "bytes" "encoding/hex" "errors" "io" "sync" "github.com/klauspost/reedsolomon" + "github.com/minio/minio/pkg/bpool" ) // isSuccessDecodeBlocks - do we have all the blocks to be @@ -112,7 +112,7 @@ func getReadDisks(orderedDisks []StorageAPI, index int, dataBlocks int) (readDis } // parallelRead - reads chunks in parallel from the disks specified in []readDisks. -func parallelRead(volume, path string, readDisks []StorageAPI, orderedDisks []StorageAPI, enBlocks [][]byte, blockOffset int64, curChunkSize int64, bitRotVerify func(diskIndex int) bool) { +func parallelRead(volume, path string, readDisks []StorageAPI, orderedDisks []StorageAPI, enBlocks [][]byte, blockOffset int64, curChunkSize int64, bitRotVerify func(diskIndex int) bool, pool *bpool.BytePool) { // WaitGroup to synchronise the read go-routines. wg := &sync.WaitGroup{} @@ -133,21 +133,20 @@ func parallelRead(volume, path string, readDisks []StorageAPI, orderedDisks []St return } - // Chunk writer. - chunkWriter := bytes.NewBuffer(make([]byte, 0, curChunkSize)) - - // CopyN - copies until current chunk size. - err := copyN(chunkWriter, readDisks[index], volume, path, blockOffset, curChunkSize) + buf, err := pool.Get() if err != nil { - // So that we don't read from this disk for the next block. + errorIf(err, "unable to get buffer from byte pool") orderedDisks[index] = nil return } + buf = buf[:curChunkSize] - // Copy the read blocks. - enBlocks[index] = chunkWriter.Bytes() - - // Successfully read. + _, err = readDisks[index].ReadFile(volume, path, blockOffset, buf) + if err != nil { + orderedDisks[index] = nil + return + } + enBlocks[index] = buf }(index) } @@ -160,7 +159,7 @@ func parallelRead(volume, path string, readDisks []StorageAPI, orderedDisks []St // are decoded into a data block. Data block is trimmed for given offset and length, // then written to given writer. This function also supports bit-rot detection by // verifying checksum of individual block's checksum. -func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, offset int64, length int64, totalLength int64, blockSize int64, dataBlocks int, parityBlocks int, checkSums []string) (int64, error) { +func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path string, offset int64, length int64, totalLength int64, blockSize int64, dataBlocks int, parityBlocks int, checkSums []string, pool *bpool.BytePool) (int64, error) { // Offset and length cannot be negative. if offset < 0 || length < 0 { return 0, errUnexpected @@ -171,6 +170,9 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s return 0, errUnexpected } + // chunkSize is the amount of data that needs to be read from each disk at a time. + chunkSize := getChunkSize(blockSize, dataBlocks) + // bitRotVerify verifies if the file on a particular disk doesn't have bitrot // by verifying the hash of the contents of the file. bitRotVerify := func() func(diskIndex int) bool { @@ -193,9 +195,6 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s // Total bytes written to writer bytesWritten := int64(0) - // chunkSize is the amount of data that needs to be read from each disk at a time. - chunkSize := getChunkSize(blockSize, dataBlocks) - startBlock := offset / blockSize endBlock := (offset + length) / blockSize @@ -209,6 +208,10 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s // need to read parity disks. If one of the data disk is missing we need to read DataBlocks+1 number // of disks. Once read, we Reconstruct() missing data if needed and write it to the given writer. for block := startBlock; block <= endBlock; block++ { + // Mark all buffers as unused at the start of the loop so that the buffers + // can be reused. + pool.Reset() + // Each element of enBlocks holds curChunkSize'd amount of data read from its corresponding disk. enBlocks := make([][]byte, len(disks)) @@ -239,7 +242,7 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s return bytesWritten, err } // Issue a parallel read across the disks specified in readDisks. - parallelRead(volume, path, readDisks, disks, enBlocks, blockOffset, curChunkSize, bitRotVerify) + parallelRead(volume, path, readDisks, disks, enBlocks, blockOffset, curChunkSize, bitRotVerify, pool) if isSuccessDecodeBlocks(enBlocks, dataBlocks) { // If enough blocks are available to do rs.Reconstruct() break diff --git a/erasure-readfile_test.go b/erasure-readfile_test.go index 0b77f465b..662972e12 100644 --- a/erasure-readfile_test.go +++ b/erasure-readfile_test.go @@ -21,6 +21,8 @@ import ( "math/rand" "testing" "time" + + "github.com/minio/minio/pkg/bpool" ) import "reflect" @@ -249,8 +251,13 @@ func TestErasureReadFileDiskFail(t *testing.T) { t.Errorf("erasureCreateFile returned %d, expected %d", size, length) } + // create byte pool which will be used by erasureReadFile for + // reading from disks and erasure decoding. + chunkSize := getChunkSize(blockSize, dataBlocks) + pool := bpool.NewBytePool(chunkSize, len(disks)) + buf := &bytes.Buffer{} - size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums) + size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums, pool) if err != nil { t.Error(err) } @@ -263,7 +270,7 @@ func TestErasureReadFileDiskFail(t *testing.T) { disks[5] = ReadDiskDown{disks[5].(*posix)} buf.Reset() - size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums) + size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums, pool) if err != nil { t.Error(err) } @@ -278,7 +285,7 @@ func TestErasureReadFileDiskFail(t *testing.T) { disks[11] = ReadDiskDown{disks[11].(*posix)} buf.Reset() - size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums) + size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums, pool) if err != nil { t.Error(err) } @@ -289,7 +296,7 @@ func TestErasureReadFileDiskFail(t *testing.T) { // 1 more disk down. 7 disks down in total. Read should fail. disks[12] = ReadDiskDown{disks[12].(*posix)} buf.Reset() - size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums) + size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums, pool) if err != errXLReadQuorum { t.Fatal("expected errXLReadQuorum error") } @@ -347,11 +354,14 @@ func TestErasureReadFileOffsetLength(t *testing.T) { {length - blockSize - 1, blockSize}, {length - blockSize - 1, blockSize + 1}, } + chunkSize := getChunkSize(blockSize, dataBlocks) + pool := bpool.NewBytePool(chunkSize, len(disks)) + // Compare the data read from file with "data" byte array. for i, testCase := range testCases { expected := data[testCase.offset:(testCase.offset + testCase.length)] buf := &bytes.Buffer{} - size, err = erasureReadFile(buf, disks, "testbucket", "testobject", testCase.offset, testCase.length, length, blockSize, dataBlocks, parityBlocks, checkSums) + size, err = erasureReadFile(buf, disks, "testbucket", "testobject", testCase.offset, testCase.length, length, blockSize, dataBlocks, parityBlocks, checkSums, pool) if err != nil { t.Error(err) continue @@ -405,6 +415,11 @@ func TestErasureReadFileRandomOffsetLength(t *testing.T) { // To generate random offset/length. r := rand.New(rand.NewSource(time.Now().UnixNano())) + // create pool buffer which will be used by erasureReadFile for + // reading from disks and erasure decoding. + chunkSize := getChunkSize(blockSize, dataBlocks) + pool := bpool.NewBytePool(chunkSize, len(disks)) + buf := &bytes.Buffer{} // Verify erasureReadFile() for random offsets and lengths. @@ -414,7 +429,7 @@ func TestErasureReadFileRandomOffsetLength(t *testing.T) { expected := data[offset : offset+readLen] - size, err = erasureReadFile(buf, disks, "testbucket", "testobject", offset, readLen, length, blockSize, dataBlocks, parityBlocks, checkSums) + size, err = erasureReadFile(buf, disks, "testbucket", "testobject", offset, readLen, length, blockSize, dataBlocks, parityBlocks, checkSums, pool) if err != nil { t.Fatal(err, offset, readLen) } diff --git a/pkg/bpool/bpool.go b/pkg/bpool/bpool.go new file mode 100644 index 000000000..88fb163ef --- /dev/null +++ b/pkg/bpool/bpool.go @@ -0,0 +1,69 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package bpool implements a fixed size pool of byte slices. +package bpool + +import ( + "errors" + "sync" +) + +// ErrBpoolNoFree - Normally this error should never be returned, this error +// indicates a bug in the package consumer. +var ErrBpoolNoFree = errors.New("no free byte slice in pool") + +// BytePool - temporary pool of byte slices. +type BytePool struct { + buf [][]byte // array of byte slices + used []bool // indicates if a buf[i] is in use + size int64 // size of buf[i] + mu sync.Mutex +} + +// Get - Returns an unused byte slice. +func (b *BytePool) Get() (buf []byte, err error) { + b.mu.Lock() + defer b.mu.Unlock() + for i := 0; i < len(b.used); i++ { + if !b.used[i] { + b.used[i] = true + if b.buf[i] == nil { + b.buf[i] = make([]byte, b.size) + } + return b.buf[i], nil + } + } + return nil, ErrBpoolNoFree +} + +// Reset - Marks all slices as unused. +func (b *BytePool) Reset() { + b.mu.Lock() + defer b.mu.Unlock() + for i := 0; i < len(b.used); i++ { + b.used[i] = false + } +} + +// NewBytePool - Returns new pool. +// size - length of each slice. +// n - number of slices in the pool. +func NewBytePool(size int64, n int) *BytePool { + used := make([]bool, n) + buf := make([][]byte, n) + return &BytePool{buf, used, size, sync.Mutex{}} +} diff --git a/pkg/bpool/bpool_test.go b/pkg/bpool/bpool_test.go new file mode 100644 index 000000000..8ace0b6de --- /dev/null +++ b/pkg/bpool/bpool_test.go @@ -0,0 +1,53 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package bpool + +import "testing" + +func TestBytePool(t *testing.T) { + size := int64(10) + n := 16 + pool := NewBytePool(size, n) + enBlocks := make([][]byte, n) + + // Allocates all the 16 byte slices in the pool. + alloc := func() { + for i := range enBlocks { + var err error + enBlocks[i], err = pool.Get() + if err != nil { + t.Fatal("expected nil, got", err) + } + // Make sure the slice length is as expected. + if len(enBlocks[i]) != int(size) { + t.Fatalf("expected size %d, got %d", len(enBlocks[i]), size) + } + } + } + + // Allocate everything in the pool. + alloc() + // Any Get() will fail when the pool does not have any free buffer. + _, err := pool.Get() + if err == nil { + t.Fatalf("expected %s, got nil", err) + } + // Reset - so that all the buffers are marked as unused. + pool.Reset() + // Allocation of all the buffers in the pool should succeed now. + alloc() +} diff --git a/xl-v1-object.go b/xl-v1-object.go index dd440adb0..cb266a179 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -26,6 +26,7 @@ import ( "sync" "time" + "github.com/minio/minio/pkg/bpool" "github.com/minio/minio/pkg/mimedb" "github.com/minio/minio/pkg/objcache" ) @@ -153,6 +154,10 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i } totalBytesRead := int64(0) + + chunkSize := getChunkSize(xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks) + pool := bpool.NewBytePool(chunkSize, len(onlineDisks)) + // Read from all parts. for ; partIndex <= lastPartIndex; partIndex++ { if length == totalBytesRead { @@ -183,7 +188,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i } // Start reading the part name. - n, err := erasureReadFile(mw, onlineDisks, bucket, pathJoin(object, partName), partOffset, readSize, partSize, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, checkSums) + n, err := erasureReadFile(mw, onlineDisks, bucket, pathJoin(object, partName), partOffset, readSize, partSize, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, checkSums, pool) if err != nil { return toObjectErr(err, bucket, object) }