erasureReadFile and erasureCreateFile testcases. (#2229)

* unit-tests: Unit tests for erasureCreateFile and erasureReadFile.

* appendFile() should return errXLWriteQuorum.

* TestErasureReadFileOffsetLength() tests erasureReadFile() for different offset and lengths.

* Fix for the failure seen in the erasure read unit test case. Issue #2227

* Move common erasure setup code to newErasureTestSetup()

* Review fixes. Add few more test cases for erasureReadFile.
master
Krishna Srinivas 8 years ago committed by Harshavardhana
parent 1f706e067d
commit 897d78d113
  1. 2
      erasure-createfile.go
  2. 97
      erasure-createfile_test.go
  3. 9
      erasure-readfile.go
  4. 160
      erasure-readfile_test.go
  5. 65
      erasure_test.go
  6. 2
      xl-v1-multipart.go

@ -139,7 +139,7 @@ func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, hash
// Do we have write quorum?. // Do we have write quorum?.
if !isDiskQuorum(wErrs, writeQuorum) { if !isDiskQuorum(wErrs, writeQuorum) {
return toObjectErr(errXLWriteQuorum, volume, path) return errXLWriteQuorum
} }
return nil return nil
} }

@ -0,0 +1,97 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"bytes"
"crypto/rand"
"testing"
)
// Simulates a faulty disk for AppendFile()
type AppendDiskDown struct {
*posix
}
func (a AppendDiskDown) AppendFile(volume string, path string, buf []byte) error {
return errFaultyDisk
}
// Test erasureCreateFile()
func TestErasureCreateFile(t *testing.T) {
// Initialize environment needed for the test.
dataBlocks := 7
parityBlocks := 7
blockSize := int64(blockSizeV1)
setup, err := newErasureTestSetup(dataBlocks, parityBlocks, blockSize)
if err != nil {
t.Error(err)
return
}
defer setup.Remove()
disks := setup.disks
// Prepare a slice of 1MB with random data.
data := make([]byte, 1*1024*1024)
_, err = rand.Read(data)
if err != nil {
t.Fatal(err)
}
// Test when all disks are up.
size, _, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1)
if err != nil {
t.Fatal(err)
}
if size != int64(len(data)) {
t.Errorf("erasureCreateFile returned %d, expected %d", size, len(data))
}
// 2 disks down.
disks[4] = AppendDiskDown{disks[4].(*posix)}
disks[5] = AppendDiskDown{disks[5].(*posix)}
// Test when two disks are down.
size, _, err = erasureCreateFile(disks, "testbucket", "testobject2", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1)
if err != nil {
t.Fatal(err)
}
if size != int64(len(data)) {
t.Errorf("erasureCreateFile returned %d, expected %d", size, len(data))
}
// 4 more disks down. 6 disks down in total.
disks[6] = AppendDiskDown{disks[6].(*posix)}
disks[7] = AppendDiskDown{disks[7].(*posix)}
disks[8] = AppendDiskDown{disks[8].(*posix)}
disks[9] = AppendDiskDown{disks[9].(*posix)}
size, _, err = erasureCreateFile(disks, "testbucket", "testobject3", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1)
if err != nil {
t.Fatal(err)
}
if size != int64(len(data)) {
t.Errorf("erasureCreateFile returned %d, expected %d", size, len(data))
}
// 1 more disk down. 7 disk down in total. Should return quorum error.
disks[10] = AppendDiskDown{disks[10].(*posix)}
size, _, err = erasureCreateFile(disks, "testbucket", "testobject4", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1)
if err != errXLWriteQuorum {
t.Error("Expected errXLWriteQuorum error")
}
}

@ -262,14 +262,17 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume string, path s
} }
var outSize, outOffset int64 var outSize, outOffset int64
// Total data to be read.
outSize = blockSize
// If this is start block, skip unwanted bytes. // If this is start block, skip unwanted bytes.
if block == startBlock { if block == startBlock {
outOffset = bytesToSkip outOffset = bytesToSkip
outSize -= bytesToSkip
} }
// Total data to be read. if length-bytesWritten < outSize {
outSize = blockSize
if length-bytesWritten < blockSize {
// We should not send more data than what was requested. // We should not send more data than what was requested.
outSize = length - bytesWritten outSize = length - bytesWritten
} }

@ -16,7 +16,11 @@
package main package main
import "testing" import (
"bytes"
"crypto/rand"
"testing"
)
import "reflect" import "reflect"
// Tests getReadDisks which returns readable disks slice from which we can // Tests getReadDisks which returns readable disks slice from which we can
@ -193,7 +197,7 @@ func TestIsSuccessBlocks(t *testing.T) {
} }
// Wrapper function for testGetReadDisks, testGetOrderedDisks. // Wrapper function for testGetReadDisks, testGetOrderedDisks.
func TestErasureReadFile(t *testing.T) { func TestErasureReadUtils(t *testing.T) {
objLayer, dirs, err := getXLObjectLayer() objLayer, dirs, err := getXLObjectLayer()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -203,3 +207,155 @@ func TestErasureReadFile(t *testing.T) {
testGetReadDisks(t, xl) testGetReadDisks(t, xl)
testGetOrderedDisks(t, xl) testGetOrderedDisks(t, xl)
} }
// Simulates a faulty disk for ReadFile()
type ReadDiskDown struct {
*posix
}
func (r ReadDiskDown) ReadFile(volume string, path string, offset int64, buf []byte) (n int64, err error) {
return 0, errFaultyDisk
}
func TestErasureReadFileDiskFail(t *testing.T) {
// Initialize environment needed for the test.
dataBlocks := 7
parityBlocks := 7
blockSize := int64(blockSizeV1)
setup, err := newErasureTestSetup(dataBlocks, parityBlocks, blockSize)
if err != nil {
t.Error(err)
return
}
defer setup.Remove()
disks := setup.disks
// Prepare a slice of 1MB with random data.
data := make([]byte, 1*1024*1024)
length := int64(len(data))
_, err = rand.Read(data)
if err != nil {
t.Fatal(err)
}
// Create a test file to read from.
size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1)
if err != nil {
t.Fatal(err)
}
if size != length {
t.Errorf("erasureCreateFile returned %d, expected %d", size, length)
}
buf := &bytes.Buffer{}
size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums)
if err != nil {
t.Error(err)
}
if !bytes.Equal(buf.Bytes(), data) {
t.Error("Contents of the erasure coded file differs")
}
// 2 disks down. Read should succeed.
disks[4] = ReadDiskDown{disks[4].(*posix)}
disks[5] = ReadDiskDown{disks[5].(*posix)}
buf.Reset()
size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums)
if err != nil {
t.Error(err)
}
if !bytes.Equal(buf.Bytes(), data) {
t.Error("Contents of the erasure coded file differs")
}
// 4 more disks down. 6 disks down in total. Read should succeed.
disks[6] = ReadDiskDown{disks[6].(*posix)}
disks[8] = ReadDiskDown{disks[8].(*posix)}
disks[9] = ReadDiskDown{disks[9].(*posix)}
disks[11] = ReadDiskDown{disks[11].(*posix)}
buf.Reset()
size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums)
if err != nil {
t.Error(err)
}
if !bytes.Equal(buf.Bytes(), data) {
t.Error("Contents of the erasure coded file differs")
}
// 1 more disk down. 7 disks down in total. Read should fail.
disks[12] = ReadDiskDown{disks[12].(*posix)}
buf.Reset()
size, err = erasureReadFile(buf, disks, "testbucket", "testobject", 0, length, length, blockSize, dataBlocks, parityBlocks, checkSums)
if err != errXLReadQuorum {
t.Fatal("expected errXLReadQuorum error")
}
}
func TestErasureReadFileOffsetLength(t *testing.T) {
// Initialize environment needed for the test.
dataBlocks := 7
parityBlocks := 7
blockSize := int64(1 * 1024 * 1024)
setup, err := newErasureTestSetup(dataBlocks, parityBlocks, blockSize)
if err != nil {
t.Error(err)
return
}
defer setup.Remove()
disks := setup.disks
// Prepare a slice of 1MB with random data.
data := make([]byte, 5*1024*1024)
length := int64(len(data))
_, err = rand.Read(data)
if err != nil {
t.Fatal(err)
}
// Create a test file to read from.
size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1)
if err != nil {
t.Fatal(err)
}
if size != length {
t.Errorf("erasureCreateFile returned %d, expected %d", size, length)
}
testCases := []struct {
offset, length int64
}{
// Full file.
{0, length},
// 2nd block.
{blockSize, blockSize},
// Test cases for random offsets and lengths.
{blockSize - 1, 2},
{blockSize - 1, blockSize + 1},
{blockSize + 1, blockSize - 1},
{blockSize + 1, blockSize},
{blockSize + 1, blockSize + 1},
{blockSize*2 - 1, blockSize*3 + 1},
{length - 1, 1},
{length - blockSize, blockSize},
{length - blockSize - 1, blockSize},
{length - blockSize - 1, blockSize + 1},
}
// Compare the data read from file with "data" byte array.
for i, testCase := range testCases {
expected := data[testCase.offset:(testCase.offset + testCase.length)]
buf := &bytes.Buffer{}
size, err = erasureReadFile(buf, disks, "testbucket", "testobject", testCase.offset, testCase.length, length, blockSize, dataBlocks, parityBlocks, checkSums)
if err != nil {
t.Error(err)
continue
}
got := buf.Bytes()
if !bytes.Equal(expected, got) {
t.Errorf("Test %d : read data is different from what was expected", i+1)
}
}
}

@ -18,7 +18,6 @@ package main
import ( import (
"bytes" "bytes"
"crypto/rand"
"io/ioutil" "io/ioutil"
"os" "os"
"testing" "testing"
@ -187,24 +186,24 @@ func TestErasureDecode(t *testing.T) {
} }
} }
// Simulates a faulty disk for AppendFile() // Setup for erasureCreateFile and erasureReadFile tests.
type AppendDiskDown struct { type erasureTestSetup struct {
*posix dataBlocks int
parityBlocks int
blockSize int64
diskPaths []string
disks []StorageAPI
} }
func (a AppendDiskDown) AppendFile(volume string, path string, buf []byte) error { // Removes the temporary disk directories.
return errFaultyDisk func (e erasureTestSetup) Remove() {
for _, path := range e.diskPaths {
removeAll(path)
}
} }
// Test erasureCreateFile() // Returns an initialized setup for erasure tests.
// TODO: func newErasureTestSetup(dataBlocks int, parityBlocks int, blockSize int64) (*erasureTestSetup, error) {
// * check when more disks are down.
// * verify written content by using erasureReadFile.
func TestErasureCreateFile(t *testing.T) {
// Initialize environment needed for the test.
dataBlocks := 7
parityBlocks := 7
blockSize := int64(blockSizeV1)
diskPaths := make([]string, dataBlocks+parityBlocks) diskPaths := make([]string, dataBlocks+parityBlocks)
disks := make([]StorageAPI, len(diskPaths)) disks := make([]StorageAPI, len(diskPaths))
@ -212,44 +211,16 @@ func TestErasureCreateFile(t *testing.T) {
var err error var err error
diskPaths[i], err = ioutil.TempDir(os.TempDir(), "minio-") diskPaths[i], err = ioutil.TempDir(os.TempDir(), "minio-")
if err != nil { if err != nil {
t.Fatal("Unable to create tmp dir", err) return nil, err
} }
defer removeAll(diskPaths[i])
disks[i], err = newPosix(diskPaths[i]) disks[i], err = newPosix(diskPaths[i])
if err != nil { if err != nil {
t.Fatal(err) return nil, err
} }
err = disks[i].MakeVol("testbucket") err = disks[i].MakeVol("testbucket")
if err != nil { if err != nil {
t.Fatal(err) return nil, err
}
}
// Prepare a slice of 1MB with random data.
data := make([]byte, 1*1024*1024)
_, err := rand.Read(data)
if err != nil {
t.Fatal(err)
}
// Test when all disks are up.
size, _, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1)
if err != nil {
t.Fatal(err)
}
if size != int64(len(data)) {
t.Errorf("erasureCreateFile returned %d, expected %d", size, len(data))
}
// Two disks down.
disks[4] = AppendDiskDown{disks[4].(*posix)}
disks[5] = AppendDiskDown{disks[5].(*posix)}
// Test when two disks are down.
size, _, err = erasureCreateFile(disks, "testbucket", "testobject2", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, dataBlocks+1)
if err != nil {
t.Fatal(err)
} }
if size != int64(len(data)) {
t.Errorf("erasureCreateFile returned %d, expected %d", size, len(data))
} }
return &erasureTestSetup{dataBlocks, parityBlocks, blockSize, diskPaths, disks}, nil
} }

@ -386,7 +386,7 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Erasure code data and write across all disks. // Erasure code data and write across all disks.
sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, teeReader, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, xl.writeQuorum) sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, teeReader, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, xl.writeQuorum)
if err != nil { if err != nil {
return "", toObjectErr(err, minioMetaBucket, tmpPartPath) return "", toObjectErr(err, bucket, object)
} }
// Should return IncompleteBody{} error when reader has fewer bytes // Should return IncompleteBody{} error when reader has fewer bytes
// than specified in request header. // than specified in request header.

Loading…
Cancel
Save