diff --git a/cmd/bitrot-streaming.go b/cmd/bitrot-streaming.go index 7104c00ad..e653d2848 100644 --- a/cmd/bitrot-streaming.go +++ b/cmd/bitrot-streaming.go @@ -32,19 +32,9 @@ type streamingBitrotWriter struct { h hash.Hash shardSize int64 canClose chan struct{} // Needed to avoid race explained in Close() call. - - // Following two fields are used only to make sure that Write(p) is called such that - // len(p) is always the block size except the last block, i.e prevent programmer errors. - currentBlockIdx int - verifyTillIdx int } func (b *streamingBitrotWriter) Write(p []byte) (int, error) { - if b.currentBlockIdx < b.verifyTillIdx && int64(len(p)) != b.shardSize { - // All blocks except last should be of the length b.shardSize - logger.LogIf(context.Background(), errUnexpected) - return 0, errUnexpected - } if len(p) == 0 { return 0, nil } @@ -57,7 +47,6 @@ func (b *streamingBitrotWriter) Write(p []byte) (int, error) { return 0, err } n, err = b.iow.Write(p) - b.currentBlockIdx++ return n, err } @@ -78,10 +67,13 @@ func (b *streamingBitrotWriter) Close() error { func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64) io.WriteCloser { r, w := io.Pipe() h := algo.New() - bw := &streamingBitrotWriter{w, h, shardSize, make(chan struct{}), 0, int(length / shardSize)} + bw := &streamingBitrotWriter{w, h, shardSize, make(chan struct{})} go func() { - bitrotSumsTotalSize := ceilFrac(length, shardSize) * int64(h.Size()) // Size used for storing bitrot checksums. - totalFileSize := bitrotSumsTotalSize + length + totalFileSize := int64(-1) // For compressed objects length will be unknown (represented by length=-1) + if length != -1 { + bitrotSumsTotalSize := ceilFrac(length, shardSize) * int64(h.Size()) // Size used for storing bitrot checksums. + totalFileSize = bitrotSumsTotalSize + length + } err := disk.CreateFile(volume, filePath, totalFileSize, r) if err != nil { reqInfo := (&logger.ReqInfo{}).AppendTags("storageDisk", disk.String()) diff --git a/cmd/bitrot-whole.go b/cmd/bitrot-whole.go index 2bb43ec51..2416e687c 100644 --- a/cmd/bitrot-whole.go +++ b/cmd/bitrot-whole.go @@ -31,19 +31,9 @@ type wholeBitrotWriter struct { filePath string shardSize int64 // This is the shard size of the erasure logic hash.Hash // For bitrot hash - - // Following two fields are used only to make sure that Write(p) is called such that - // len(p) is always the block size except the last block and prevent programmer errors. - currentBlockIdx int - lastBlockIdx int } func (b *wholeBitrotWriter) Write(p []byte) (int, error) { - if b.currentBlockIdx < b.lastBlockIdx && int64(len(p)) != b.shardSize { - // All blocks except last should be of the length b.shardSize - logger.LogIf(context.Background(), errUnexpected) - return 0, errUnexpected - } err := b.disk.AppendFile(b.volume, b.filePath, p) if err != nil { logger.LogIf(context.Background(), err) @@ -54,7 +44,6 @@ func (b *wholeBitrotWriter) Write(p []byte) (int, error) { logger.LogIf(context.Background(), err) return 0, err } - b.currentBlockIdx++ return len(p), nil } @@ -63,8 +52,8 @@ func (b *wholeBitrotWriter) Close() error { } // Returns whole-file bitrot writer. -func newWholeBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64) io.WriteCloser { - return &wholeBitrotWriter{disk, volume, filePath, shardSize, algo.New(), 0, int(length / shardSize)} +func newWholeBitrotWriter(disk StorageAPI, volume, filePath string, algo BitrotAlgorithm, shardSize int64) io.WriteCloser { + return &wholeBitrotWriter{disk, volume, filePath, shardSize, algo.New()} } // Implementation to verify bitrot for the whole file. diff --git a/cmd/bitrot.go b/cmd/bitrot.go index 383e5dfbc..14a7bde4a 100644 --- a/cmd/bitrot.go +++ b/cmd/bitrot.go @@ -120,7 +120,7 @@ func newBitrotWriter(disk StorageAPI, volume, filePath string, length int64, alg if algo == HighwayHash256S { return newStreamingBitrotWriter(disk, volume, filePath, length, algo, shardSize) } - return newWholeBitrotWriter(disk, volume, filePath, length, algo, shardSize) + return newWholeBitrotWriter(disk, volume, filePath, algo, shardSize) } func newBitrotReader(disk StorageAPI, bucket string, filePath string, tillOffset int64, algo BitrotAlgorithm, sum []byte, shardSize int64) io.ReaderAt { diff --git a/cmd/erasure.go b/cmd/erasure.go index 7dfa1242b..5d60af06d 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -103,6 +103,9 @@ func (e *Erasure) ShardFileSize(totalLength int64) int64 { if totalLength == 0 { return 0 } + if totalLength == -1 { + return -1 + } numShards := totalLength / e.blockSize lastBlockSize := totalLength % int64(e.blockSize) lastShardSize := ceilFrac(lastBlockSize, int64(e.dataBlocks)) diff --git a/cmd/posix.go b/cmd/posix.go index 2f1ef0b9f..7936d553a 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -187,7 +187,7 @@ func newPosix(path string) (*posix, error) { p := &posix{ connected: true, diskPath: path, - // 1MiB buffer pool for posix internal operations. + // 4MiB buffer pool for posix internal operations. pool: sync.Pool{ New: func() interface{} { b := directio.AlignedBlock(posixWriteBlockSize) @@ -1028,10 +1028,9 @@ func (s *posix) ReadFileStream(volume, path string, offset, length int64) (io.Re // CreateFile - creates the file. func (s *posix) CreateFile(volume, path string, fileSize int64, r io.Reader) (err error) { - if fileSize < 0 { + if fileSize < -1 { return errInvalidArgument } - defer func() { if err == errFaultyDisk { atomic.AddInt32(&s.ioErrCount, 1) @@ -1120,69 +1119,69 @@ func (s *posix) CreateFile(volume, path string, fileSize int64, r io.Reader) (er defer s.pool.Put(bufp) buf := *bufp - var written int64 - dioCount := int(fileSize) / len(buf) - for i := 0; i < dioCount; i++ { + + // Writes remaining bytes in the buffer. + writeRemaining := func(w *os.File, buf []byte) (remainingWritten int, err error) { var n int - _, err = io.ReadFull(r, buf) - if err != nil { - return err - } - n, err = w.Write(buf) - if err != nil { - return err - } - written += int64(n) - } - // The following logic writes the remainging data such that it writes whatever best is possible (aligned buffer) - // in O_DIRECT mode and remaining (unaligned buffer) in non-O_DIRECT mode. - remaining := fileSize % int64(len(buf)) - if remaining != 0 { - buf = buf[:remaining] - _, err = io.ReadFull(r, buf) - if err != nil { - return err - } + remaining := len(buf) + // The following logic writes the remainging data such that it writes whatever best is possible (aligned buffer) + // in O_DIRECT mode and remaining (unaligned buffer) in non-O_DIRECT mode. remainingAligned := (remaining / directioAlignSize) * directioAlignSize remainingAlignedBuf := buf[:remainingAligned] remainingUnalignedBuf := buf[remainingAligned:] if len(remainingAlignedBuf) > 0 { - var n int n, err = w.Write(remainingAlignedBuf) if err != nil { - return err + return 0, err } - written += int64(n) + remainingWritten += n } if len(remainingUnalignedBuf) > 0 { - var n int // Write on O_DIRECT fds fail if buffer is not 4K aligned, hence disable O_DIRECT. if err = disk.DisableDirectIO(w); err != nil { - return err + return 0, err } n, err = w.Write(remainingUnalignedBuf) if err != nil { - return err + return 0, err } - written += int64(n) + remainingWritten += n } + return remainingWritten, nil } - // Do some sanity checks. - _, err = io.ReadFull(r, buf) - if err != io.EOF { - return errMoreData - } - - if written < fileSize { - return errLessData - } - - if written > fileSize { - return errMoreData + var written int + for { + var n int + n, err = io.ReadFull(r, buf) + switch err { + case nil: + n, err = w.Write(buf) + if err != nil { + return err + } + written += n + case io.ErrUnexpectedEOF: + n, err = writeRemaining(w, buf[:n]) + if err != nil { + return err + } + written += n + fallthrough + case io.EOF: + if fileSize != -1 { + if written < int(fileSize) { + return errLessData + } + if written > int(fileSize) { + return errMoreData + } + } + return nil + default: + return err + } } - - return nil } func (s *posix) WriteAll(volume, path string, buf []byte) (err error) { diff --git a/cmd/xl-v1-utils.go b/cmd/xl-v1-utils.go index 74a2da693..11b3d81fd 100644 --- a/cmd/xl-v1-utils.go +++ b/cmd/xl-v1-utils.go @@ -350,6 +350,9 @@ func calculatePartSizeFromIdx(ctx context.Context, totalSize int64, partSize int logger.LogIf(ctx, errPartSizeIndex) return 0, errPartSizeIndex } + if totalSize == -1 { + return -1, nil + } if totalSize > 0 { // Compute the total count of parts partsCount := totalSize/partSize + 1