From fffe4ac7e6218d3a3a5da6720b58b094003a9522 Mon Sep 17 00:00:00 2001 From: Frank Wessels Date: Fri, 11 Aug 2017 18:24:48 -0700 Subject: [PATCH] Prevent unnecessary verification of parity blocks while reading (#4683) * Prevent unnecessary verification of parity blocks while reading erasure coded file. * Update klauspost/reedsolomon and just only reconstruct data blocks while reading (prevent unnecessary parity block reconstruction) * Remove Verification of (all) reconstructed Data and Parity blocks since in our case we are protected by bit rot protection. And even if the verification would fail (essentially impossible) there is no way to definitively say whether the data is still correct or not, so this call make no sense for our use case. --- cmd/erasure-createfile_test.go | 4 +- cmd/erasure-healfile.go | 4 +- cmd/erasure-readfile.go | 32 ++- cmd/erasure_test.go | 64 ++++-- .../klauspost/reedsolomon/README.md | 39 +++- .../klauspost/reedsolomon/galois_amd64.go | 16 +- .../klauspost/reedsolomon/galois_noasm.go | 4 +- .../klauspost/reedsolomon/matrix.go | 2 +- .../klauspost/reedsolomon/options.go | 78 +++++++ .../klauspost/reedsolomon/reedsolomon.go | 204 ++++++++++++++---- .../klauspost/reedsolomon/streaming.go | 31 ++- vendor/vendor.json | 6 +- 12 files changed, 376 insertions(+), 108 deletions(-) create mode 100644 vendor/github.com/klauspost/reedsolomon/options.go diff --git a/cmd/erasure-createfile_test.go b/cmd/erasure-createfile_test.go index fcf0ce6c6..f6cd17a97 100644 --- a/cmd/erasure-createfile_test.go +++ b/cmd/erasure-createfile_test.go @@ -174,11 +174,11 @@ func TestErasureEncode(t *testing.T) { reedsolomon.ErrInvShardNum, }, // TestCase - 8. - // test case with data + parity blocks > 255. + // test case with data + parity blocks > 256. // expected to fail with Error Max Shard number. { []byte("1"), - 128, + 129, 128, false, reedsolomon.ErrMaxShardNum, diff --git a/cmd/erasure-healfile.go b/cmd/erasure-healfile.go index 2d0e852cb..dc151cf35 100644 --- a/cmd/erasure-healfile.go +++ b/cmd/erasure-healfile.go @@ -53,8 +53,8 @@ func erasureHealFile(latestDisks []StorageAPI, outDatedDisks []StorageAPI, volum } } - // Reconstruct missing data. - err := decodeData(enBlocks, dataBlocks, parityBlocks) + // Reconstruct any missing data and parity blocks. + err := decodeDataAndParity(enBlocks, dataBlocks, parityBlocks) if err != nil { return nil, err } diff --git a/cmd/erasure-readfile.go b/cmd/erasure-readfile.go index bf03f033c..21d581803 100644 --- a/cmd/erasure-readfile.go +++ b/cmd/erasure-readfile.go @@ -17,7 +17,6 @@ package cmd import ( - "errors" "io" "sync" @@ -272,7 +271,7 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume, path string, // If we have all the data blocks no need to decode, continue to write. if !isSuccessDataBlocks(enBlocks, dataBlocks) { // Reconstruct the missing data blocks. - if err := decodeData(enBlocks, dataBlocks, parityBlocks); err != nil { + if err := decodeMissingData(enBlocks, dataBlocks, parityBlocks); err != nil { return bytesWritten, err } } @@ -314,31 +313,26 @@ func erasureReadFile(writer io.Writer, disks []StorageAPI, volume, path string, return bytesWritten, nil } -// decodeData - decode encoded blocks. -func decodeData(enBlocks [][]byte, dataBlocks, parityBlocks int) error { - // Initialized reedsolomon. +// decodeMissingData - decode any missing data blocks. +func decodeMissingData(enBlocks [][]byte, dataBlocks, parityBlocks int) error { + // Initialize reedsolomon. rs, err := reedsolomon.New(dataBlocks, parityBlocks) if err != nil { return traceError(err) } - // Reconstruct encoded blocks. - err = rs.Reconstruct(enBlocks) - if err != nil { - return traceError(err) - } + // Reconstruct any missing data blocks. + return rs.ReconstructData(enBlocks) +} - // Verify reconstructed blocks (parity). - ok, err := rs.Verify(enBlocks) +// decodeDataAndParity - decode all encoded data and parity blocks. +func decodeDataAndParity(enBlocks [][]byte, dataBlocks, parityBlocks int) error { + // Initialize reedsolomon. + rs, err := reedsolomon.New(dataBlocks, parityBlocks) if err != nil { return traceError(err) } - if !ok { - // Blocks cannot be reconstructed, corrupted data. - err = errors.New("Verification failed after reconstruction, data likely corrupted") - return traceError(err) - } - // Success. - return nil + // Reconstruct encoded blocks. + return rs.Reconstruct(enBlocks) } diff --git a/cmd/erasure_test.go b/cmd/erasure_test.go index ceed3f449..84f5e538c 100644 --- a/cmd/erasure_test.go +++ b/cmd/erasure_test.go @@ -124,26 +124,54 @@ func TestErasureDecode(t *testing.T) { // Data block size. blockSize := len(data) - // Generates encoded data based on type of testCase function. - encodedData := testCase.enFn(data, dataBlocks, parityBlocks) - - // Decodes the data. - err := decodeData(encodedData, dataBlocks, parityBlocks) - if err != nil && testCase.shouldPass { - t.Errorf("Test %d: Expected to pass by failed instead with %s", i+1, err) - } - - // Proceed to extract the data blocks. - decodedDataWriter := new(bytes.Buffer) - _, err = writeDataBlocks(decodedDataWriter, encodedData, dataBlocks, 0, int64(blockSize)) - if err != nil && testCase.shouldPass { - t.Errorf("Test %d: Expected to pass by failed instead with %s", i+1, err) + // Test decoder for just the missing data blocks + { + // Generates encoded data based on type of testCase function. + encodedData := testCase.enFn(data, dataBlocks, parityBlocks) + + // Decodes the data. + err := decodeMissingData(encodedData, dataBlocks, parityBlocks) + if err != nil && testCase.shouldPass { + t.Errorf("Test %d: Expected to pass by failed instead with %s", i+1, err) + } + + // Proceed to extract the data blocks. + decodedDataWriter := new(bytes.Buffer) + _, err = writeDataBlocks(decodedDataWriter, encodedData, dataBlocks, 0, int64(blockSize)) + if err != nil && testCase.shouldPass { + t.Errorf("Test %d: Expected to pass by failed instead with %s", i+1, err) + } + + // Validate if decoded data is what we expected. + if bytes.Equal(decodedDataWriter.Bytes(), data) != testCase.shouldPass { + err := errUnexpected + t.Errorf("Test %d: Expected to pass by failed instead %s", i+1, err) + } } - // Validate if decoded data is what we expected. - if bytes.Equal(decodedDataWriter.Bytes(), data) != testCase.shouldPass { - err := errUnexpected - t.Errorf("Test %d: Expected to pass by failed instead %s", i+1, err) + // Test decoder for all missing data and parity blocks + { + // Generates encoded data based on type of testCase function. + encodedData := testCase.enFn(data, dataBlocks, parityBlocks) + + // Decodes the data. + err := decodeDataAndParity(encodedData, dataBlocks, parityBlocks) + if err != nil && testCase.shouldPass { + t.Errorf("Test %d: Expected to pass by failed instead with %s", i+1, err) + } + + // Proceed to extract the data blocks. + decodedDataWriter := new(bytes.Buffer) + _, err = writeDataBlocks(decodedDataWriter, encodedData, dataBlocks, 0, int64(blockSize)) + if err != nil && testCase.shouldPass { + t.Errorf("Test %d: Expected to pass by failed instead with %s", i+1, err) + } + + // Validate if decoded data is what we expected. + if bytes.Equal(decodedDataWriter.Bytes(), data) != testCase.shouldPass { + err := errUnexpected + t.Errorf("Test %d: Expected to pass by failed instead %s", i+1, err) + } } } } diff --git a/vendor/github.com/klauspost/reedsolomon/README.md b/vendor/github.com/klauspost/reedsolomon/README.md index 3e7f51841..37c2702fd 100644 --- a/vendor/github.com/klauspost/reedsolomon/README.md +++ b/vendor/github.com/klauspost/reedsolomon/README.md @@ -81,6 +81,17 @@ To indicate missing data, you set the shard to nil before calling `Reconstruct() ``` The missing data and parity shards will be recreated. If more than 3 shards are missing, the reconstruction will fail. +If you are only interested in the data shards (for reading purposes) you can call `ReconstructData()`: + +```Go + // Delete two data shards + data[3] = nil + data[7] = nil + + // Reconstruct just the missing data shards + err := enc.ReconstructData(data) +``` + So to sum up reconstruction: * The number of data/parity shards must match the numbers used for encoding. * The order of shards must be the same as used when encoding. @@ -101,7 +112,7 @@ You might have a large slice of data. To help you split this, there are some hel ``` This will split the file into the number of data shards set when creating the encoder and create empty parity shards. -An important thing to note is that you have to *keep track of the exact input size*. If the size of the input isn't diviable by the number of data shards, extra zeros will be inserted in the last shard. +An important thing to note is that you have to *keep track of the exact input size*. If the size of the input isn't divisible by the number of data shards, extra zeros will be inserted in the last shard. To join a data set, use the `Join()` function, which will join the shards and write it to the `io.Writer` you supply: ```Go @@ -153,7 +164,7 @@ This also means that you can divide big input up into smaller blocks, and do rec # Streaming API -There has been added a fully streaming API, to help perform fully streaming operations, which enables you to do the same operations, but on streams. To use the stream API, use [`NewStream`](https://godoc.org/github.com/klauspost/reedsolomon#NewStream) function to create the encoding/decoding interfaces. You can use [`NewStreamC`](https://godoc.org/github.com/klauspost/reedsolomon#NewStreamC) to ready an interface that reads/writes concurrently from the streams. +There has been added support for a streaming API, to help perform fully streaming operations, which enables you to do the same operations, but on streams. To use the stream API, use [`NewStream`](https://godoc.org/github.com/klauspost/reedsolomon#NewStream) function to create the encoding/decoding interfaces. You can use [`NewStreamC`](https://godoc.org/github.com/klauspost/reedsolomon#NewStreamC) to ready an interface that reads/writes concurrently from the streams. Input is delivered as `[]io.Reader`, output as `[]io.Writer`, and functionality corresponds to the in-memory API. Each stream must supply the same amount of data, similar to how each slice must be similar size with the in-memory API. If an error occurs in relation to a stream, a [`StreamReadError`](https://godoc.org/github.com/klauspost/reedsolomon#StreamReadError) or [`StreamWriteError`](https://godoc.org/github.com/klauspost/reedsolomon#StreamWriteError) will help you determine which stream was the offender. @@ -162,6 +173,18 @@ There is no buffering or timeouts/retry specified. If you want to add that, you For complete examples of a streaming encoder and decoder see the [examples folder](https://github.com/klauspost/reedsolomon/tree/master/examples). +#Advanced Options + +You can modify internal options which affects how jobs are split between and processed by goroutines. + +To create options, use the WithXXX functions. You can supply options to `New`, `NewStream` and `NewStreamC`. If no Options are supplied, default options are used. + +Example of how to supply options: + + ```Go + enc, err := reedsolomon.New(10, 3, WithMaxGoroutines(25)) + ``` + # Performance Performance depends mainly on the number of parity shards. In rough terms, doubling the number of parity shards will double the encoding time. @@ -186,6 +209,18 @@ Example of performance scaling on Intel(R) Core(TM) i7-2600 CPU @ 3.40GHz - 4 ph | 4 | 3179,33 | 235% | | 8 | 4346,18 | 321% | +Benchmarking `Reconstruct()` followed by a `Verify()` (=`all`) versus just calling `ReconstructData()` (=`data`) gives the following result: +``` +benchmark all MB/s data MB/s speedup +BenchmarkReconstruct10x2x10000-8 2011.67 10530.10 5.23x +BenchmarkReconstruct50x5x50000-8 4585.41 14301.60 3.12x +BenchmarkReconstruct10x2x1M-8 8081.15 28216.41 3.49x +BenchmarkReconstruct5x2x1M-8 5780.07 28015.37 4.85x +BenchmarkReconstruct10x4x1M-8 4352.56 14367.61 3.30x +BenchmarkReconstruct50x20x1M-8 1364.35 4189.79 3.07x +BenchmarkReconstruct10x4x16M-8 1484.35 5779.53 3.89x +``` + # asm2plan9s [asm2plan9s](https://github.com/fwessels/asm2plan9s) is used for assembling the AVX2 instructions into their BYTE/WORD/LONG equivalents. diff --git a/vendor/github.com/klauspost/reedsolomon/galois_amd64.go b/vendor/github.com/klauspost/reedsolomon/galois_amd64.go index e4d686e7a..bb99ea659 100644 --- a/vendor/github.com/klauspost/reedsolomon/galois_amd64.go +++ b/vendor/github.com/klauspost/reedsolomon/galois_amd64.go @@ -5,10 +5,6 @@ package reedsolomon -import ( - "github.com/klauspost/cpuid" -) - //go:noescape func galMulSSSE3(low, high, in, out []byte) @@ -40,12 +36,12 @@ func galMulSSSE3Xor(low, high, in, out []byte) { } */ -func galMulSlice(c byte, in, out []byte) { +func galMulSlice(c byte, in, out []byte, ssse3, avx2 bool) { var done int - if cpuid.CPU.AVX2() { + if avx2 { galMulAVX2(mulTableLow[c][:], mulTableHigh[c][:], in, out) done = (len(in) >> 5) << 5 - } else if cpuid.CPU.SSSE3() { + } else if ssse3 { galMulSSSE3(mulTableLow[c][:], mulTableHigh[c][:], in, out) done = (len(in) >> 4) << 4 } @@ -58,12 +54,12 @@ func galMulSlice(c byte, in, out []byte) { } } -func galMulSliceXor(c byte, in, out []byte) { +func galMulSliceXor(c byte, in, out []byte, ssse3, avx2 bool) { var done int - if cpuid.CPU.AVX2() { + if avx2 { galMulAVX2Xor(mulTableLow[c][:], mulTableHigh[c][:], in, out) done = (len(in) >> 5) << 5 - } else if cpuid.CPU.SSSE3() { + } else if ssse3 { galMulSSSE3Xor(mulTableLow[c][:], mulTableHigh[c][:], in, out) done = (len(in) >> 4) << 4 } diff --git a/vendor/github.com/klauspost/reedsolomon/galois_noasm.go b/vendor/github.com/klauspost/reedsolomon/galois_noasm.go index 1c6b8c4da..be90a3311 100644 --- a/vendor/github.com/klauspost/reedsolomon/galois_noasm.go +++ b/vendor/github.com/klauspost/reedsolomon/galois_noasm.go @@ -4,14 +4,14 @@ package reedsolomon -func galMulSlice(c byte, in, out []byte) { +func galMulSlice(c byte, in, out []byte, ssse3, avx2 bool) { mt := mulTable[c] for n, input := range in { out[n] = mt[input] } } -func galMulSliceXor(c byte, in, out []byte) { +func galMulSliceXor(c byte, in, out []byte, ssse3, avx2 bool) { mt := mulTable[c] for n, input := range in { out[n] ^= mt[input] diff --git a/vendor/github.com/klauspost/reedsolomon/matrix.go b/vendor/github.com/klauspost/reedsolomon/matrix.go index e942ead94..339913a75 100644 --- a/vendor/github.com/klauspost/reedsolomon/matrix.go +++ b/vendor/github.com/klauspost/reedsolomon/matrix.go @@ -137,7 +137,7 @@ func (m matrix) Augment(right matrix) (matrix, error) { } // errMatrixSize is returned if matrix dimensions are doesn't match. -var errMatrixSize = errors.New("matrix sizes does not match") +var errMatrixSize = errors.New("matrix sizes do not match") func (m matrix) SameSize(n matrix) error { if len(m) != len(n) { diff --git a/vendor/github.com/klauspost/reedsolomon/options.go b/vendor/github.com/klauspost/reedsolomon/options.go new file mode 100644 index 000000000..44236614d --- /dev/null +++ b/vendor/github.com/klauspost/reedsolomon/options.go @@ -0,0 +1,78 @@ +package reedsolomon + +import ( + "runtime" + + "github.com/klauspost/cpuid" +) + +// Option allows to override processing parameters. +type Option func(*options) + +type options struct { + maxGoroutines int + minSplitSize int + useAVX2, useSSSE3 bool + usePAR1Matrix bool +} + +var defaultOptions = options{ + maxGoroutines: 50, + minSplitSize: 512, +} + +func init() { + if runtime.GOMAXPROCS(0) <= 1 { + defaultOptions.maxGoroutines = 1 + } + // Detect CPU capabilities. + defaultOptions.useSSSE3 = cpuid.CPU.SSSE3() + defaultOptions.useAVX2 = cpuid.CPU.AVX2() +} + +// WithMaxGoroutines is the maximum number of goroutines number for encoding & decoding. +// Jobs will be split into this many parts, unless each goroutine would have to process +// less than minSplitSize bytes (set with WithMinSplitSize). +// For the best speed, keep this well above the GOMAXPROCS number for more fine grained +// scheduling. +// If n <= 0, it is ignored. +func WithMaxGoroutines(n int) Option { + return func(o *options) { + if n > 0 { + o.maxGoroutines = n + } + } +} + +// WithMinSplitSize is the minimum encoding size in bytes per goroutine. +// See WithMaxGoroutines on how jobs are split. +// If n <= 0, it is ignored. +func WithMinSplitSize(n int) Option { + return func(o *options) { + if n > 0 { + o.minSplitSize = n + } + } +} + +func withSSE3(enabled bool) Option { + return func(o *options) { + o.useSSSE3 = enabled + } +} + +func withAVX2(enabled bool) Option { + return func(o *options) { + o.useAVX2 = enabled + } +} + +// WithPAR1Matrix causes the encoder to build the matrix how PARv1 +// does. Note that the method they use is buggy, and may lead to cases +// where recovery is impossible, even if there are enough parity +// shards. +func WithPAR1Matrix() Option { + return func(o *options) { + o.usePAR1Matrix = true + } +} diff --git a/vendor/github.com/klauspost/reedsolomon/reedsolomon.go b/vendor/github.com/klauspost/reedsolomon/reedsolomon.go index 914ebe0ad..4bb84c373 100644 --- a/vendor/github.com/klauspost/reedsolomon/reedsolomon.go +++ b/vendor/github.com/klauspost/reedsolomon/reedsolomon.go @@ -15,7 +15,6 @@ import ( "bytes" "errors" "io" - "runtime" "sync" ) @@ -50,6 +49,21 @@ type Encoder interface { // Use the Verify function to check if data set is ok. Reconstruct(shards [][]byte) error + // ReconstructData will recreate any missing data shards, if possible. + // + // Given a list of shards, some of which contain data, fills in the + // data shards that don't have data. + // + // The length of the array must be equal to Shards. + // You indicate that a shard is missing by setting it to nil. + // + // If there are too few shards to reconstruct the missing + // ones, ErrTooFewShards will be returned. + // + // As the reconstructed shard set may contain missing parity shards, + // calling the Verify function is likely to fail. + ReconstructData(shards [][]byte) error + // Split a data slice into the number of shards given to the encoder, // and create empty parity shards. // @@ -83,52 +97,114 @@ type reedSolomon struct { m matrix tree inversionTree parity [][]byte + o options } // ErrInvShardNum will be returned by New, if you attempt to create // an Encoder where either data or parity shards is zero or less. var ErrInvShardNum = errors.New("cannot create Encoder with zero or less data/parity shards") -// ErrMaxShardNum will be returned by New, if you attempt to create -// an Encoder where data and parity shards cannot be bigger than -// Galois field GF(2^8) - 1. -var ErrMaxShardNum = errors.New("cannot create Encoder with 255 or more data+parity shards") +// ErrMaxShardNum will be returned by New, if you attempt to create an +// Encoder where data and parity shards are bigger than the order of +// GF(2^8). +var ErrMaxShardNum = errors.New("cannot create Encoder with more than 256 data+parity shards") + +// buildMatrix creates the matrix to use for encoding, given the +// number of data shards and the number of total shards. +// +// The top square of the matrix is guaranteed to be an identity +// matrix, which means that the data shards are unchanged after +// encoding. +func buildMatrix(dataShards, totalShards int) (matrix, error) { + // Start with a Vandermonde matrix. This matrix would work, + // in theory, but doesn't have the property that the data + // shards are unchanged after encoding. + vm, err := vandermonde(totalShards, dataShards) + if err != nil { + return nil, err + } + + // Multiply by the inverse of the top square of the matrix. + // This will make the top square be the identity matrix, but + // preserve the property that any square subset of rows is + // invertible. + top, err := vm.SubMatrix(0, 0, dataShards, dataShards) + if err != nil { + return nil, err + } + + topInv, err := top.Invert() + if err != nil { + return nil, err + } + + return vm.Multiply(topInv) +} + +// buildMatrixPAR1 creates the matrix to use for encoding according to +// the PARv1 spec, given the number of data shards and the number of +// total shards. Note that the method they use is buggy, and may lead +// to cases where recovery is impossible, even if there are enough +// parity shards. +// +// The top square of the matrix is guaranteed to be an identity +// matrix, which means that the data shards are unchanged after +// encoding. +func buildMatrixPAR1(dataShards, totalShards int) (matrix, error) { + result, err := newMatrix(totalShards, dataShards) + if err != nil { + return nil, err + } + + for r, row := range result { + // The top portion of the matrix is the identity + // matrix, and the bottom is a transposed Vandermonde + // matrix starting at 1 instead of 0. + if r < dataShards { + result[r][r] = 1 + } else { + for c := range row { + result[r][c] = galExp(byte(c+1), r-dataShards) + } + } + } + return result, nil +} // New creates a new encoder and initializes it to // the number of data shards and parity shards that // you want to use. You can reuse this encoder. -// Note that the maximum number of data shards is 256. -func New(dataShards, parityShards int) (Encoder, error) { +// Note that the maximum number of total shards is 256. +// If no options are supplied, default options are used. +func New(dataShards, parityShards int, opts ...Option) (Encoder, error) { r := reedSolomon{ DataShards: dataShards, ParityShards: parityShards, Shards: dataShards + parityShards, + o: defaultOptions, } + for _, opt := range opts { + opt(&r.o) + } if dataShards <= 0 || parityShards <= 0 { return nil, ErrInvShardNum } - if dataShards+parityShards > 255 { + if dataShards+parityShards > 256 { return nil, ErrMaxShardNum } - // Start with a Vandermonde matrix. This matrix would work, - // in theory, but doesn't have the property that the data - // shards are unchanged after encoding. - vm, err := vandermonde(r.Shards, dataShards) + var err error + if r.o.usePAR1Matrix { + r.m, err = buildMatrixPAR1(dataShards, r.Shards) + } else { + r.m, err = buildMatrix(dataShards, r.Shards) + } if err != nil { return nil, err } - // Multiply by the inverse of the top square of the matrix. - // This will make the top square be the identity matrix, but - // preserve the property that any square subset of rows is - // invertible. - top, _ := vm.SubMatrix(0, 0, dataShards, dataShards) - top, _ = top.Invert() - r.m, _ = vm.Multiply(top) - // Inverted matrices are cached in a tree keyed by the indices // of the invalid rows of the data to reconstruct. // The inversion root node will have the identity matrix as @@ -201,7 +277,7 @@ func (r reedSolomon) Verify(shards [][]byte) (bool, error) { // number of matrix rows used, is determined by // outputCount, which is the number of outputs to compute. func (r reedSolomon) codeSomeShards(matrixRows, inputs, outputs [][]byte, outputCount, byteCount int) { - if runtime.GOMAXPROCS(0) > 1 && len(inputs[0]) > minSplitSize { + if r.o.maxGoroutines > 1 && byteCount > r.o.minSplitSize { r.codeSomeShardsP(matrixRows, inputs, outputs, outputCount, byteCount) return } @@ -209,26 +285,21 @@ func (r reedSolomon) codeSomeShards(matrixRows, inputs, outputs [][]byte, output in := inputs[c] for iRow := 0; iRow < outputCount; iRow++ { if c == 0 { - galMulSlice(matrixRows[iRow][c], in, outputs[iRow]) + galMulSlice(matrixRows[iRow][c], in, outputs[iRow], r.o.useSSSE3, r.o.useAVX2) } else { - galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow]) + galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow], r.o.useSSSE3, r.o.useAVX2) } } } } -const ( - minSplitSize = 512 // min split size per goroutine - maxGoroutines = 50 // max goroutines number for encoding & decoding -) - // Perform the same as codeSomeShards, but split the workload into // several goroutines. func (r reedSolomon) codeSomeShardsP(matrixRows, inputs, outputs [][]byte, outputCount, byteCount int) { var wg sync.WaitGroup - do := byteCount / maxGoroutines - if do < minSplitSize { - do = minSplitSize + do := byteCount / r.o.maxGoroutines + if do < r.o.minSplitSize { + do = r.o.minSplitSize } start := 0 for start < byteCount { @@ -241,9 +312,9 @@ func (r reedSolomon) codeSomeShardsP(matrixRows, inputs, outputs [][]byte, outpu in := inputs[c] for iRow := 0; iRow < outputCount; iRow++ { if c == 0 { - galMulSlice(matrixRows[iRow][c], in[start:stop], outputs[iRow][start:stop]) + galMulSlice(matrixRows[iRow][c], in[start:stop], outputs[iRow][start:stop], r.o.useSSSE3, r.o.useAVX2) } else { - galMulSliceXor(matrixRows[iRow][c], in[start:stop], outputs[iRow][start:stop]) + galMulSliceXor(matrixRows[iRow][c], in[start:stop], outputs[iRow][start:stop], r.o.useSSSE3, r.o.useAVX2) } } } @@ -258,13 +329,36 @@ func (r reedSolomon) codeSomeShardsP(matrixRows, inputs, outputs [][]byte, outpu // except this will check values and return // as soon as a difference is found. func (r reedSolomon) checkSomeShards(matrixRows, inputs, toCheck [][]byte, outputCount, byteCount int) bool { + if r.o.maxGoroutines > 1 && byteCount > r.o.minSplitSize { + return r.checkSomeShardsP(matrixRows, inputs, toCheck, outputCount, byteCount) + } + outputs := make([][]byte, len(toCheck)) + for i := range outputs { + outputs[i] = make([]byte, byteCount) + } + for c := 0; c < r.DataShards; c++ { + in := inputs[c] + for iRow := 0; iRow < outputCount; iRow++ { + galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow], r.o.useSSSE3, r.o.useAVX2) + } + } + + for i, calc := range outputs { + if !bytes.Equal(calc, toCheck[i]) { + return false + } + } + return true +} + +func (r reedSolomon) checkSomeShardsP(matrixRows, inputs, toCheck [][]byte, outputCount, byteCount int) bool { same := true var mu sync.RWMutex // For above var wg sync.WaitGroup - do := byteCount / maxGoroutines - if do < minSplitSize { - do = minSplitSize + do := byteCount / r.o.maxGoroutines + if do < r.o.minSplitSize { + do = r.o.minSplitSize } start := 0 for start < byteCount { @@ -287,7 +381,7 @@ func (r reedSolomon) checkSomeShards(matrixRows, inputs, toCheck [][]byte, outpu mu.RUnlock() in := inputs[c][start : start+do] for iRow := 0; iRow < outputCount; iRow++ { - galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow]) + galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow], r.o.useSSSE3, r.o.useAVX2) } } @@ -312,7 +406,7 @@ var ErrShardNoData = errors.New("no shard data") // ErrShardSize is returned if shard length isn't the same for all // shards. -var ErrShardSize = errors.New("shard sizes does not match") +var ErrShardSize = errors.New("shard sizes do not match") // checkShards will check if shards are the same size // or 0, if allowed. An error is returned if this fails. @@ -358,6 +452,35 @@ func shardSize(shards [][]byte) int { // The reconstructed shard set is complete, but integrity is not verified. // Use the Verify function to check if data set is ok. func (r reedSolomon) Reconstruct(shards [][]byte) error { + return r.reconstruct(shards, false) +} + +// ReconstructData will recreate any missing data shards, if possible. +// +// Given a list of shards, some of which contain data, fills in the +// data shards that don't have data. +// +// The length of the array must be equal to Shards. +// You indicate that a shard is missing by setting it to nil. +// +// If there are too few shards to reconstruct the missing +// ones, ErrTooFewShards will be returned. +// +// As the reconstructed shard set may contain missing parity shards, +// calling the Verify function is likely to fail. +func (r reedSolomon) ReconstructData(shards [][]byte) error { + return r.reconstruct(shards, true) +} + +// reconstruct will recreate the missing data shards, and unless +// dataOnly is true, also the missing parity shards +// +// The length of the array must be equal to Shards. +// You indicate that a shard is missing by setting it to nil. +// +// If there are too few shards to reconstruct the missing +// ones, ErrTooFewShards will be returned. +func (r reedSolomon) reconstruct(shards [][]byte, dataOnly bool) error { if len(shards) != r.Shards { return ErrTooFewShards } @@ -464,6 +587,11 @@ func (r reedSolomon) Reconstruct(shards [][]byte) error { } r.codeSomeShards(matrixRows, subShards, outputs[:outputCount], outputCount, shardSize) + if dataOnly { + // Exit out early if we are only interested in the data shards + return nil + } + // Now that we have all of the data shards intact, we can // compute any of the parity that is missing. // diff --git a/vendor/github.com/klauspost/reedsolomon/streaming.go b/vendor/github.com/klauspost/reedsolomon/streaming.go index 293a8b129..9e55d7352 100644 --- a/vendor/github.com/klauspost/reedsolomon/streaming.go +++ b/vendor/github.com/klauspost/reedsolomon/streaming.go @@ -145,8 +145,8 @@ type rsStream struct { // the number of data shards and parity shards that // you want to use. You can reuse this encoder. // Note that the maximum number of data shards is 256. -func NewStream(dataShards, parityShards int) (StreamEncoder, error) { - enc, err := New(dataShards, parityShards) +func NewStream(dataShards, parityShards int, o ...Option) (StreamEncoder, error) { + enc, err := New(dataShards, parityShards, o...) if err != nil { return nil, err } @@ -161,8 +161,8 @@ func NewStream(dataShards, parityShards int) (StreamEncoder, error) { // the number of data shards and parity shards given. // // This functions as 'NewStream', but allows you to enable CONCURRENT reads and writes. -func NewStreamC(dataShards, parityShards int, conReads, conWrites bool) (StreamEncoder, error) { - enc, err := New(dataShards, parityShards) +func NewStreamC(dataShards, parityShards int, conReads, conWrites bool, o ...Option) (StreamEncoder, error) { + enc, err := New(dataShards, parityShards, o...) if err != nil { return nil, err } @@ -256,7 +256,7 @@ func trimShards(in [][]byte, size int) [][]byte { func readShards(dst [][]byte, in []io.Reader) error { if len(in) != len(dst) { - panic("internal error: in and dst size does not match") + panic("internal error: in and dst size do not match") } size := -1 for i := range in { @@ -291,7 +291,7 @@ func readShards(dst [][]byte, in []io.Reader) error { func writeShards(out []io.Writer, in [][]byte) error { if len(out) != len(in) { - panic("internal error: in and out size does not match") + panic("internal error: in and out size do not match") } for i := range in { if out[i] == nil { @@ -318,7 +318,7 @@ type readResult struct { // cReadShards reads shards concurrently func cReadShards(dst [][]byte, in []io.Reader) error { if len(in) != len(dst) { - panic("internal error: in and dst size does not match") + panic("internal error: in and dst size do not match") } var wg sync.WaitGroup wg.Add(len(in)) @@ -366,7 +366,7 @@ func cReadShards(dst [][]byte, in []io.Reader) error { // cWriteShards writes shards concurrently func cWriteShards(out []io.Writer, in [][]byte) error { if len(out) != len(in) { - panic("internal error: in and out size does not match") + panic("internal error: in and out size do not match") } var errs = make(chan error, len(out)) var wg sync.WaitGroup @@ -450,8 +450,9 @@ var ErrReconstructMismatch = errors.New("valid shards and fill shards are mutual // If there are too few shards to reconstruct the missing // ones, ErrTooFewShards will be returned. // -// The reconstructed shard set is complete, but integrity is not verified. -// Use the Verify function to check if data set is ok. +// The reconstructed shard set is complete when explicitly asked for all missing shards. +// However its integrity is not automatically verified. +// Use the Verify function to check in case the data set is complete. func (r rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error { if len(valid) != r.r.Shards { return ErrTooFewShards @@ -461,10 +462,14 @@ func (r rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error { } all := createSlice(r.r.Shards, r.bs) + reconDataOnly := true for i := range valid { if valid[i] != nil && fill[i] != nil { return ErrReconstructMismatch } + if i >= r.r.DataShards && fill[i] != nil { + reconDataOnly = false + } } read := 0 @@ -482,7 +487,11 @@ func (r rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error { read += shardSize(all) all = trimShards(all, shardSize(all)) - err = r.r.Reconstruct(all) + if reconDataOnly { + err = r.r.ReconstructData(all) // just reconstruct missing data shards + } else { + err = r.r.Reconstruct(all) // reconstruct all missing shards + } if err != nil { return err } diff --git a/vendor/vendor.json b/vendor/vendor.json index 5a71dc91e..4f7de1b7d 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -243,10 +243,10 @@ "revisionTime": "2016-10-16T15:41:25Z" }, { - "checksumSHA1": "Pzd1bfm8Yj1radncaohNZu+UT1I=", + "checksumSHA1": "DnS7x0Gqc93p4hQ88FgE35vfIRw=", "path": "github.com/klauspost/reedsolomon", - "revision": "d0a56f72c0d40a6cdde43a1575ad9686a0098b70", - "revisionTime": "2016-10-28T07:13:20Z" + "revision": "a9202d772777d8d2264c3e0c6159be5047697380", + "revisionTime": "2017-07-19T04:51:23Z" }, { "checksumSHA1": "dNYxHiBLalTqluak2/Z8c3RsSEM=",