diff --git a/cmd/bitrot-streaming.go b/cmd/bitrot-streaming.go new file mode 100644 index 000000000..63f1d9ace --- /dev/null +++ b/cmd/bitrot-streaming.go @@ -0,0 +1,172 @@ +/* + * Minio Cloud Storage, (C) 2019 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "bytes" + "context" + "encoding/hex" + "hash" + "io" + + "github.com/minio/minio/cmd/logger" +) + +// Calculates bitrot in chunks and writes the hash into the stream. +type streamingBitrotWriter struct { + iow *io.PipeWriter + h hash.Hash + shardSize int64 + canClose chan struct{} // Needed to avoid race explained in Close() call. + + // Following two fields are used only to make sure that Write(p) is called such that + // len(p) is always the block size except the last block, i.e prevent programmer errors. + currentBlockIdx int + verifyTillIdx int +} + +func (b *streamingBitrotWriter) Write(p []byte) (int, error) { + if b.currentBlockIdx < b.verifyTillIdx && int64(len(p)) != b.shardSize { + // All blocks except last should be of the length b.shardSize + logger.LogIf(context.Background(), errUnexpected) + return 0, errUnexpected + } + if len(p) == 0 { + return 0, nil + } + b.h.Reset() + b.h.Write(p) + hashBytes := b.h.Sum(nil) + n, err := b.iow.Write(hashBytes) + if n != len(hashBytes) { + logger.LogIf(context.Background(), err) + return 0, err + } + n, err = b.iow.Write(p) + b.currentBlockIdx++ + return n, err +} + +func (b *streamingBitrotWriter) Close() error { + err := b.iow.Close() + // Wait for all data to be written before returning else it causes race conditions. + // Race condition is because of io.PipeWriter implementation. i.e consider the following + // sequent of operations: + // 1) pipe.Write() + // 2) pipe.Close() + // Now pipe.Close() can return before the data is read on the other end of the pipe and written to the disk + // Hence an immediate Read() on the file can return incorrect data. + <-b.canClose + return err +} + +// Returns streaming bitrot writer implementation. +func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64) io.WriteCloser { + r, w := io.Pipe() + h := algo.New() + bw := &streamingBitrotWriter{w, h, shardSize, make(chan struct{}), 0, int(length / shardSize)} + go func() { + bitrotSumsTotalSize := ceilFrac(length, shardSize) * int64(h.Size()) // Size used for storing bitrot checksums. + totalFileSize := bitrotSumsTotalSize + length + err := disk.CreateFile(volume, filePath, totalFileSize, r) + if err != nil { + logger.LogIf(context.Background(), err) + r.CloseWithError(err) + } + close(bw.canClose) + }() + return bw +} + +// ReadAt() implementation which verifies the bitrot hash available as part of the stream. 
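The reader below depends on the exact layout the writer above produces: every shardSize block of data is preceded by its h.Size()-byte checksum, so a shard holding `length` bytes of data occupies `ceilFrac(length, shardSize)*h.Size() + length` bytes on disk, and the checksum for the block at data offset `off` sits at `(off/shardSize)*h.Size() + off` in the stream. A standalone sketch of that arithmetic (only `ceilFrac` mirrors the patch's helper of the same name; the other names are illustrative):

```go
package main

import "fmt"

// ceilFrac mirrors the patch's helper: ceiling of numerator/denominator.
func ceilFrac(numerator, denominator int64) int64 {
	return (numerator + denominator - 1) / denominator
}

// streamSize is the on-disk size of a streaming-bitrot shard file:
// one hashSize-byte sum precedes every shardSize block of data.
func streamSize(dataLen, shardSize, hashSize int64) int64 {
	return ceilFrac(dataLen, shardSize)*hashSize + dataLen
}

// streamOffset maps a shard-aligned data offset to the position of that
// block's checksum inside the stream, as streamingBitrotReader.ReadAt does.
func streamOffset(dataOffset, shardSize, hashSize int64) int64 {
	return (dataOffset/shardSize)*hashSize + dataOffset
}

func main() {
	// 35 bytes of data, 10-byte shards, 32-byte HighwayHash-256 sums.
	fmt.Println(streamSize(35, 10, 32))   // 4 sums * 32 + 35 = 163
	fmt.Println(streamOffset(20, 10, 32)) // checksum of the third block sits at 2*32 + 20 = 84
}
```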
+type streamingBitrotReader struct { + disk StorageAPI + rc io.ReadCloser + volume string + filePath string + tillOffset int64 + currOffset int64 + h hash.Hash + shardSize int64 + hashBytes []byte +} + +func (b *streamingBitrotReader) Close() error { + if b.rc == nil { + return nil + } + return b.rc.Close() +} + +func (b *streamingBitrotReader) ReadAt(buf []byte, offset int64) (int, error) { + var err error + if offset%b.shardSize != 0 { + // Offset should always be aligned to b.shardSize + logger.LogIf(context.Background(), errUnexpected) + return 0, errUnexpected + } + if b.rc == nil { + // For the first ReadAt() call we need to open the stream for reading. + b.currOffset = offset + streamOffset := (offset/b.shardSize)*int64(b.h.Size()) + offset + b.rc, err = b.disk.ReadFileStream(b.volume, b.filePath, streamOffset, b.tillOffset-streamOffset) + if err != nil { + logger.LogIf(context.Background(), err) + return 0, err + } + } + if offset != b.currOffset { + logger.LogIf(context.Background(), errUnexpected) + return 0, errUnexpected + } + b.h.Reset() + _, err = io.ReadFull(b.rc, b.hashBytes) + if err != nil { + logger.LogIf(context.Background(), err) + return 0, err + } + _, err = io.ReadFull(b.rc, buf) + if err != nil { + logger.LogIf(context.Background(), err) + return 0, err + } + b.h.Write(buf) + + if bytes.Compare(b.h.Sum(nil), b.hashBytes) != 0 { + err = hashMismatchError{hex.EncodeToString(b.hashBytes), hex.EncodeToString(b.h.Sum(nil))} + logger.LogIf(context.Background(), err) + return 0, err + } + b.currOffset += int64(len(buf)) + return len(buf), nil +} + +// Returns streaming bitrot reader implementation. +func newStreamingBitrotReader(disk StorageAPI, volume, filePath string, tillOffset int64, algo BitrotAlgorithm, shardSize int64) *streamingBitrotReader { + h := algo.New() + return &streamingBitrotReader{ + disk, + nil, + volume, + filePath, + ceilFrac(tillOffset, shardSize)*int64(h.Size()) + tillOffset, + 0, + h, + shardSize, + make([]byte, h.Size()), + } +} diff --git a/cmd/bitrot-whole.go b/cmd/bitrot-whole.go new file mode 100644 index 000000000..d18d99935 --- /dev/null +++ b/cmd/bitrot-whole.go @@ -0,0 +1,109 @@ +/* + * Minio Cloud Storage, (C) 2019 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "context" + "hash" + "io" + + "github.com/minio/minio/cmd/logger" +) + +// Implementation to calculate bitrot for the whole file. +type wholeBitrotWriter struct { + disk StorageAPI + volume string + filePath string + shardSize int64 // This is the shard size of the erasure logic + hash.Hash // For bitrot hash + + // Following two fields are used only to make sure that Write(p) is called such that + // len(p) is always the block size except the last block and prevent programmer errors. 
+ currentBlockIdx int + lastBlockIdx int +} + +func (b *wholeBitrotWriter) Write(p []byte) (int, error) { + if b.currentBlockIdx < b.lastBlockIdx && int64(len(p)) != b.shardSize { + // All blocks except last should be of the length b.shardSize + logger.LogIf(context.Background(), errUnexpected) + return 0, errUnexpected + } + err := b.disk.AppendFile(b.volume, b.filePath, p) + if err != nil { + logger.LogIf(context.Background(), err) + return 0, err + } + _, err = b.Hash.Write(p) + if err != nil { + logger.LogIf(context.Background(), err) + return 0, err + } + b.currentBlockIdx++ + return len(p), nil +} + +func (b *wholeBitrotWriter) Close() error { + return nil +} + +// Returns whole-file bitrot writer. +func newWholeBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64) io.WriteCloser { + return &wholeBitrotWriter{disk, volume, filePath, shardSize, algo.New(), 0, int(length / shardSize)} +} + +// Implementation to verify bitrot for the whole file. +type wholeBitrotReader struct { + disk StorageAPI + volume string + filePath string + verifier *BitrotVerifier // Holds the bit-rot info + tillOffset int64 // Affects the length of data requested in disk.ReadFile depending on Read()'s offset + buf []byte // Holds bit-rot verified data +} + +func (b *wholeBitrotReader) ReadAt(buf []byte, offset int64) (n int, err error) { + if b.buf == nil { + b.buf = make([]byte, b.tillOffset-offset) + if _, err := b.disk.ReadFile(b.volume, b.filePath, offset, b.buf, b.verifier); err != nil { + ctx := context.Background() + logger.GetReqInfo(ctx).AppendTags("disk", b.disk.String()) + logger.LogIf(ctx, err) + return 0, err + } + } + if len(b.buf) < len(buf) { + logger.LogIf(context.Background(), errLessData) + return 0, errLessData + } + n = copy(buf, b.buf) + b.buf = b.buf[n:] + return n, nil +} + +// Returns whole-file bitrot reader. +func newWholeBitrotReader(disk StorageAPI, volume, filePath string, algo BitrotAlgorithm, tillOffset int64, sum []byte) *wholeBitrotReader { + return &wholeBitrotReader{ + disk: disk, + volume: volume, + filePath: filePath, + verifier: &BitrotVerifier{algo, sum}, + tillOffset: tillOffset, + buf: nil, + } +} diff --git a/cmd/bitrot.go b/cmd/bitrot.go index 40e6839c5..b9fcd8f2c 100644 --- a/cmd/bitrot.go +++ b/cmd/bitrot.go @@ -20,6 +20,7 @@ import ( "context" "errors" "hash" + "io" "github.com/minio/highwayhash" "github.com/minio/minio/cmd/logger" @@ -38,19 +39,22 @@ const ( SHA256 BitrotAlgorithm = 1 + iota // HighwayHash256 represents the HighwayHash-256 hash function HighwayHash256 + // HighwayHash256 represents the Streaming HighwayHash-256 hash function + HighwayHash256S // BLAKE2b512 represents the BLAKE2b-512 hash function BLAKE2b512 ) // DefaultBitrotAlgorithm is the default algorithm used for bitrot protection. const ( - DefaultBitrotAlgorithm = HighwayHash256 + DefaultBitrotAlgorithm = HighwayHash256S ) var bitrotAlgorithms = map[BitrotAlgorithm]string{ - SHA256: "sha256", - BLAKE2b512: "blake2b", - HighwayHash256: "highwayhash256", + SHA256: "sha256", + BLAKE2b512: "blake2b", + HighwayHash256: "highwayhash256", + HighwayHash256S: "highwayhash256S", } // New returns a new hash.Hash calculating the given bitrot algorithm. 
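One thing worth noting before the New() hunk below: HighwayHash256S is not a different hash function. It is the same HighwayHash-256, and the trailing S only selects the streaming layout (per-shard sums stored inline in the shard file) instead of the single whole-file sum kept in the object metadata. The whole-file path remains the usual accumulate-then-compare pattern; a standalone sketch with SHA-256 standing in for HighwayHash, since the real key is internal to the package:

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
	"hash"
)

// wholeFileSum accumulates every appended block into one hash, the way
// wholeBitrotWriter does, and returns the final sum.
func wholeFileSum(h hash.Hash, blocks [][]byte) []byte {
	for _, b := range blocks {
		h.Write(b) // hash.Hash.Write never returns an error
	}
	return h.Sum(nil)
}

func main() {
	blocks := [][]byte{[]byte("aaaaaaaaaa"), []byte("aaaaa")}
	stored := wholeFileSum(sha256.New(), blocks)

	// Verification re-hashes the data read back from disk and compares
	// against the stored sum.
	got := wholeFileSum(sha256.New(), blocks)
	fmt.Println(bytes.Equal(got, stored)) // true
}
```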
@@ -64,6 +68,9 @@ func (a BitrotAlgorithm) New() hash.Hash { case HighwayHash256: hh, _ := highwayhash.New(magicHighwayHash256Key) // New will never return error since key is 256 bit return hh + case HighwayHash256S: + hh, _ := highwayhash.New(magicHighwayHash256Key) // New will never return error since key is 256 bit + return hh default: logger.CriticalIf(context.Background(), errors.New("Unsupported bitrot algorithm")) return nil @@ -109,86 +116,71 @@ func BitrotAlgorithmFromString(s string) (a BitrotAlgorithm) { return } -// To read bit-rot verified data. -type bitrotReader struct { - disk StorageAPI - volume string - filePath string - verifier *BitrotVerifier // Holds the bit-rot info - endOffset int64 // Affects the length of data requested in disk.ReadFile depending on Read()'s offset - buf []byte // Holds bit-rot verified data +func newBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64) io.Writer { + if algo == HighwayHash256S { + return newStreamingBitrotWriter(disk, volume, filePath, length, algo, shardSize) + } + return newWholeBitrotWriter(disk, volume, filePath, length, algo, shardSize) } -// newBitrotReader returns bitrotReader. -// Note that the buffer is allocated later in Read(). This is because we will know the buffer length only -// during the bitrotReader.Read(). Depending on when parallelReader fails-over, the buffer length can be different. -func newBitrotReader(disk StorageAPI, volume, filePath string, algo BitrotAlgorithm, endOffset int64, sum []byte) *bitrotReader { - return &bitrotReader{ - disk: disk, - volume: volume, - filePath: filePath, - verifier: &BitrotVerifier{algo, sum}, - endOffset: endOffset, - buf: nil, +func newBitrotReader(disk StorageAPI, bucket string, filePath string, tillOffset int64, algo BitrotAlgorithm, sum []byte, shardSize int64) io.ReaderAt { + if algo == HighwayHash256S { + return newStreamingBitrotReader(disk, bucket, filePath, tillOffset, algo, shardSize) } + return newWholeBitrotReader(disk, bucket, filePath, algo, tillOffset, sum) } -// ReadChunk returns requested data. -func (b *bitrotReader) ReadChunk(offset int64, length int64) ([]byte, error) { - if b.buf == nil { - b.buf = make([]byte, b.endOffset-offset) - if _, err := b.disk.ReadFile(b.volume, b.filePath, offset, b.buf, b.verifier); err != nil { - ctx := context.Background() - logger.GetReqInfo(ctx).AppendTags("disk", b.disk.String()) - logger.LogIf(ctx, err) - return nil, err +// Close all the readers. +func closeBitrotReaders(rs []io.ReaderAt) { + for _, r := range rs { + if br, ok := r.(*streamingBitrotReader); ok { + br.Close() } } - if int64(len(b.buf)) < length { - logger.LogIf(context.Background(), errLessData) - return nil, errLessData - } - retBuf := b.buf[:length] - b.buf = b.buf[length:] - return retBuf, nil } -// To calculate the bit-rot of the written data. -type bitrotWriter struct { - disk StorageAPI - volume string - filePath string - h hash.Hash +// Close all the writers. +func closeBitrotWriters(ws []io.Writer) { + for _, w := range ws { + if bw, ok := w.(*streamingBitrotWriter); ok { + bw.Close() + } + } } -// newBitrotWriter returns bitrotWriter. -func newBitrotWriter(disk StorageAPI, volume, filePath string, algo BitrotAlgorithm) *bitrotWriter { - return &bitrotWriter{ - disk: disk, - volume: volume, - filePath: filePath, - h: algo.New(), +// Returns hash sum for whole-bitrot, nil for streaming-bitrot. 
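The factories below hide which of the two writer/reader flavours is in play, so the caller-side pattern (visible in the updated tests further down) is: create them, write or read shard-sized chunks, close them explicitly via closeBitrotWriters/closeBitrotReaders, and take bitrotWriterSum for the metadata checksum, which is nil for the streaming flavour. A rough sketch of that sequence, assuming it runs inside package cmd with a StorageAPI in hand; the function name and bucket/object names are made up for illustration:

```go
// Illustrative fragment only, not a complete program.
// shardFileSize is the plain data length of the shard (what Erasure.ShardFileSize
// returns), excluding inline checksums; each entry of shards must be exactly
// shardSize bytes, except the last one which may be shorter.
func roundTrip(disk StorageAPI, shardFileSize, shardSize int64, shards [][]byte) error {
	w := newBitrotWriter(disk, "bucket", "object", shardFileSize, HighwayHash256S, shardSize)
	for _, s := range shards {
		if _, err := w.Write(s); err != nil {
			return err
		}
	}
	// Streaming writers feed an io.Pipe; closing them lets the background
	// CreateFile drain the pipe and finish.
	closeBitrotWriters([]io.Writer{w})

	// For streaming bitrot there is no separate checksum to store: sum is nil.
	sum := bitrotWriterSum(w)

	r := newBitrotReader(disk, "bucket", "object", shardFileSize, HighwayHash256S, sum, shardSize)
	defer closeBitrotReaders([]io.ReaderAt{r})

	offset := int64(0)
	for _, s := range shards {
		buf := make([]byte, len(s))
		if _, err := r.ReadAt(buf, offset); err != nil {
			return err
		}
		offset += int64(len(s))
	}
	return nil
}
```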
+func bitrotWriterSum(w io.Writer) []byte { + if bw, ok := w.(*wholeBitrotWriter); ok { + return bw.Sum(nil) } + return nil } -// Append appends the data and while calculating the hash. -func (b *bitrotWriter) Append(buf []byte) error { - n, err := b.h.Write(buf) - if err != nil { +// Verify if a file has bitrot error. +func bitrotCheckFile(disk StorageAPI, volume string, filePath string, tillOffset int64, algo BitrotAlgorithm, sum []byte, shardSize int64) (err error) { + buf := make([]byte, shardSize) + if algo != HighwayHash256S { + // For whole-file bitrot we don't need to read the entire file as the bitrot verify happens on the server side even if we read small buffer + _, err = disk.ReadFile(volume, filePath, 0, buf, NewBitrotVerifier(algo, sum)) return err } - if n != len(buf) { - logger.LogIf(context.Background(), errUnexpected) - return errUnexpected - } - if err = b.disk.AppendFile(b.volume, b.filePath, buf); err != nil { - logger.LogIf(context.Background(), err) - return err + r := newStreamingBitrotReader(disk, volume, filePath, tillOffset, algo, shardSize) + defer closeBitrotReaders([]io.ReaderAt{r}) + var offset int64 + for { + if offset == tillOffset { + break + } + var n int + tmpBuf := buf + if int64(len(tmpBuf)) > (tillOffset - offset) { + tmpBuf = tmpBuf[:(tillOffset - offset)] + } + n, err = r.ReadAt(tmpBuf, offset) + if err != nil { + return err + } + offset += int64(n) } return nil } - -// Sum returns bit-rot sum. -func (b *bitrotWriter) Sum() []byte { - return b.h.Sum(nil) -} diff --git a/cmd/bitrot_test.go b/cmd/bitrot_test.go index 1e66aa90e..94b32594c 100644 --- a/cmd/bitrot_test.go +++ b/cmd/bitrot_test.go @@ -17,6 +17,7 @@ package cmd import ( + "io" "io/ioutil" "log" "os" @@ -40,32 +41,38 @@ func TestBitrotReaderWriter(t *testing.T) { disk.MakeVol(volume) - writer := newBitrotWriter(disk, volume, filePath, HighwayHash256) + writer := newBitrotWriter(disk, volume, filePath, 35, HighwayHash256S, 10) - err = writer.Append([]byte("aaaaaaaaa")) + _, err = writer.Write([]byte("aaaaaaaaaa")) if err != nil { log.Fatal(err) } - err = writer.Append([]byte("a")) + _, err = writer.Write([]byte("aaaaaaaaaa")) if err != nil { log.Fatal(err) } - err = writer.Append([]byte("aaaaaaaaaa")) + _, err = writer.Write([]byte("aaaaaaaaaa")) if err != nil { log.Fatal(err) } - err = writer.Append([]byte("aaaaa")) + _, err = writer.Write([]byte("aaaaa")) if err != nil { log.Fatal(err) } - err = writer.Append([]byte("aaaaaaaaaa")) - if err != nil { + writer.(io.Closer).Close() + + reader := newStreamingBitrotReader(disk, volume, filePath, 35, HighwayHash256S, 10) + b := make([]byte, 10) + if _, err = reader.ReadAt(b, 0); err != nil { log.Fatal(err) } - - reader := newBitrotReader(disk, volume, filePath, HighwayHash256, 35, writer.Sum()) - - if _, err = reader.ReadChunk(0, 35); err != nil { + if _, err = reader.ReadAt(b, 10); err != nil { + log.Fatal(err) + } + if _, err = reader.ReadAt(b, 20); err != nil { + log.Fatal(err) + } + if _, err = reader.ReadAt(b[:5], 30); err != nil { log.Fatal(err) } } diff --git a/cmd/erasure-decode.go b/cmd/erasure-decode.go index 93583f3ec..fd737d394 100644 --- a/cmd/erasure-decode.go +++ b/cmd/erasure-decode.go @@ -23,25 +23,25 @@ import ( "github.com/minio/minio/cmd/logger" ) -// Reads in parallel from bitrotReaders. +// Reads in parallel from readers. type parallelReader struct { - readers []*bitrotReader + readers []io.ReaderAt dataBlocks int offset int64 shardSize int64 shardFileSize int64 + buf [][]byte } // newParallelReader returns parallelReader. 
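newParallelReader below leans on three pieces of arithmetic that this patch moves onto Erasure (ShardSize, ShardFileSize and ShardFileTillOffset, defined in cmd/erasure.go further down). Concrete numbers make them easier to review; a standalone sketch reproducing the same formulas with purely illustrative parameters:

```go
package main

import "fmt"

func ceilFrac(num, den int64) int64 { return (num + den - 1) / den }

// shardSize: each erasure block of blockSize bytes contributes
// ceil(blockSize/dataBlocks) bytes to every shard file.
func shardSize(blockSize int64, dataBlocks int) int64 {
	return ceilFrac(blockSize, int64(dataBlocks))
}

// shardFileSize: full blocks contribute shardSize each; the trailing
// partial block contributes ceil(lastBlockSize/dataBlocks).
func shardFileSize(blockSize, totalLength int64, dataBlocks int) int64 {
	if totalLength == 0 {
		return 0
	}
	numBlocks := totalLength / blockSize
	lastBlockSize := totalLength % blockSize
	return numBlocks*shardSize(blockSize, dataBlocks) + ceilFrac(lastBlockSize, int64(dataBlocks))
}

// shardFileTillOffset: offset in the shard file up to which a reader must
// read to satisfy a request for [startOffset, startOffset+length), capped
// at the shard file size.
func shardFileTillOffset(startOffset, length, totalLength, blockSize int64, dataBlocks int) int64 {
	endShard := (startOffset + length) / blockSize
	till := (endShard + 1) * shardSize(blockSize, dataBlocks)
	if fileSize := shardFileSize(blockSize, totalLength, dataBlocks); till > fileSize {
		till = fileSize
	}
	return till
}

func main() {
	const blockSize = 10 << 20 // 10 MiB, illustrative
	const dataBlocks = 5
	fmt.Println(shardSize(blockSize, dataBlocks))                                    // 2097152
	fmt.Println(shardFileSize(blockSize, 25<<20, dataBlocks))                        // 5242880: two full blocks + a 5 MiB tail
	fmt.Println(shardFileTillOffset(0, 25<<20, 25<<20, blockSize, dataBlocks))       // 5242880, capped at the shard file size
}
```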
-func newParallelReader(readers []*bitrotReader, dataBlocks int, offset int64, fileSize int64, blocksize int64) *parallelReader { - shardSize := ceilFrac(blocksize, int64(dataBlocks)) - shardFileSize := getErasureShardFileSize(blocksize, fileSize, dataBlocks) +func newParallelReader(readers []io.ReaderAt, e Erasure, offset, totalLength int64) *parallelReader { return ¶llelReader{ readers, - dataBlocks, - (offset / blocksize) * shardSize, - shardSize, - shardFileSize, + e.dataBlocks, + (offset / e.blockSize) * e.ShardSize(), + e.ShardSize(), + e.ShardFileSize(totalLength), + make([][]byte, len(readers)), } } @@ -56,7 +56,7 @@ func (p *parallelReader) canDecode(buf [][]byte) bool { return bufCount >= p.dataBlocks } -// Read reads from bitrotReaders in parallel. Returns p.dataBlocks number of bufs. +// Read reads from readers in parallel. Returns p.dataBlocks number of bufs. func (p *parallelReader) Read() ([][]byte, error) { type errIdx struct { idx int @@ -73,8 +73,12 @@ func (p *parallelReader) Read() ([][]byte, error) { } read := func(currReaderIndex int) { - b, err := p.readers[currReaderIndex].ReadChunk(p.offset, p.shardSize) - errCh <- errIdx{currReaderIndex, b, err} + if p.buf[currReaderIndex] == nil { + p.buf[currReaderIndex] = make([]byte, p.shardSize) + } + p.buf[currReaderIndex] = p.buf[currReaderIndex][:p.shardSize] + _, err := p.readers[currReaderIndex].ReadAt(p.buf[currReaderIndex], p.offset) + errCh <- errIdx{currReaderIndex, p.buf[currReaderIndex], err} } readerCount := 0 @@ -128,7 +132,7 @@ func (p *parallelReader) Read() ([][]byte, error) { } // Decode reads from readers, reconstructs data if needed and writes the data to the writer. -func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []*bitrotReader, offset, length, totalLength int64) error { +func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.ReaderAt, offset, length, totalLength int64) error { if offset < 0 || length < 0 { logger.LogIf(ctx, errInvalidArgument) return errInvalidArgument @@ -141,7 +145,7 @@ func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []*bitrot return nil } - reader := newParallelReader(readers, e.dataBlocks, offset, totalLength, e.blockSize) + reader := newParallelReader(readers, e, offset, totalLength) startBlock := offset / e.blockSize endBlock := (offset + length) / e.blockSize diff --git a/cmd/erasure-decode_test.go b/cmd/erasure-decode_test.go index e506b6a7e..58bffa0f5 100644 --- a/cmd/erasure-decode_test.go +++ b/cmd/erasure-decode_test.go @@ -41,26 +41,28 @@ var erasureDecodeTests = []struct { algorithm BitrotAlgorithm shouldFail, shouldFailQuorum bool }{ - {dataBlocks: 2, onDisks: 4, offDisks: 0, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, // 0 - {dataBlocks: 3, onDisks: 6, offDisks: 0, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: SHA256, shouldFail: false, shouldFailQuorum: false}, // 1 - {dataBlocks: 4, onDisks: 8, offDisks: 0, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 2 - {dataBlocks: 5, onDisks: 10, offDisks: 0, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 1, length: oneMiByte - 1, algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, // 3 - {dataBlocks: 6, onDisks: 12, offDisks: 0, blocksize: int64(oneMiByte), data: oneMiByte, offset: 
oneMiByte, length: 0, algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, // 4 - {dataBlocks: 7, onDisks: 14, offDisks: 0, blocksize: int64(oneMiByte), data: oneMiByte, offset: 3, length: 1024, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 5 - {dataBlocks: 8, onDisks: 16, offDisks: 0, blocksize: int64(oneMiByte), data: oneMiByte, offset: 4, length: 8 * 1024, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 6 - {dataBlocks: 7, onDisks: 14, offDisks: 7, blocksize: int64(blockSizeV1), data: oneMiByte, offset: oneMiByte, length: 1, algorithm: DefaultBitrotAlgorithm, shouldFail: true, shouldFailQuorum: false}, // 7 - {dataBlocks: 6, onDisks: 12, offDisks: 6, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 8 - {dataBlocks: 5, onDisks: 10, offDisks: 5, blocksize: int64(oneMiByte), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, // 9 - {dataBlocks: 4, onDisks: 8, offDisks: 4, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: SHA256, shouldFail: false, shouldFailQuorum: false}, // 10 - {dataBlocks: 3, onDisks: 6, offDisks: 3, blocksize: int64(oneMiByte), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 11 - {dataBlocks: 2, onDisks: 4, offDisks: 2, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 12 - {dataBlocks: 2, onDisks: 4, offDisks: 1, blocksize: int64(oneMiByte), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 13 - {dataBlocks: 3, onDisks: 6, offDisks: 2, blocksize: int64(oneMiByte), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 14 - {dataBlocks: 4, onDisks: 8, offDisks: 3, blocksize: int64(2 * oneMiByte), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 15 - {dataBlocks: 5, onDisks: 10, offDisks: 6, blocksize: int64(oneMiByte), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: true}, // 16 - {dataBlocks: 5, onDisks: 10, offDisks: 2, blocksize: int64(blockSizeV1), data: 2 * oneMiByte, offset: oneMiByte, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 17 - {dataBlocks: 5, onDisks: 10, offDisks: 1, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, // 18 - {dataBlocks: 6, onDisks: 12, offDisks: 3, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: SHA256, shouldFail: false, shouldFailQuorum: false}, // 19 + {dataBlocks: 2, onDisks: 4, offDisks: 0, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, // 0 + {dataBlocks: 3, onDisks: 6, offDisks: 0, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: SHA256, shouldFail: false, shouldFailQuorum: false}, // 1 + {dataBlocks: 4, onDisks: 8, 
offDisks: 0, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 2 + {dataBlocks: 5, onDisks: 10, offDisks: 0, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 1, length: oneMiByte - 1, algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, // 3 + {dataBlocks: 6, onDisks: 12, offDisks: 0, blocksize: int64(oneMiByte), data: oneMiByte, offset: oneMiByte, length: 0, algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, + // 4 + {dataBlocks: 7, onDisks: 14, offDisks: 0, blocksize: int64(oneMiByte), data: oneMiByte, offset: 3, length: 1024, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 5 + {dataBlocks: 8, onDisks: 16, offDisks: 0, blocksize: int64(oneMiByte), data: oneMiByte, offset: 4, length: 8 * 1024, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 6 + {dataBlocks: 7, onDisks: 14, offDisks: 7, blocksize: int64(blockSizeV1), data: oneMiByte, offset: oneMiByte, length: 1, algorithm: DefaultBitrotAlgorithm, shouldFail: true, shouldFailQuorum: false}, // 7 + {dataBlocks: 6, onDisks: 12, offDisks: 6, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 8 + {dataBlocks: 5, onDisks: 10, offDisks: 5, blocksize: int64(oneMiByte), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, // 9 + {dataBlocks: 4, onDisks: 8, offDisks: 4, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: SHA256, shouldFail: false, shouldFailQuorum: false}, // 10 + {dataBlocks: 3, onDisks: 6, offDisks: 3, blocksize: int64(oneMiByte), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 11 + {dataBlocks: 2, onDisks: 4, offDisks: 2, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 12 + {dataBlocks: 2, onDisks: 4, offDisks: 1, blocksize: int64(oneMiByte), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 13 + {dataBlocks: 3, onDisks: 6, offDisks: 2, blocksize: int64(oneMiByte), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 14 + {dataBlocks: 4, onDisks: 8, offDisks: 3, blocksize: int64(2 * oneMiByte), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 15 + {dataBlocks: 5, onDisks: 10, offDisks: 6, blocksize: int64(oneMiByte), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: true}, // 16 + {dataBlocks: 5, onDisks: 10, offDisks: 2, blocksize: int64(blockSizeV1), data: 2 * oneMiByte, offset: oneMiByte, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 17 + {dataBlocks: 5, onDisks: 10, offDisks: 1, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: BLAKE2b512, shouldFail: false, shouldFailQuorum: false}, // 18 + {dataBlocks: 6, onDisks: 12, offDisks: 3, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, 
algorithm: SHA256, shouldFail: false, shouldFailQuorum: false}, + // 19 {dataBlocks: 6, onDisks: 12, offDisks: 7, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: true}, // 20 {dataBlocks: 8, onDisks: 16, offDisks: 8, blocksize: int64(blockSizeV1), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: false}, // 21 {dataBlocks: 8, onDisks: 16, offDisks: 9, blocksize: int64(oneMiByte), data: oneMiByte, offset: 0, length: oneMiByte, algorithm: DefaultBitrotAlgorithm, shouldFail: false, shouldFailQuorum: true}, // 22 @@ -104,11 +106,12 @@ func TestErasureDecode(t *testing.T) { writeAlgorithm = DefaultBitrotAlgorithm } buffer := make([]byte, test.blocksize, 2*test.blocksize) - writers := make([]*bitrotWriter, len(disks)) + writers := make([]io.Writer, len(disks)) for i, disk := range disks { - writers[i] = newBitrotWriter(disk, "testbucket", "object", writeAlgorithm) + writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(test.data), writeAlgorithm, erasure.ShardSize()) } n, err := erasure.Encode(context.Background(), bytes.NewReader(data[:]), writers, buffer, erasure.dataBlocks+1) + closeBitrotWriters(writers) if err != nil { setup.Remove() t.Fatalf("Test %d: failed to create erasure test file: %v", i, err) @@ -124,17 +127,19 @@ func TestErasureDecode(t *testing.T) { } // Get the checksums of the current part. - bitrotReaders := make([]*bitrotReader, len(disks)) + bitrotReaders := make([]io.ReaderAt, len(disks)) for index, disk := range disks { if disk == OfflineDisk { continue } - endOffset := getErasureShardFileEndOffset(test.offset, test.length, test.data, test.blocksize, erasure.dataBlocks) - bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", writeAlgorithm, endOffset, writers[index].Sum()) + tillOffset := erasure.ShardFileTillOffset(test.offset, test.length, test.data) + + bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize()) } writer := bytes.NewBuffer(nil) err = erasure.Decode(context.Background(), writer, bitrotReaders, test.offset, test.length, test.data) + closeBitrotReaders(bitrotReaders) if err != nil && !test.shouldFail { t.Errorf("Test %d: should pass but failed with: %v", i, err) } @@ -143,31 +148,41 @@ func TestErasureDecode(t *testing.T) { } if err == nil { if content := writer.Bytes(); !bytes.Equal(content, data[test.offset:test.offset+test.length]) { - t.Errorf("Test %d: read retruns wrong file content", i) + t.Errorf("Test %d: read retruns wrong file content.", i) } } + for i, r := range bitrotReaders { if r == nil { disks[i] = OfflineDisk } } if err == nil && !test.shouldFail { - bitrotReaders = make([]*bitrotReader, len(disks)) + bitrotReaders = make([]io.ReaderAt, len(disks)) for index, disk := range disks { if disk == OfflineDisk { continue } - endOffset := getErasureShardFileEndOffset(test.offset, test.length, test.data, test.blocksize, erasure.dataBlocks) - bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", writeAlgorithm, endOffset, writers[index].Sum()) + tillOffset := erasure.ShardFileTillOffset(test.offset, test.length, test.data) + bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", tillOffset, writeAlgorithm, bitrotWriterSum(writers[index]), erasure.ShardSize()) } for j := range disks[:test.offDisks] { - 
bitrotReaders[j].disk = badDisk{nil} + if bitrotReaders[j] == nil { + continue + } + switch r := bitrotReaders[j].(type) { + case *wholeBitrotReader: + r.disk = badDisk{nil} + case *streamingBitrotReader: + r.disk = badDisk{nil} + } } if test.offDisks > 0 { bitrotReaders[0] = nil } writer.Reset() err = erasure.Decode(context.Background(), writer, bitrotReaders, test.offset, test.length, test.data) + closeBitrotReaders(bitrotReaders) if err != nil && !test.shouldFailQuorum { t.Errorf("Test %d: should pass but failed with: %v", i, err) } @@ -213,12 +228,12 @@ func TestErasureDecodeRandomOffsetLength(t *testing.T) { t.Fatal(err) } - writers := make([]*bitrotWriter, len(disks)) + writers := make([]io.Writer, len(disks)) for i, disk := range disks { if disk == nil { continue } - writers[i] = newBitrotWriter(disk, "testbucket", "object", DefaultBitrotAlgorithm) + writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(length), DefaultBitrotAlgorithm, erasure.ShardSize()) } // 10000 iterations with random offsets and lengths. @@ -227,6 +242,7 @@ func TestErasureDecodeRandomOffsetLength(t *testing.T) { // Create a test file to read from. buffer := make([]byte, blockSize, 2*blockSize) n, err := erasure.Encode(context.Background(), bytes.NewReader(data), writers, buffer, erasure.dataBlocks+1) + closeBitrotWriters(writers) if err != nil { t.Fatal(err) } @@ -247,15 +263,16 @@ func TestErasureDecodeRandomOffsetLength(t *testing.T) { expected := data[offset : offset+readLen] // Get the checksums of the current part. - bitrotReaders := make([]*bitrotReader, len(disks)) + bitrotReaders := make([]io.ReaderAt, len(disks)) for index, disk := range disks { if disk == OfflineDisk { continue } - endOffset := getErasureShardFileEndOffset(offset, readLen, length, blockSize, erasure.dataBlocks) - bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", DefaultBitrotAlgorithm, endOffset, writers[index].Sum()) + tillOffset := erasure.ShardFileTillOffset(offset, readLen, length) + bitrotReaders[index] = newStreamingBitrotReader(disk, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) } err = erasure.Decode(context.Background(), buf, bitrotReaders, offset, readLen, length) + closeBitrotReaders(bitrotReaders) if err != nil { t.Fatal(err, offset, readLen) } @@ -281,17 +298,18 @@ func benchmarkErasureDecode(data, parity, dataDown, parityDown int, size int64, b.Fatalf("failed to create ErasureStorage: %v", err) } - writers := make([]*bitrotWriter, len(disks)) + writers := make([]io.Writer, len(disks)) for i, disk := range disks { if disk == nil { continue } - writers[i] = newBitrotWriter(disk, "testbucket", "object", DefaultBitrotAlgorithm) + writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(size), DefaultBitrotAlgorithm, erasure.ShardSize()) } content := make([]byte, size) buffer := make([]byte, blockSizeV1, 2*blockSizeV1) _, err = erasure.Encode(context.Background(), bytes.NewReader(content), writers, buffer, erasure.dataBlocks+1) + closeBitrotWriters(writers) if err != nil { b.Fatalf("failed to create erasure test file: %v", err) } @@ -307,17 +325,18 @@ func benchmarkErasureDecode(data, parity, dataDown, parityDown int, size int64, b.SetBytes(size) b.ReportAllocs() for i := 0; i < b.N; i++ { - bitrotReaders := make([]*bitrotReader, len(disks)) + bitrotReaders := make([]io.ReaderAt, len(disks)) for index, disk := range disks { if writers[index] == nil { continue } - endOffset := getErasureShardFileEndOffset(0, size, size, 
erasure.blockSize, erasure.dataBlocks) - bitrotReaders[index] = newBitrotReader(disk, "testbucket", "object", DefaultBitrotAlgorithm, endOffset, writers[index].Sum()) + tillOffset := erasure.ShardFileTillOffset(0, size, size) + bitrotReaders[index] = newStreamingBitrotReader(disk, "testbucket", "object", tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) } if err = erasure.Decode(context.Background(), bytes.NewBuffer(content[:0]), bitrotReaders, 0, size, size); err != nil { panic(err) } + closeBitrotReaders(bitrotReaders) } } diff --git a/cmd/erasure-encode.go b/cmd/erasure-encode.go index c3ac0e951..813ab6b70 100644 --- a/cmd/erasure-encode.go +++ b/cmd/erasure-encode.go @@ -25,15 +25,15 @@ import ( "github.com/minio/minio/cmd/logger" ) -// Writes in parallel to bitrotWriters +// Writes in parallel to writers type parallelWriter struct { - writers []*bitrotWriter + writers []io.Writer writeQuorum int errs []error } -// Append appends data to bitrotWriters in parallel. -func (p *parallelWriter) Append(ctx context.Context, blocks [][]byte) error { +// Write writes data to writers in parallel. +func (p *parallelWriter) Write(ctx context.Context, blocks [][]byte) error { var wg sync.WaitGroup for i := range p.writers { @@ -45,7 +45,7 @@ func (p *parallelWriter) Append(ctx context.Context, blocks [][]byte) error { wg.Add(1) go func(i int) { defer wg.Done() - p.errs[i] = p.writers[i].Append(blocks[i]) + _, p.errs[i] = p.writers[i].Write(blocks[i]) if p.errs[i] != nil { p.writers[i] = nil } @@ -70,7 +70,7 @@ func (p *parallelWriter) Append(ctx context.Context, blocks [][]byte) error { } // Encode reads from the reader, erasure-encodes the data and writes to the writers. -func (e *Erasure) Encode(ctx context.Context, src io.Reader, writers []*bitrotWriter, buf []byte, quorum int) (total int64, err error) { +func (e *Erasure) Encode(ctx context.Context, src io.Reader, writers []io.Writer, buf []byte, quorum int) (total int64, err error) { writer := ¶llelWriter{ writers: writers, writeQuorum: quorum, @@ -96,7 +96,7 @@ func (e *Erasure) Encode(ctx context.Context, src io.Reader, writers []*bitrotWr return 0, err } - if err = writer.Append(ctx, blocks); err != nil { + if err = writer.Write(ctx, blocks); err != nil { logger.LogIf(ctx, err) return 0, err } diff --git a/cmd/erasure-encode_test.go b/cmd/erasure-encode_test.go index d39be90d0..03107c19c 100644 --- a/cmd/erasure-encode_test.go +++ b/cmd/erasure-encode_test.go @@ -36,6 +36,14 @@ func (a badDisk) AppendFile(volume string, path string, buf []byte) error { return errFaultyDisk } +func (a badDisk) ReadFileStream(volume, path string, offset, length int64) (io.ReadCloser, error) { + return nil, errFaultyDisk +} + +func (a badDisk) CreateFile(volume, path string, size int64, reader io.Reader) error { + return errFaultyDisk +} + const oneMiByte = 1 * humanize.MiByte var erasureEncodeTests = []struct { @@ -87,14 +95,15 @@ func TestErasureEncode(t *testing.T) { setup.Remove() t.Fatalf("Test %d: failed to generate random test data: %v", i, err) } - writers := make([]*bitrotWriter, len(disks)) + writers := make([]io.Writer, len(disks)) for i, disk := range disks { if disk == OfflineDisk { continue } - writers[i] = newBitrotWriter(disk, "testbucket", "object", test.algorithm) + writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(int64(len(data[test.offset:]))), test.algorithm, erasure.ShardSize()) } n, err := erasure.Encode(context.Background(), bytes.NewReader(data[test.offset:]), writers, buffer, 
erasure.dataBlocks+1) + closeBitrotWriters(writers) if err != nil && !test.shouldFail { t.Errorf("Test %d: should pass but failed with: %v", i, err) } @@ -110,20 +119,26 @@ func TestErasureEncode(t *testing.T) { if length := int64(len(data[test.offset:])); n != length { t.Errorf("Test %d: invalid number of bytes written: got: #%d want #%d", i, n, length) } - writers := make([]*bitrotWriter, len(disks)) + writers := make([]io.Writer, len(disks)) for i, disk := range disks { if disk == nil { continue } - writers[i] = newBitrotWriter(disk, "testbucket", "object2", test.algorithm) + writers[i] = newBitrotWriter(disk, "testbucket", "object2", erasure.ShardFileSize(int64(len(data[test.offset:]))), test.algorithm, erasure.ShardSize()) } for j := range disks[:test.offDisks] { - writers[j].disk = badDisk{nil} + switch w := writers[j].(type) { + case *wholeBitrotWriter: + w.disk = badDisk{nil} + case *streamingBitrotWriter: + w.iow.CloseWithError(errFaultyDisk) + } } if test.offDisks > 0 { writers[0] = nil } n, err = erasure.Encode(context.Background(), bytes.NewReader(data[test.offset:]), writers, buffer, erasure.dataBlocks+1) + closeBitrotWriters(writers) if err != nil && !test.shouldFailQuorum { t.Errorf("Test %d: should pass but failed with: %v", i, err) } @@ -167,14 +182,16 @@ func benchmarkErasureEncode(data, parity, dataDown, parityDown int, size int64, b.SetBytes(size) b.ReportAllocs() for i := 0; i < b.N; i++ { - writers := make([]*bitrotWriter, len(disks)) + writers := make([]io.Writer, len(disks)) for i, disk := range disks { if disk == OfflineDisk { continue } - writers[i] = newBitrotWriter(disk, "testbucket", "object", DefaultBitrotAlgorithm) + disk.DeleteFile("testbucket", "object") + writers[i] = newBitrotWriter(disk, "testbucket", "object", erasure.ShardFileSize(size), DefaultBitrotAlgorithm, erasure.ShardSize()) } _, err := erasure.Encode(context.Background(), bytes.NewReader(content), writers, buffer, erasure.dataBlocks+1) + closeBitrotWriters(writers) if err != nil { panic(err) } diff --git a/cmd/erasure-heal.go b/cmd/erasure-heal.go index 344ce0f02..c42d380da 100644 --- a/cmd/erasure-heal.go +++ b/cmd/erasure-heal.go @@ -25,7 +25,7 @@ import ( // Heal heals the shard files on non-nil writers. Note that the quorum passed is 1 // as healing should continue even if it has been successful healing only one shard file. 
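Heal keeps its existing shape in the hunk below: Decode writes the reconstructed object into one end of an io.Pipe from a goroutine while Encode consumes the other end, and the streaming writer earlier in this patch uses the same trick to feed disk.CreateFile. A self-contained illustration of that producer/consumer pattern, and of why CloseWithError matters (a producer failure surfaces as a read error instead of silent truncation):

```go
package main

import (
	"fmt"
	"io"
	"io/ioutil"
)

func main() {
	r, w := io.Pipe()

	// Producer: writes into the pipe and reports its outcome via CloseWithError,
	// the way Decode feeds Heal (and streamingBitrotWriter feeds CreateFile).
	go func() {
		_, err := w.Write([]byte("shard data"))
		w.CloseWithError(err) // a nil error means the reader just sees EOF
	}()

	// Consumer: sees the data as a plain stream; an error on the producer
	// side would show up here as a read error.
	data, err := ioutil.ReadAll(r)
	fmt.Println(string(data), err) // "shard data" <nil>
}
```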
-func (e Erasure) Heal(ctx context.Context, readers []*bitrotReader, writers []*bitrotWriter, size int64) error { +func (e Erasure) Heal(ctx context.Context, readers []io.ReaderAt, writers []io.Writer, size int64) error { r, w := io.Pipe() go func() { if err := e.Decode(ctx, w, readers, 0, size, size); err != nil { diff --git a/cmd/erasure-heal_test.go b/cmd/erasure-heal_test.go index 5a52d6a6d..b954e1fa2 100644 --- a/cmd/erasure-heal_test.go +++ b/cmd/erasure-heal_test.go @@ -21,6 +21,7 @@ import ( "context" "crypto/rand" "io" + "os" "testing" ) @@ -84,20 +85,21 @@ func TestErasureHeal(t *testing.T) { t.Fatalf("Test %d: failed to create random test data: %v", i, err) } buffer := make([]byte, test.blocksize, 2*test.blocksize) - writers := make([]*bitrotWriter, len(disks)) + writers := make([]io.Writer, len(disks)) for i, disk := range disks { - writers[i] = newBitrotWriter(disk, "testbucket", "testobject", test.algorithm) + writers[i] = newBitrotWriter(disk, "testbucket", "testobject", erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize()) } _, err = erasure.Encode(context.Background(), bytes.NewReader(data), writers, buffer, erasure.dataBlocks+1) + closeBitrotWriters(writers) if err != nil { setup.Remove() t.Fatalf("Test %d: failed to create random test data: %v", i, err) } - readers := make([]*bitrotReader, len(disks)) + readers := make([]io.ReaderAt, len(disks)) for i, disk := range disks { - shardFilesize := getErasureShardFileSize(test.blocksize, test.size, erasure.dataBlocks) - readers[i] = newBitrotReader(disk, "testbucket", "testobject", test.algorithm, shardFilesize, writers[i].Sum()) + shardFilesize := erasure.ShardFileSize(test.size) + readers[i] = newBitrotReader(disk, "testbucket", "testobject", shardFilesize, test.algorithm, bitrotWriterSum(writers[i]), erasure.ShardSize()) } // setup stale disks for the test case @@ -111,22 +113,30 @@ func TestErasureHeal(t *testing.T) { } } for j := 0; j < test.badDisks; j++ { - readers[test.offDisks+j].disk = badDisk{nil} + switch r := readers[test.offDisks+j].(type) { + case *streamingBitrotReader: + r.disk = badDisk{nil} + case *wholeBitrotReader: + r.disk = badDisk{nil} + } } for j := 0; j < test.badStaleDisks; j++ { staleDisks[j] = badDisk{nil} } - staleWriters := make([]*bitrotWriter, len(staleDisks)) + staleWriters := make([]io.Writer, len(staleDisks)) for i, disk := range staleDisks { if disk == nil { continue } - staleWriters[i] = newBitrotWriter(disk, "testbucket", "testobject", test.algorithm) + os.Remove(pathJoin(disk.String(), "testbucket", "testobject")) + staleWriters[i] = newBitrotWriter(disk, "testbucket", "testobject", erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize()) } - // test case setup is complete - now call Healfile() + // test case setup is complete - now call Heal() err = erasure.Heal(context.Background(), readers, staleWriters, test.size) + closeBitrotReaders(readers) + closeBitrotWriters(staleWriters) if err != nil && !test.shouldFail { t.Errorf("Test %d: should pass but it failed with: %v", i, err) } @@ -140,7 +150,7 @@ func TestErasureHeal(t *testing.T) { if staleWriters[i] == nil { continue } - if !bytes.Equal(staleWriters[i].Sum(), writers[i].Sum()) { + if !bytes.Equal(bitrotWriterSum(staleWriters[i]), bitrotWriterSum(writers[i])) { t.Errorf("Test %d: heal returned different bitrot checksums", i) } } diff --git a/cmd/erasure-utils.go b/cmd/erasure-utils.go index 524e58f40..283f3c412 100644 --- a/cmd/erasure-utils.go +++ b/cmd/erasure-utils.go @@ -109,25 +109,3 @@ func 
writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data // Success. return totalWritten, nil } - -// Returns shard-file size. -func getErasureShardFileSize(blockSize int64, totalLength int64, dataBlocks int) int64 { - shardSize := ceilFrac(int64(blockSize), int64(dataBlocks)) - numShards := totalLength / int64(blockSize) - lastBlockSize := totalLength % int64(blockSize) - lastShardSize := ceilFrac(lastBlockSize, int64(dataBlocks)) - return shardSize*numShards + lastShardSize -} - -// Returns the endOffset till which bitrotReader should read data using disk.ReadFile() -// partOffset, partLength and partSize are values of the object's part file. -func getErasureShardFileEndOffset(partOffset int64, partLength int64, partSize int64, erasureBlockSize int64, dataBlocks int) int64 { - shardSize := ceilFrac(erasureBlockSize, int64(dataBlocks)) - shardFileSize := getErasureShardFileSize(erasureBlockSize, partSize, dataBlocks) - endShard := (partOffset + int64(partLength)) / erasureBlockSize - endOffset := endShard*shardSize + shardSize - if endOffset > shardFileSize { - endOffset = shardFileSize - } - return endOffset -} diff --git a/cmd/erasure.go b/cmd/erasure.go index 229b8b310..cac06c1f9 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -32,18 +32,16 @@ type Erasure struct { // NewErasure creates a new ErasureStorage. func NewErasure(ctx context.Context, dataBlocks, parityBlocks int, blockSize int64) (e Erasure, err error) { - shardsize := int(ceilFrac(blockSize, int64(dataBlocks))) - erasure, err := reedsolomon.New(dataBlocks, parityBlocks, reedsolomon.WithAutoGoroutines(shardsize)) - if err != nil { - logger.LogIf(ctx, err) - return e, err - } e = Erasure{ - encoder: erasure, dataBlocks: dataBlocks, parityBlocks: parityBlocks, blockSize: blockSize, } + e.encoder, err = reedsolomon.New(dataBlocks, parityBlocks, reedsolomon.WithAutoGoroutines(int(e.ShardSize()))) + if err != nil { + logger.LogIf(ctx, err) + return e, err + } return } @@ -94,3 +92,28 @@ func (e *Erasure) DecodeDataAndParityBlocks(ctx context.Context, data [][]byte) } return nil } + +func (e *Erasure) ShardSize() int64 { + return ceilFrac(e.blockSize, int64(e.dataBlocks)) +} + +func (e *Erasure) ShardFileSize(totalLength int64) int64 { + if totalLength == 0 { + return 0 + } + numShards := totalLength / e.blockSize + lastBlockSize := totalLength % int64(e.blockSize) + lastShardSize := ceilFrac(lastBlockSize, int64(e.dataBlocks)) + return numShards*e.ShardSize() + lastShardSize +} + +func (e *Erasure) ShardFileTillOffset(startOffset, length, totalLength int64) int64 { + shardSize := e.ShardSize() + shardFileSize := e.ShardFileSize(totalLength) + endShard := (startOffset + int64(length)) / e.blockSize + tillOffset := endShard*shardSize + shardSize + if tillOffset > shardFileSize { + tillOffset = shardFileSize + } + return tillOffset +} diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index b53ca8604..c30f1fd83 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -17,6 +17,7 @@ package cmd import ( + "io" "sync" ) @@ -124,11 +125,18 @@ func (d *naughtyDisk) ReadFile(volume string, path string, offset int64, buf []b return d.disk.ReadFile(volume, path, offset, buf, verifier) } -func (d *naughtyDisk) PrepareFile(volume, path string, length int64) error { +func (d *naughtyDisk) ReadFileStream(volume, path string, offset, length int64) (io.ReadCloser, error) { + if err := d.calcError(); err != nil { + return nil, err + } + return d.disk.ReadFileStream(volume, path, offset, length) 
+} + +func (d *naughtyDisk) CreateFile(volume, path string, size int64, reader io.Reader) error { if err := d.calcError(); err != nil { return err } - return d.disk.PrepareFile(volume, path, length) + return d.disk.CreateFile(volume, path, size, reader) } func (d *naughtyDisk) AppendFile(volume, path string, buf []byte) error { diff --git a/cmd/posix.go b/cmd/posix.go index 07553504f..5b0a16526 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -928,11 +928,99 @@ func (s *posix) openFile(volume, path string, mode int) (f *os.File, err error) return w, nil } -// PrepareFile - run prior actions before creating a new file for optimization purposes -// Currently we use fallocate when available to avoid disk fragmentation as much as possible -func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) { - // It doesn't make sense to create a negative-sized file - if fileSize < -1 { +// Just like io.LimitedReader but supports Close() to be compatible with io.ReadCloser that is +// returned by posix.ReadFileStream() +type posixLimitedReader struct { + io.LimitedReader +} + +func (l *posixLimitedReader) Close() error { + c, ok := l.R.(io.Closer) + if !ok { + return errUnexpected + } + return c.Close() +} + +// ReadFileStream - Returns the read stream of the file. +func (s *posix) ReadFileStream(volume, path string, offset, length int64) (io.ReadCloser, error) { + var err error + defer func() { + if err == errFaultyDisk { + atomic.AddInt32(&s.ioErrCount, 1) + } + }() + + if offset < 0 { + return nil, errInvalidArgument + } + + if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError { + return nil, errFaultyDisk + } + + if err = s.checkDiskFound(); err != nil { + return nil, err + } + + volumeDir, err := s.getVolDir(volume) + if err != nil { + return nil, err + } + // Stat a volume entry. + _, err = os.Stat((volumeDir)) + if err != nil { + if os.IsNotExist(err) { + return nil, errVolumeNotFound + } else if isSysErrIO(err) { + return nil, errFaultyDisk + } + return nil, err + } + + // Validate effective path length before reading. + filePath := pathJoin(volumeDir, path) + if err = checkPathLength((filePath)); err != nil { + return nil, err + } + + // Open the file for reading. + file, err := os.Open((filePath)) + if err != nil { + switch { + case os.IsNotExist(err): + return nil, errFileNotFound + case os.IsPermission(err): + return nil, errFileAccessDenied + case isSysErrNotDir(err): + return nil, errFileAccessDenied + case isSysErrIO(err): + return nil, errFaultyDisk + default: + return nil, err + } + } + + st, err := file.Stat() + if err != nil { + return nil, err + } + + // Verify it is a regular file, otherwise subsequent Seek is + // undefined. + if !st.Mode().IsRegular() { + return nil, errIsNotRegular + } + + if _, err = file.Seek(offset, io.SeekStart); err != nil { + return nil, err + } + return &posixLimitedReader{io.LimitedReader{file, length}}, nil +} + +// CreateFile - creates the file. +func (s *posix) CreateFile(volume, path string, fileSize int64, r io.Reader) (err error) { + if fileSize < 0 { return errInvalidArgument } @@ -954,13 +1042,13 @@ func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) { return err } - // Create file if not found - w, err := s.openFile(volume, path, os.O_CREATE|os.O_APPEND|os.O_WRONLY) + // Create file if not found. Note that it is created with os.O_EXCL flag as the file + // always is supposed to be created in the tmp directory with a unique file name. 
+ w, err := s.openFile(volume, path, os.O_CREATE|os.O_APPEND|os.O_WRONLY|os.O_EXCL) if err != nil { return err } - // Close upon return. defer w.Close() var e error @@ -983,6 +1071,20 @@ func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) { } return err } + + bufp := s.pool.Get().(*[]byte) + defer s.pool.Put(bufp) + + n, err := io.CopyBuffer(w, r, *bufp) + if err != nil { + return err + } + if n < fileSize { + return errLessData + } + if n > fileSize { + return errMoreData + } return nil } diff --git a/cmd/posix_test.go b/cmd/posix_test.go index c67c33da6..cf58a3e69 100644 --- a/cmd/posix_test.go +++ b/cmd/posix_test.go @@ -1438,113 +1438,6 @@ func TestPosixAppendFile(t *testing.T) { } } -// TestPosix posix.PrepareFile() -func TestPosixPrepareFile(t *testing.T) { - // create posix test setup - posixStorage, path, err := newPosixTestSetup() - if err != nil { - t.Fatalf("Unable to create posix test setup, %s", err) - } - defer os.RemoveAll(path) - - // Setup test environment. - if err = posixStorage.MakeVol("success-vol"); err != nil { - t.Fatalf("Unable to create volume, %s", err) - } - - if err = os.Mkdir(slashpath.Join(path, "success-vol", "object-as-dir"), 0777); err != nil { - t.Fatalf("Unable to create directory, %s", err) - } - - testCases := []struct { - fileName string - expectedErr error - }{ - {"myobject", nil}, - {"path/to/my/object", nil}, - // TestPosix to append to previously created file. - {"myobject", nil}, - // TestPosix to use same path of previously created file. - {"path/to/my/testobject", nil}, - {"object-as-dir", errIsNotRegular}, - // path segment uses previously uploaded object. - {"myobject/testobject", errFileAccessDenied}, - // One path segment length is > 255 chars long. - {"path/to/my/object0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001", errFileNameTooLong}, - } - - // Add path length > 1024 test specially as OS X system does not support 1024 long path. - err = errFileNameTooLong - if runtime.GOOS != "darwin" { - err = nil - } - // path length is 1024 chars long. 
- testCases = append(testCases, struct { - fileName string - expectedErr error - }{"level0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001/level0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002/level0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003/object000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001", err}) - - for i, testCase := range testCases { - if err = posixStorage.PrepareFile("success-vol", testCase.fileName, 16); err != testCase.expectedErr { - t.Errorf("Case: %d, expected: %s, got: %s", i, testCase.expectedErr, err) - } - } - - // TestPosix for permission denied. - if runtime.GOOS != globalWindowsOSName { - permDeniedDir := createPermDeniedFile(t) - defer removePermDeniedFile(permDeniedDir) - - var posixPermStorage StorageAPI - // Initialize posix storage layer for permission denied error. - _, err = newPosix(permDeniedDir) - if err != nil && !os.IsPermission(err) { - t.Fatalf("Unable to initialize posix, %s", err) - } - - if err = os.Chmod(permDeniedDir, 0755); err != nil { - t.Fatalf("Unable to change permission to temporary directory %v. %v", permDeniedDir, err) - } - - posixPermStorage, err = newPosix(permDeniedDir) - if err != nil { - t.Fatalf("Unable to initialize posix, %s", err) - } - - if err = posixPermStorage.PrepareFile("mybucket", "myobject", 16); err != errFileAccessDenied { - t.Fatalf("expected: Permission error, got: %s", err) - } - } - - // TestPosix case with invalid volume name. - // A valid volume name should be atleast of size 3. - err = posixStorage.PrepareFile("bn", "yes", 16) - if err != errVolumeNotFound { - t.Fatalf("expected: \"Invalid argument error\", got: \"%s\"", err) - } - - // TestPosix case with invalid file size which should be strictly positive - err = posixStorage.PrepareFile("success-vol", "yes", -3) - if err != errInvalidArgument { - t.Fatalf("should fail: %v", err) - } - - // TestPosix case with IO error count > max limit. - - // setting ioErrCnt to 6. - // should fail with errFaultyDisk. - if posixType, ok := posixStorage.(*posix); ok { - // setting the io error count from as specified in the test case. - posixType.ioErrCount = int32(6) - err = posixType.PrepareFile("abc", "yes", 16) - if err != errFaultyDisk { - t.Fatalf("Expected \"Faulty Disk\", got: \"%s\"", err) - } - } else { - t.Fatalf("Expected the StorageAPI to be of type *posix") - } -} - // TestPosix posix.RenameFile() func TestPosixRenameFile(t *testing.T) { // create posix test setup diff --git a/cmd/rest/client.go b/cmd/rest/client.go index e201233b3..7770273b9 100644 --- a/cmd/rest/client.go +++ b/cmd/rest/client.go @@ -42,7 +42,7 @@ type Client struct { } // Call - make a REST call. 
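Context for the Call() change just below: CreateFile now ships an opaque io.Reader as the request body, and http.NewRequest can only infer ContentLength for *bytes.Buffer, *bytes.Reader and *strings.Reader, so without the new length argument the body would be sent chunked with no advertised size. A small stdlib-only illustration (the URL is a placeholder; nothing is actually sent):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// A pipe is an opaque io.Reader: NewRequest cannot know its length.
	pr, _ := io.Pipe()

	req, err := http.NewRequest(http.MethodPost, "http://example.invalid/storage/createfile", pr)
	if err != nil {
		panic(err)
	}
	fmt.Println(req.ContentLength) // 0 => the body would go out chunked

	// Advertising the size up front, as Call now does when length > 0:
	req.ContentLength = 163
	fmt.Println(req.ContentLength) // 163
}
```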
-func (c *Client) Call(method string, values url.Values, body io.Reader) (reply io.ReadCloser, err error) { +func (c *Client) Call(method string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) { req, err := http.NewRequest(http.MethodPost, c.url.String()+"/"+method+"?"+values.Encode(), body) if err != nil { return nil, err @@ -50,7 +50,9 @@ func (c *Client) Call(method string, values url.Values, body io.Reader) (reply i req.Header.Set("Authorization", "Bearer "+c.newAuthToken()) req.Header.Set("X-Minio-Time", time.Now().UTC().Format(time.RFC3339)) - + if length > 0 { + req.ContentLength = length + } resp, err := c.httpClient.Do(req) if err != nil { return nil, err diff --git a/cmd/storage-errors.go b/cmd/storage-errors.go index 1e4573c8f..7327416ae 100644 --- a/cmd/storage-errors.go +++ b/cmd/storage-errors.go @@ -82,6 +82,9 @@ var errMinDiskSize = errors.New("The disk size is less than the minimum threshol // errLessData - returned when less data available than what was requested. var errLessData = errors.New("less data available than what was requested") +// errMoreData = returned when more data was sent by the caller than what it was supposed to. +var errMoreData = errors.New("more data was sent than what was advertised") + // hashMisMatchError - represents a bit-rot hash verification failure // error. type hashMismatchError struct { diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 55f1f4ddf..b3ec30680 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -41,8 +41,9 @@ type StorageAPI interface { // File operations. ListDir(volume, dirPath string, count int) ([]string, error) ReadFile(volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) - PrepareFile(volume string, path string, len int64) (err error) AppendFile(volume string, path string, buf []byte) (err error) + CreateFile(volume, path string, size int64, reader io.Reader) error + ReadFileStream(volume, path string, offset, length int64) (io.ReadCloser, error) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error StatFile(volume string, path string) (file FileInfo, err error) DeleteFile(volume string, path string) (err error) diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 15119bba5..221c9c8c2 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -115,11 +115,11 @@ type storageRESTClient struct { // Wrapper to restClient.Call to handle network errors, in case of network error the connection is makred disconnected // permanently. The only way to restore the storage connection is at the xl-sets layer by xlsets.monitorAndConnectEndpoints() // after verifying format.json -func (client *storageRESTClient) call(method string, values url.Values, body io.Reader) (respBody io.ReadCloser, err error) { +func (client *storageRESTClient) call(method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) { if !client.connected { return nil, errDiskNotFound } - respBody, err = client.restClient.Call(method, values, body) + respBody, err = client.restClient.Call(method, values, body, length) if err == nil { return respBody, nil } @@ -148,7 +148,7 @@ func (client *storageRESTClient) LastError() error { // DiskInfo - fetch disk information for a remote disk. 
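Also relevant to the interface change above: CreateFile is specified to consume exactly `size` bytes, and the posix implementation earlier in this patch turns a short or long body into errLessData / errMoreData instead of persisting a corrupt shard. A stdlib-only sketch of that post-copy check, where io.Copy stands in for the pooled io.CopyBuffer and the error values are local stand-ins for the package's:

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

var (
	errLessData = errors.New("less data available than what was requested")
	errMoreData = errors.New("more data was sent than what was advertised")
)

// copyExactly writes the reader into w and insists the byte count matches
// the advertised size, mirroring posix.CreateFile's post-copy check.
func copyExactly(w io.Writer, r io.Reader, size int64) error {
	n, err := io.Copy(w, r)
	if err != nil {
		return err
	}
	if n < size {
		return errLessData
	}
	if n > size {
		return errMoreData
	}
	return nil
}

func main() {
	var dst bytes.Buffer
	// Only 8 bytes arrive for an advertised size of 10.
	fmt.Println(copyExactly(&dst, bytes.NewReader(make([]byte, 8)), 10)) // errLessData
}
```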
func (client *storageRESTClient) DiskInfo() (info DiskInfo, err error) { - respBody, err := client.call(storageRESTMethodDiskInfo, nil, nil) + respBody, err := client.call(storageRESTMethodDiskInfo, nil, nil, -1) if err != nil { return } @@ -161,14 +161,14 @@ func (client *storageRESTClient) DiskInfo() (info DiskInfo, err error) { func (client *storageRESTClient) MakeVol(volume string) (err error) { values := make(url.Values) values.Set(storageRESTVolume, volume) - respBody, err := client.call(storageRESTMethodMakeVol, values, nil) + respBody, err := client.call(storageRESTMethodMakeVol, values, nil, -1) defer CloseResponse(respBody) return err } // ListVols - List all volumes on a remote disk. func (client *storageRESTClient) ListVols() (volinfo []VolInfo, err error) { - respBody, err := client.call(storageRESTMethodListVols, nil, nil) + respBody, err := client.call(storageRESTMethodListVols, nil, nil, -1) if err != nil { return } @@ -181,7 +181,7 @@ func (client *storageRESTClient) ListVols() (volinfo []VolInfo, err error) { func (client *storageRESTClient) StatVol(volume string) (volInfo VolInfo, err error) { values := make(url.Values) values.Set(storageRESTVolume, volume) - respBody, err := client.call(storageRESTMethodStatVol, values, nil) + respBody, err := client.call(storageRESTMethodStatVol, values, nil, -1) if err != nil { return } @@ -194,29 +194,28 @@ func (client *storageRESTClient) StatVol(volume string) (volInfo VolInfo, err er func (client *storageRESTClient) DeleteVol(volume string) (err error) { values := make(url.Values) values.Set(storageRESTVolume, volume) - respBody, err := client.call(storageRESTMethodDeleteVol, values, nil) + respBody, err := client.call(storageRESTMethodDeleteVol, values, nil, -1) defer CloseResponse(respBody) return err } -// PrepareFile - to fallocate() disk space for a file. -func (client *storageRESTClient) PrepareFile(volume, path string, length int64) error { +// AppendFile - append to a file. +func (client *storageRESTClient) AppendFile(volume, path string, buffer []byte) error { values := make(url.Values) values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) - values.Set(storageRESTLength, strconv.Itoa(int(length))) - respBody, err := client.call(storageRESTMethodPrepareFile, values, nil) + reader := bytes.NewBuffer(buffer) + respBody, err := client.call(storageRESTMethodAppendFile, values, reader, -1) defer CloseResponse(respBody) return err } -// AppendFile - append to a file. 
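With PrepareFile gone from the StorageAPI interface, the division of labour on the write path is: AppendFile keeps buffering small payloads and ships them as one request body, while the new CreateFile (client side shown in the next hunk) streams a reader whose size is declared up front. A hedged usage sketch against the interface methods above; the helper name and the part/xl.json paths are illustrative only, not taken from this PR:

// writeObjectAndMeta is a hypothetical in-package helper contrasting the two write paths.
func writeObjectAndMeta(disk StorageAPI, volume, object string, size int64, data io.Reader, meta []byte) error {
	// Bulk object data: streamed end to end, size declared up front so the
	// remote side can preallocate and cap the stream.
	if err := disk.CreateFile(volume, object+"/part.1", size, data); err != nil {
		return err
	}
	// Small metadata blob: buffered and sent as a single request body.
	return disk.AppendFile(volume, object+"/xl.json", meta)
}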
-func (client *storageRESTClient) AppendFile(volume, path string, buffer []byte) error { +func (client *storageRESTClient) CreateFile(volume, path string, length int64, r io.Reader) error { values := make(url.Values) values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) - reader := bytes.NewBuffer(buffer) - respBody, err := client.call(storageRESTMethodAppendFile, values, reader) + values.Set(storageRESTLength, strconv.Itoa(int(length))) + respBody, err := client.call(storageRESTMethodCreateFile, values, r, length) defer CloseResponse(respBody) return err } @@ -227,7 +226,7 @@ func (client *storageRESTClient) WriteAll(volume, path string, buffer []byte) er values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) reader := bytes.NewBuffer(buffer) - respBody, err := client.call(storageRESTMethodWriteAll, values, reader) + respBody, err := client.call(storageRESTMethodWriteAll, values, reader, -1) defer CloseResponse(respBody) return err } @@ -237,7 +236,7 @@ func (client *storageRESTClient) StatFile(volume, path string) (info FileInfo, e values := make(url.Values) values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) - respBody, err := client.call(storageRESTMethodStatFile, values, nil) + respBody, err := client.call(storageRESTMethodStatFile, values, nil, -1) if err != nil { return info, err } @@ -251,7 +250,7 @@ func (client *storageRESTClient) ReadAll(volume, path string) ([]byte, error) { values := make(url.Values) values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) - respBody, err := client.call(storageRESTMethodReadAll, values, nil) + respBody, err := client.call(storageRESTMethodReadAll, values, nil, -1) if err != nil { return nil, err } @@ -259,6 +258,20 @@ func (client *storageRESTClient) ReadAll(volume, path string) ([]byte, error) { return ioutil.ReadAll(respBody) } +// ReadFileStream - returns a reader for the requested file. +func (client *storageRESTClient) ReadFileStream(volume, path string, offset, length int64) (io.ReadCloser, error) { + values := make(url.Values) + values.Set(storageRESTVolume, volume) + values.Set(storageRESTFilePath, path) + values.Set(storageRESTOffset, strconv.Itoa(int(offset))) + values.Set(storageRESTLength, strconv.Itoa(int(length))) + respBody, err := client.call(storageRESTMethodReadFileStream, values, nil, -1) + if err != nil { + return nil, err + } + return respBody, nil +} + // ReadFile - reads section of a file. 
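ReadFileStream hands the caller the raw stream (for a remote disk, the HTTP response body), so the caller owns it and must close it. A small consumption sketch; readRange is a hypothetical helper written against the methods above, and the local-disk behaviour is an assumption:

// readRange reads `length` bytes starting at `offset` from a file using the
// new streaming read path.
func readRange(disk StorageAPI, volume, path string, offset, length int64) ([]byte, error) {
	rc, err := disk.ReadFileStream(volume, path, offset, length)
	if err != nil {
		return nil, err
	}
	// The caller must close the stream: for a remote disk this releases the
	// HTTP response body, for a local disk presumably an open file handle.
	defer rc.Close()

	buf := make([]byte, length)
	if _, err = io.ReadFull(rc, buf); err != nil {
		return nil, err
	}
	return buf, nil
}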
func (client *storageRESTClient) ReadFile(volume, path string, offset int64, buffer []byte, verifier *BitrotVerifier) (int64, error) { values := make(url.Values) @@ -273,7 +286,7 @@ func (client *storageRESTClient) ReadFile(volume, path string, offset int64, buf values.Set(storageRESTBitrotAlgo, "") values.Set(storageRESTBitrotHash, "") } - respBody, err := client.call(storageRESTMethodReadFile, values, nil) + respBody, err := client.call(storageRESTMethodReadFile, values, nil, -1) if err != nil { return 0, err } @@ -288,7 +301,7 @@ func (client *storageRESTClient) ListDir(volume, dirPath string, count int) (ent values.Set(storageRESTVolume, volume) values.Set(storageRESTDirPath, dirPath) values.Set(storageRESTCount, strconv.Itoa(count)) - respBody, err := client.call(storageRESTMethodListDir, values, nil) + respBody, err := client.call(storageRESTMethodListDir, values, nil, -1) if err != nil { return nil, err } @@ -302,7 +315,7 @@ func (client *storageRESTClient) DeleteFile(volume, path string) error { values := make(url.Values) values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) - respBody, err := client.call(storageRESTMethodDeleteFile, values, nil) + respBody, err := client.call(storageRESTMethodDeleteFile, values, nil, -1) defer CloseResponse(respBody) return err } @@ -314,7 +327,7 @@ func (client *storageRESTClient) RenameFile(srcVolume, srcPath, dstVolume, dstPa values.Set(storageRESTSrcPath, srcPath) values.Set(storageRESTDstVolume, dstVolume) values.Set(storageRESTDstPath, dstPath) - respBody, err := client.call(storageRESTMethodRenameFile, values, nil) + respBody, err := client.call(storageRESTMethodRenameFile, values, nil, -1) defer CloseResponse(respBody) return err } diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index ec7dbe5f3..acdd9db59 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -16,7 +16,7 @@ package cmd -const storageRESTVersion = "v2" +const storageRESTVersion = "v3" const storageRESTPath = minioReservedBucketPath + "/storage/" + storageRESTVersion + "/" const ( @@ -26,15 +26,16 @@ const ( storageRESTMethodDeleteVol = "deletevol" storageRESTMethodListVols = "listvols" - storageRESTMethodPrepareFile = "preparefile" - storageRESTMethodAppendFile = "appendfile" - storageRESTMethodWriteAll = "writeall" - storageRESTMethodStatFile = "statfile" - storageRESTMethodReadAll = "readall" - storageRESTMethodReadFile = "readfile" - storageRESTMethodListDir = "listdir" - storageRESTMethodDeleteFile = "deletefile" - storageRESTMethodRenameFile = "renamefile" + storageRESTMethodAppendFile = "appendfile" + storageRESTMethodCreateFile = "createfile" + storageRESTMethodWriteAll = "writeall" + storageRESTMethodStatFile = "statfile" + storageRESTMethodReadAll = "readall" + storageRESTMethodReadFile = "readfile" + storageRESTMethodReadFileStream = "readfilestream" + storageRESTMethodListDir = "listdir" + storageRESTMethodDeleteFile = "deletefile" + storageRESTMethodRenameFile = "renamefile" ) const ( diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index d85279068..fb6acc17c 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -133,28 +133,29 @@ func (s *storageRESTServer) DeleteVolHandler(w http.ResponseWriter, r *http.Requ } } -// PrepareFileHandler - fallocate() space for a file. -func (s *storageRESTServer) PrepareFileHandler(w http.ResponseWriter, r *http.Request) { +// AppendFileHandler - append data from the request to the file specified. 
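Because several RPC bodies change shape in this patch, storageRESTVersion moves from v2 to v3, and the version string is baked into every route. A tiny self-contained illustration of why that alone fences off mismatched peers; the value of minioReservedBucketPath is assumed here, only the path composition mirrors the constants above:

package main

import "fmt"

const minioReservedBucketPath = "/minio" // assumed value, for the sketch only

const (
	storageRESTVersion = "v3"
	storageRESTPath    = minioReservedBucketPath + "/storage/" + storageRESTVersion + "/"
)

func main() {
	// A peer still running the v2 release posts to /minio/storage/v2/... and
	// simply finds no matching route on a v3 server, so the disk is treated
	// as unreachable instead of the two sides mis-parsing each other's bodies.
	fmt.Println(storageRESTPath + "createfile")     // /minio/storage/v3/createfile
	fmt.Println(storageRESTPath + "readfilestream") // /minio/storage/v3/readfilestream
}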
+func (s *storageRESTServer) AppendFileHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { return } vars := mux.Vars(r) volume := vars[storageRESTVolume] filePath := vars[storageRESTFilePath] - fileSizeStr := vars[storageRESTLength] - fileSize, err := strconv.Atoi(fileSizeStr) + + buf := make([]byte, r.ContentLength) + _, err := io.ReadFull(r.Body, buf) if err != nil { s.writeErrorResponse(w, err) return } - err = s.storage.PrepareFile(volume, filePath, int64(fileSize)) + err = s.storage.AppendFile(volume, filePath, buf) if err != nil { s.writeErrorResponse(w, err) } } -// AppendFileHandler - append to a file. -func (s *storageRESTServer) AppendFileHandler(w http.ResponseWriter, r *http.Request) { +// CreateFileHandler - fallocate() space for a file and copy the contents from the request. +func (s *storageRESTServer) CreateFileHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { return } @@ -162,18 +163,13 @@ func (s *storageRESTServer) AppendFileHandler(w http.ResponseWriter, r *http.Req volume := vars[storageRESTVolume] filePath := vars[storageRESTFilePath] - if r.ContentLength < 0 { - s.writeErrorResponse(w, errInvalidArgument) - return - } - - buf := make([]byte, r.ContentLength) - _, err := io.ReadFull(r.Body, buf) + fileSizeStr := vars[storageRESTLength] + fileSize, err := strconv.Atoi(fileSizeStr) if err != nil { s.writeErrorResponse(w, err) return } - err = s.storage.AppendFile(volume, filePath, buf) + err = s.storage.CreateFile(volume, filePath, int64(fileSize), r.Body) if err != nil { s.writeErrorResponse(w, err) } @@ -285,6 +281,34 @@ func (s *storageRESTServer) ReadFileHandler(w http.ResponseWriter, r *http.Reque w.Write(buf) } +// ReadFileStreamHandler - read the requested section of a file and stream it back. +func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + vars := mux.Vars(r) + volume := vars[storageRESTVolume] + filePath := vars[storageRESTFilePath] + offset, err := strconv.Atoi(vars[storageRESTOffset]) + if err != nil { + s.writeErrorResponse(w, err) + return + } + length, err := strconv.Atoi(vars[storageRESTLength]) + if err != nil { + s.writeErrorResponse(w, err) + return + } + rc, err := s.storage.ReadFileStream(volume, filePath, int64(offset), int64(length)) + if err != nil { + s.writeErrorResponse(w, err) + return + } + defer rc.Close() + w.Header().Set("Content-Length", strconv.Itoa(length)) + io.Copy(w, rc) +} + // ListDirHandler - list a directory. func (s *storageRESTServer) ListDirHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -359,18 +383,21 @@ func registerStorageRESTHandlers(router *mux.Router, endpoints EndpointList) { subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodDeleteVol).HandlerFunc(httpTraceHdrs(server.DeleteVolHandler)).Queries(restQueries(storageRESTVolume)...) subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodListVols).HandlerFunc(httpTraceHdrs(server.ListVolsHandler)) - subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodPrepareFile).HandlerFunc(httpTraceHdrs(server.PrepareFileHandler)). - Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTLength)...) subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodAppendFile).HandlerFunc(httpTraceHdrs(server.AppendFileHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodWriteAll).HandlerFunc(httpTraceHdrs(server.WriteAllHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) + subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodCreateFile).HandlerFunc(httpTraceHdrs(server.CreateFileHandler)). + Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTLength)...) + subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodStatFile).HandlerFunc(httpTraceHdrs(server.StatFileHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodReadAll).HandlerFunc(httpTraceHdrs(server.ReadAllHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodReadFile).HandlerFunc(httpTraceHdrs(server.ReadFileHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTOffset, storageRESTLength, storageRESTBitrotAlgo, storageRESTBitrotHash)...) + subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodReadFileStream).HandlerFunc(httpTraceHdrs(server.ReadFileStreamHandler)). + Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTOffset, storageRESTLength)...) subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodListDir).HandlerFunc(httpTraceHdrs(server.ListDirHandler)). Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTCount)...) subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodDeleteFile).HandlerFunc(httpTraceHdrs(server.DeleteFileHandler)). diff --git a/cmd/storage-rest_test.go b/cmd/storage-rest_test.go index ecc68a658..5acf44352 100644 --- a/cmd/storage-rest_test.go +++ b/cmd/storage-rest_test.go @@ -365,38 +365,6 @@ func testStorageAPIReadFile(t *testing.T, storage StorageAPI) { } } -func testStorageAPIPrepareFile(t *testing.T, storage StorageAPI) { - tmpGlobalServerConfig := globalServerConfig - defer func() { - globalServerConfig = tmpGlobalServerConfig - }() - globalServerConfig = newServerConfig() - - err := storage.MakeVol("foo") - if err != nil { - t.Fatalf("unexpected error %v", err) - } - - testCases := []struct { - volumeName string - objectName string - expectErr bool - }{ - {"foo", "myobject", false}, - // volume not found error. - {"bar", "myobject", true}, - } - - for i, testCase := range testCases { - err := storage.PrepareFile(testCase.volumeName, testCase.objectName, 1) - expectErr := (err != nil) - - if expectErr != testCase.expectErr { - t.Fatalf("case %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr) - } - } -} - func testStorageAPIAppendFile(t *testing.T, storage StorageAPI) { tmpGlobalServerConfig := globalServerConfig defer func() { @@ -648,17 +616,6 @@ func TestStorageRESTClientReadFile(t *testing.T) { testStorageAPIReadFile(t, restClient) } -func TestStorageRESTClientPrepareFile(t *testing.T) { - httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t) - defer httpServer.Close() - defer func() { - globalServerConfig = prevGlobalServerConfig - }() - defer os.RemoveAll(endpointPath) - - testStorageAPIPrepareFile(t, restClient) -} - func TestStorageRESTClientAppendFile(t *testing.T) { httpServer, restClient, prevGlobalServerConfig, endpointPath := newStorageRESTHTTPServerClient(t) defer httpServer.Close() diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index 3388a909e..bc0db1362 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -77,7 +77,8 @@ func init() { // Set system resources to maximum. 
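Back on the server side, CreateFileHandler (registered above) trusts the length query parameter and hands r.Body directly to storage.CreateFile, which is where errLessData and errMoreData from storage-errors.go come in: the lower layer is expected to see exactly the advertised number of bytes. A hedged sketch of that contract; copyExactly is illustrative and not the posix implementation from this PR:

// copyExactly copies exactly `size` bytes from src to dst and reports the
// short-read / over-send cases with the package's sentinel errors.
func copyExactly(dst io.Writer, src io.Reader, size int64) error {
	n, err := io.CopyN(dst, src, size)
	if err == io.EOF && n < size {
		// The caller advertised more bytes than it actually sent.
		return errLessData
	}
	if err != nil && err != io.EOF {
		return err
	}
	// Probe for bytes beyond the advertised size.
	var extra [1]byte
	if m, _ := src.Read(extra[:]); m > 0 {
		return errMoreData
	}
	return nil
}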
setMaxResources() - logger.Disable = true + // Uncomment the following line to see trace logs during unit tests. + // logger.AddTarget(console.New()) } // concurreny level for certain parallel tests. diff --git a/cmd/xl-v1-healing-common.go b/cmd/xl-v1-healing-common.go index 6d42904ab..5e4d04a71 100644 --- a/cmd/xl-v1-healing-common.go +++ b/cmd/xl-v1-healing-common.go @@ -18,7 +18,6 @@ package cmd import ( "context" - "path/filepath" "strings" "time" @@ -161,7 +160,6 @@ func getLatestXLMeta(ctx context.Context, partsMetadata []xlMetaV1, errs []error func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetadata []xlMetaV1, errs []error, bucket, object string) ([]StorageAPI, []error) { availableDisks := make([]StorageAPI, len(onlineDisks)) - buffer := []byte{} dataErrs := make([]error, len(onlineDisks)) for i, onlineDisk := range onlineDisks { @@ -170,31 +168,26 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad continue } + erasureInfo := partsMetadata[i].Erasure + erasure, err := NewErasure(ctx, erasureInfo.DataBlocks, erasureInfo.ParityBlocks, erasureInfo.BlockSize) + if err != nil { + dataErrs[i] = err + continue + } + // disk has a valid xl.json but may not have all the // parts. This is considered an outdated disk, since // it needs healing too. for _, part := range partsMetadata[i].Parts { - partPath := filepath.Join(object, part.Name) - checksumInfo := partsMetadata[i].Erasure.GetChecksumInfo(part.Name) - verifier := NewBitrotVerifier(checksumInfo.Algorithm, checksumInfo.Hash) - - // verification happens even if a 0-length - // buffer is passed - _, hErr := onlineDisk.ReadFile(bucket, partPath, 0, buffer, verifier) - - isCorrupt := false - if hErr != nil { - isCorrupt = strings.HasPrefix(hErr.Error(), "Bitrot verification mismatch - expected ") - } - switch { - case isCorrupt: - fallthrough - case hErr == errFileNotFound, hErr == errVolumeNotFound: - dataErrs[i] = hErr - break - case hErr != nil: - logger.LogIf(ctx, hErr) - dataErrs[i] = hErr + checksumInfo := erasureInfo.GetChecksumInfo(part.Name) + tillOffset := erasure.ShardFileTillOffset(0, part.Size, part.Size) + err = bitrotCheckFile(onlineDisk, bucket, pathJoin(object, part.Name), tillOffset, checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize()) + if err != nil { + isCorrupt := strings.HasPrefix(err.Error(), "Bitrot verification mismatch - expected ") + if !isCorrupt && err != errFileNotFound && err != errVolumeNotFound { + logger.LogIf(ctx, err) + } + dataErrs[i] = err break } } diff --git a/cmd/xl-v1-healing-common_test.go b/cmd/xl-v1-healing-common_test.go index 184b06d84..160c3d43e 100644 --- a/cmd/xl-v1-healing-common_test.go +++ b/cmd/xl-v1-healing-common_test.go @@ -19,6 +19,7 @@ package cmd import ( "bytes" "context" + "os" "path/filepath" "testing" "time" @@ -85,28 +86,6 @@ func TestCommonTime(t *testing.T) { } } -// partsMetaFromModTimes - returns slice of modTimes given metadata of -// an object part. -func partsMetaFromModTimes(modTimes []time.Time, algorithm BitrotAlgorithm, checksums []ChecksumInfo) []xlMetaV1 { - var partsMetadata []xlMetaV1 - for _, modTime := range modTimes { - partsMetadata = append(partsMetadata, xlMetaV1{ - Erasure: ErasureInfo{ - Checksums: checksums, - }, - Stat: statInfo{ - ModTime: modTime, - }, - Parts: []ObjectPartInfo{ - { - Name: "part.1", - }, - }, - }) - } - return partsMetadata -} - // TestListOnlineDisks - checks if listOnlineDisks and outDatedDisks // are consistent with each other. 
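disksWithAllParts now sizes its bitrot check from erasure.ShardSize() and ShardFileTillOffset() rather than re-reading whole parts into a buffer. To make the shard bookkeeping concrete, here is back-of-the-envelope arithmetic with made-up numbers; ceilDiv stands in for the package's integer-ceiling helper, and the rounding is assumed to match what ShardSize()/ShardFileSize() compute:

// ceilDiv is plain integer ceiling division, standing in for the internal helper.
func ceilDiv(a, b int64) int64 { return (a + b - 1) / b }

func shardMathExample() {
	const (
		blockSize  int64 = 10 * 1024 * 1024 // erasure block read from the client stream
		dataBlocks int64 = 5                // data shards per block
		partSize   int64 = 25 * 1024 * 1024 // size of the part being checked
	)

	shardSize := ceilDiv(blockSize, dataBlocks)  // 2 MiB: what each disk stores per block
	fullBlocks := partSize / blockSize           // 2 full blocks
	lastBlock := partSize - fullBlocks*blockSize // 5 MiB left over
	lastShard := ceilDiv(lastBlock, dataBlocks)  // 1 MiB per disk for the tail block

	shardFileSize := fullBlocks*shardSize + lastShard // 5 MiB of shard data per disk
	fmt.Println(shardSize, shardFileSize)
}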
func TestListOnlineDisks(t *testing.T) { @@ -205,12 +184,6 @@ func TestListOnlineDisks(t *testing.T) { t.Fatalf("Failed to putObject %v", err) } - // Fetch xl.json from first disk to construct partsMetadata for the tests. - xlMeta, err := readXLMeta(context.Background(), xlDisks[0], bucket, object) - if err != nil { - t.Fatalf("Test %d: Failed to read xl.json %v", i+1, err) - } - tamperedIndex := -1 switch test._tamperBackend { case deletePart: @@ -240,17 +213,25 @@ func TestListOnlineDisks(t *testing.T) { // and check if that disk // appears in outDatedDisks. tamperedIndex = index - dErr := xlDisks[index].AppendFile(bucket, filepath.Join(object, "part.1"), []byte("corruption")) - if dErr != nil { - t.Fatalf("Test %d: Failed to append corrupting data at the end of file %s - %v", - i+1, filepath.Join(object, "part.1"), dErr) + filePath := pathJoin(xlDisks[index].String(), bucket, object, "part.1") + f, err := os.OpenFile(filePath, os.O_WRONLY, 0) + if err != nil { + t.Fatalf("Failed to open %s: %s\n", filePath, err) } + f.Write([]byte("oops")) // Will cause bitrot error + f.Close() break } } - partsMetadata := partsMetaFromModTimes(test.modTimes, DefaultBitrotAlgorithm, xlMeta.Erasure.Checksums) + partsMetadata, errs := readAllXLMetadata(context.Background(), xlDisks, bucket, object) + for i := range partsMetadata { + if errs[i] != nil { + t.Fatalf("Test %d: expected error to be nil: %s", i+1, errs[i].Error()) + } + partsMetadata[i].Stat.ModTime = test.modTimes[i] + } onlineDisks, modTime := listOnlineDisks(xlDisks, partsMetadata, test.errs) if !modTime.Equal(test.expectedTime) { @@ -303,6 +284,29 @@ func TestDisksWithAllParts(t *testing.T) { t.Fatalf("Failed to read xl meta data %v", reducedErr) } + // Test that all disks are returned without any failures with + // unmodified meta data + partsMetadata, errs = readAllXLMetadata(ctx, xlDisks, bucket, object) + if err != nil { + t.Fatalf("Failed to read xl meta data %v", err) + } + + filteredDisks, errs := disksWithAllParts(ctx, xlDisks, partsMetadata, errs, bucket, object) + + if len(filteredDisks) != len(xlDisks) { + t.Errorf("Unexpected number of disks: %d", len(filteredDisks)) + } + + for diskIndex, disk := range filteredDisks { + if errs[diskIndex] != nil { + t.Errorf("Unexpected error %s", errs[diskIndex]) + } + + if disk == nil { + t.Errorf("Disk erroneously filtered, diskIndex: %d", diskIndex) + } + } + diskFailures := make(map[int]string) // key = disk index, value = part name with hash mismatch diskFailures[0] = "part.3" @@ -310,15 +314,21 @@ func TestDisksWithAllParts(t *testing.T) { diskFailures[15] = "part.2" for diskIndex, partName := range diskFailures { - for index, info := range partsMetadata[diskIndex].Erasure.Checksums { + for _, info := range partsMetadata[diskIndex].Erasure.Checksums { if info.Name == partName { - partsMetadata[diskIndex].Erasure.Checksums[index].Hash[0]++ + filePath := pathJoin(xlDisks[diskIndex].String(), bucket, object, partName) + f, err := os.OpenFile(filePath, os.O_WRONLY, 0) + if err != nil { + t.Fatalf("Failed to open %s: %s\n", filePath, err) + } + f.Write([]byte("oops")) // Will cause bitrot error + f.Close() } } } errs = make([]error, len(xlDisks)) - filteredDisks, errs := disksWithAllParts(ctx, xlDisks, partsMetadata, errs, bucket, object) + filteredDisks, errs = disksWithAllParts(ctx, xlDisks, partsMetadata, errs, bucket, object) if len(filteredDisks) != len(xlDisks) { t.Errorf("Unexpected number of disks: %d", len(filteredDisks)) @@ -343,27 +353,4 @@ func TestDisksWithAllParts(t *testing.T) 
{ } } - // Test that all disks are returned without any failures with - // unmodified meta data - partsMetadata, errs = readAllXLMetadata(ctx, xlDisks, bucket, object) - if err != nil { - t.Fatalf("Failed to read xl meta data %v", err) - } - - filteredDisks, errs = disksWithAllParts(ctx, xlDisks, partsMetadata, errs, bucket, object) - - if len(filteredDisks) != len(xlDisks) { - t.Errorf("Unexpected number of disks: %d", len(filteredDisks)) - } - - for diskIndex, disk := range filteredDisks { - if errs[diskIndex] != nil { - t.Errorf("Unexpected error %s", errs[diskIndex]) - } - - if disk == nil { - t.Errorf("Disk erroneously filtered, diskIndex: %d", diskIndex) - } - } - } diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index 3c077ecf0..464c91f4b 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -19,6 +19,7 @@ package cmd import ( "context" "fmt" + "io" "path" "sync" @@ -405,15 +406,16 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o latestDisks = shuffleDisks(latestDisks, latestMeta.Erasure.Distribution) outDatedDisks = shuffleDisks(outDatedDisks, latestMeta.Erasure.Distribution) partsMetadata = shufflePartsMetadata(partsMetadata, latestMeta.Erasure.Distribution) + for i := range outDatedDisks { + if outDatedDisks[i] == nil { + continue + } + partsMetadata[i] = newXLMetaFromXLMeta(latestMeta) + } // We write at temporary location and then rename to final location. tmpID := mustGetUUID() - // Checksum of the part files. checkSumInfos[index] will - // contain checksums of all the part files in the - // outDatedDisks[index] - checksumInfos := make([][]ChecksumInfo, len(outDatedDisks)) - // Heal each part. erasureHealFile() will write the healed // part to .minio/tmp/uuid/ which needs to be renamed later to // the final location. 
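Before the hunk below rewires healObject onto plain io.ReaderAt/io.Writer slices, a hedged in-memory illustration of the dataflow it sets up: readers come from disks that still hold good shards, writers collect the reconstructed shards for the outdated ones. The sketch assumes the NewErasure result type is named Erasure and that Heal() rebuilds the shards whose readers are nil into the writers at the same indices, which is how it is used below; the shard contents here are fabricated:

func healIntoBuffers(ctx context.Context, erasure Erasure, goodShards [][]byte, partSize int64) (map[int][]byte, error) {
	readers := make([]io.ReaderAt, len(goodShards))
	writers := make([]io.Writer, len(goodShards))
	healed := make(map[int]*bytes.Buffer)

	for i, shard := range goodShards {
		if shard != nil {
			readers[i] = bytes.NewReader(shard) // healthy disk: serve its shard
			continue
		}
		buf := new(bytes.Buffer) // outdated disk: capture the rebuilt shard
		healed[i] = buf
		writers[i] = buf
	}

	if err := erasure.Heal(ctx, readers, writers, partSize); err != nil {
		return nil, err
	}

	out := make(map[int][]byte, len(healed))
	for i, buf := range healed {
		out[i] = buf.Bytes()
	}
	return out, nil
}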
@@ -423,29 +425,31 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o return result, toObjectErr(err, bucket, object) } + erasureInfo := latestMeta.Erasure for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ { partName := latestMeta.Parts[partIndex].Name partSize := latestMeta.Parts[partIndex].Size - erasureInfo := latestMeta.Erasure - var algorithm BitrotAlgorithm - bitrotReaders := make([]*bitrotReader, len(latestDisks)) + partActualSize := latestMeta.Parts[partIndex].ActualSize + partNumber := latestMeta.Parts[partIndex].Number + tillOffset := erasure.ShardFileTillOffset(0, partSize, partSize) + checksumInfo := erasureInfo.GetChecksumInfo(partName) + readers := make([]io.ReaderAt, len(latestDisks)) for i, disk := range latestDisks { if disk == OfflineDisk { continue } - info := partsMetadata[i].Erasure.GetChecksumInfo(partName) - algorithm = info.Algorithm - endOffset := getErasureShardFileEndOffset(0, partSize, partSize, erasureInfo.BlockSize, erasure.dataBlocks) - bitrotReaders[i] = newBitrotReader(disk, bucket, pathJoin(object, partName), algorithm, endOffset, info.Hash) + readers[i] = newBitrotReader(disk, bucket, pathJoin(object, partName), tillOffset, checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize()) } - bitrotWriters := make([]*bitrotWriter, len(outDatedDisks)) + writers := make([]io.Writer, len(outDatedDisks)) for i, disk := range outDatedDisks { if disk == OfflineDisk { continue } - bitrotWriters[i] = newBitrotWriter(disk, minioMetaTmpBucket, pathJoin(tmpID, partName), algorithm) + writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, pathJoin(tmpID, partName), tillOffset, checksumInfo.Algorithm, erasure.ShardSize()) } - hErr := erasure.Heal(ctx, bitrotReaders, bitrotWriters, partSize) + hErr := erasure.Heal(ctx, readers, writers, partSize) + closeBitrotReaders(readers) + closeBitrotWriters(writers) if hErr != nil { return result, toObjectErr(hErr, bucket, object) } @@ -457,14 +461,13 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o } // A non-nil stale disk which did not receive // a healed part checksum had a write error. - if bitrotWriters[i] == nil { + if writers[i] == nil { outDatedDisks[i] = nil disksToHealCount-- continue } - // append part checksums - checksumInfos[i] = append(checksumInfos[i], - ChecksumInfo{partName, algorithm, bitrotWriters[i].Sum()}) + partsMetadata[i].AddObjectPart(partNumber, partName, "", partSize, partActualSize) + partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{partName, checksumInfo.Algorithm, bitrotWriterSum(writers[i])}) } // If all disks are having errors, we give up. @@ -479,7 +482,6 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o continue } partsMetadata[index] = latestMeta - partsMetadata[index].Erasure.Checksums = checksumInfos[index] } // Generate and write `xl.json` generated from other disks. diff --git a/cmd/xl-v1-metadata.go b/cmd/xl-v1-metadata.go index 0987223fb..6d3a347fd 100644 --- a/cmd/xl-v1-metadata.go +++ b/cmd/xl-v1-metadata.go @@ -58,7 +58,7 @@ type ChecksumInfo struct { type checksumInfoJSON struct { Name string `json:"name"` Algorithm string `json:"algorithm"` - Hash string `json:"hash"` + Hash string `json:"hash,omitempty"` } // MarshalJSON marshals the ChecksumInfo struct @@ -186,6 +186,23 @@ func newXLMetaV1(object string, dataBlocks, parityBlocks int) (xlMeta xlMetaV1) return xlMeta } +// Return a new xlMetaV1 initialized using the given xlMetaV1. 
Used in healing to make sure that we do not copy +over any part's checksum info which will differ for different disks. +func newXLMetaFromXLMeta(meta xlMetaV1) xlMetaV1 { + xlMeta := xlMetaV1{} + xlMeta.Version = xlMetaVersion + xlMeta.Format = xlMetaFormat + xlMeta.Minio.Release = ReleaseTag + xlMeta.Erasure = ErasureInfo{ + Algorithm: meta.Erasure.Algorithm, + DataBlocks: meta.Erasure.DataBlocks, + ParityBlocks: meta.Erasure.ParityBlocks, + BlockSize: meta.Erasure.BlockSize, + Distribution: meta.Erasure.Distribution, + } + return xlMeta +} + // IsValid - tells if the format is sane by validating the version // string, format and erasure info fields. func (m xlMetaV1) IsValid() bool { diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 0965c9b8c..bb0e200a8 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -19,6 +19,7 @@ package cmd import ( "context" "fmt" + "io" "path" "sort" "strconv" @@ -351,14 +352,6 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID // Delete the temporary object part. If PutObjectPart succeeds there would be nothing to delete. defer xl.deleteObject(ctx, minioMetaTmpBucket, tmpPart, writeQuorum, false) - if data.Size() >= 0 { - if pErr := xl.prepareFile(ctx, minioMetaTmpBucket, tmpPartPath, data.Size(), - onlineDisks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, writeQuorum); pErr != nil { - return pi, toObjectErr(pErr, bucket, object) - - } - } - erasure, err := NewErasure(ctx, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, xlMeta.Erasure.BlockSize) if err != nil { return pi, toObjectErr(err, bucket, object) } @@ -380,16 +373,16 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID if len(buffer) > int(xlMeta.Erasure.BlockSize) { buffer = buffer[:xlMeta.Erasure.BlockSize] } - - writers := make([]*bitrotWriter, len(onlineDisks)) + writers := make([]io.Writer, len(onlineDisks)) for i, disk := range onlineDisks { if disk == nil { continue } - writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, DefaultBitrotAlgorithm) + writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize()) } n, err := erasure.Encode(ctx, data, writers, buffer, erasure.dataBlocks+1) + closeBitrotWriters(writers) if err != nil { return pi, toObjectErr(err, bucket, object) } @@ -455,7 +448,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID } partsMetadata[i].Stat = xlMeta.Stat partsMetadata[i].Parts = xlMeta.Parts - partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{partSuffix, DefaultBitrotAlgorithm, writers[i].Sum()}) + partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{partSuffix, DefaultBitrotAlgorithm, bitrotWriterSum(writers[i])}) } // Write all the checksum metadata. diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 4132a3049..6a4d63bf4 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -57,25 +57,6 @@ func (xl xlObjects) putObjectDir(ctx context.Context, bucket, object string, wri return reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) } -// prepareFile hints the bottom layer to optimize the creation of a new object -func (xl xlObjects) prepareFile(ctx context.Context, bucket, object string, size int64, onlineDisks []StorageAPI, blockSize int64, dataBlocks, writeQuorum int) error { - pErrs := make([]error, len(onlineDisks)) - // Calculate the real size of the part in one disk.
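The point of newXLMetaFromXLMeta above is easy to miss: it copies the erasure geometry from the healthiest metadata but deliberately leaves the per-part checksums behind, since those have to be recomputed for every healed disk. A small in-package sketch of that invariant; the example function itself is hypothetical:

func exampleNewXLMetaFromXLMeta() {
	src := newXLMetaV1("object", 8, 8)
	src.Erasure.AddChecksumInfo(ChecksumInfo{"part.1", DefaultBitrotAlgorithm, []byte{0x01, 0x02}})

	dst := newXLMetaFromXLMeta(src)

	fmt.Println(len(dst.Erasure.Checksums))                       // 0: checksums are recomputed per disk during heal
	fmt.Println(dst.Erasure.DataBlocks == src.Erasure.DataBlocks) // true: geometry is preserved
}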
- actualSize := getErasureShardFileSize(blockSize, size, dataBlocks) - // Prepare object creation in a all disks - for index, disk := range onlineDisks { - if disk != nil { - if err := disk.PrepareFile(bucket, object, actualSize); err != nil { - // Save error to reduce it later - pErrs[index] = err - // Ignore later access to disk which generated the error - onlineDisks[index] = nil - } - } - } - return reduceWriteQuorumErrs(ctx, pErrs, objectOpIgnoredErrs, writeQuorum) -} - /// Object Operations // CopyObject - copy object source object to destination object. @@ -348,22 +329,24 @@ func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startO partLength = length - totalBytesRead } + tillOffset := erasure.ShardFileTillOffset(partOffset, partLength, partSize) // Get the checksums of the current part. - bitrotReaders := make([]*bitrotReader, len(onlineDisks)) + readers := make([]io.ReaderAt, len(onlineDisks)) for index, disk := range onlineDisks { if disk == OfflineDisk { continue } checksumInfo := metaArr[index].Erasure.GetChecksumInfo(partName) - endOffset := getErasureShardFileEndOffset(partOffset, partLength, partSize, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks) - bitrotReaders[index] = newBitrotReader(disk, bucket, pathJoin(object, partName), checksumInfo.Algorithm, endOffset, checksumInfo.Hash) + readers[index] = newBitrotReader(disk, bucket, pathJoin(object, partName), tillOffset, checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize()) } - - err := erasure.Decode(ctx, writer, bitrotReaders, partOffset, partLength, partSize) + err := erasure.Decode(ctx, writer, readers, partOffset, partLength, partSize) + // Note: we should not be defer'ing the following closeBitrotReaders() call as we are inside a for loop i.e if we use defer, we would accumulate a lot of open files by the time + // we return from this function. + closeBitrotReaders(readers) if err != nil { return toObjectErr(err, bucket, object) } - for i, r := range bitrotReaders { + for i, r := range readers { if r == nil { onlineDisks[i] = OfflineDisk } @@ -681,7 +664,6 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, uniqueID := mustGetUUID() tempObj := uniqueID - // No metadata is set, allocate a new one. if metadata == nil { metadata = make(map[string]string) @@ -793,6 +775,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, var curPartSize int64 curPartSize, err = calculatePartSizeFromIdx(ctx, data.Size(), globalPutPartSize, partIdx) if err != nil { + logger.LogIf(ctx, err) return ObjectInfo{}, toObjectErr(err, bucket, object) } @@ -800,27 +783,24 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, // This is only an optimization. 
var curPartReader io.Reader - if curPartSize >= 0 { - pErr := xl.prepareFile(ctx, minioMetaTmpBucket, tempErasureObj, curPartSize, onlineDisks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, writeQuorum) - if pErr != nil { - return ObjectInfo{}, toObjectErr(pErr, bucket, object) - } - } - if curPartSize < data.Size() { curPartReader = io.LimitReader(reader, curPartSize) } else { curPartReader = reader } - writers := make([]*bitrotWriter, len(onlineDisks)) + writers := make([]io.Writer, len(onlineDisks)) for i, disk := range onlineDisks { if disk == nil { continue } - writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj, DefaultBitrotAlgorithm) + writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj, erasure.ShardFileSize(curPartSize), DefaultBitrotAlgorithm, erasure.ShardSize()) } + n, erasureErr := erasure.Encode(ctx, curPartReader, writers, buffer, erasure.dataBlocks+1) + // Note: we should not be defer'ing the following closeBitrotWriters() call as we are inside a for loop i.e if we use defer, we would accumulate a lot of open files by the time + // we return from this function. + closeBitrotWriters(writers) if erasureErr != nil { return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj) } @@ -853,7 +833,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, continue } partsMetadata[i].AddObjectPart(partIdx, partName, "", n, data.ActualSize()) - partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{partName, DefaultBitrotAlgorithm, w.Sum()}) + partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{partName, DefaultBitrotAlgorithm, bitrotWriterSum(w)}) } // We wrote everything, break out.
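The "should not be defer'ing" notes next to closeBitrotReaders()/closeBitrotWriters() above deserve a concrete illustration, because the streaming readers and writers now hold an open file handle or HTTP stream per part. A generic sketch of the pitfall; leaky and tight are made-up names, and os.Open merely stands in for opening a part stream:

// leaky defers inside the loop: defers only run when the function returns, so
// every iteration's file stays open until leaky() finishes.
func leaky(paths []string) error {
	for _, p := range paths {
		f, err := os.Open(p)
		if err != nil {
			return err
		}
		defer f.Close() // BAD: all files remain open until the function returns
		// ... use f ...
	}
	return nil
}

// tight releases each handle before moving to the next iteration, which is the
// pattern the explicit close calls above follow.
func tight(paths []string) error {
	for _, p := range paths {
		f, err := os.Open(p)
		if err != nil {
			return err
		}
		// ... use f ...
		f.Close() // GOOD: release the handle before the next iteration
	}
	return nil
}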