diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 4a48c9791..2a6f1f1cc 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -182,11 +182,11 @@ func (d *naughtyDisk) DeleteFileBulk(volume string, paths []string) ([]error, er return errs, nil } -func (d *naughtyDisk) WriteAll(volume string, path string, buf []byte) (err error) { +func (d *naughtyDisk) WriteAll(volume string, path string, reader io.Reader) (err error) { if err := d.calcError(); err != nil { return err } - return d.disk.WriteAll(volume, path, buf) + return d.disk.WriteAll(volume, path, reader) } func (d *naughtyDisk) ReadAll(volume string, path string) (buf []byte, err error) { diff --git a/cmd/posix.go b/cmd/posix.go index 6fe37f8f4..78ae3afce 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -35,20 +35,19 @@ import ( "bytes" humanize "github.com/dustin/go-humanize" + "github.com/klauspost/readahead" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/disk" + xioutil "github.com/minio/minio/pkg/ioutil" "github.com/minio/minio/pkg/mountinfo" "github.com/ncw/directio" ) const ( - diskMinFreeSpace = 900 * humanize.MiByte // Min 900MiB free space. - diskMinTotalSpace = diskMinFreeSpace // Min 900MiB total space. - maxAllowedIOError = 5 - posixWriteBlockSize = 4 * humanize.MiByte - // DirectIO alignment needs to be 4K. Defined here as - // directio.AlignSize is defined as 0 in MacOS causing divide by 0 error. - directioAlignSize = 4096 + diskMinFreeSpace = 900 * humanize.MiByte // Min 900MiB free space. + diskMinTotalSpace = diskMinFreeSpace // Min 900MiB total space. + maxAllowedIOError = 5 + readBlockSize = humanize.KiByte * 32 // Default read block size 32KiB. ) // isValidVolname verifies a volname name in accordance with object @@ -190,10 +189,9 @@ func newPosix(path string) (*posix, error) { p := &posix{ connected: true, diskPath: path, - // 4MiB buffer pool for posix internal operations. pool: sync.Pool{ New: func() interface{} { - b := directio.AlignedBlock(posixWriteBlockSize) + b := directio.AlignedBlock(readBlockSize) return &b }, }, @@ -1110,10 +1108,13 @@ func (s *posix) ReadFileStream(volume, path string, offset, length int64) (io.Re if _, err = file.Seek(offset, io.SeekStart); err != nil { return nil, err } - return struct { + + r := struct { io.Reader io.Closer - }{Reader: io.LimitReader(file, length), Closer: file}, nil + }{Reader: io.LimitReader(file, length), Closer: file} + + return readahead.NewReadCloser(r), nil } // CreateFile - creates the file. @@ -1182,10 +1183,6 @@ func (s *posix) CreateFile(volume, path string, fileSize int64, r io.Reader) (er return err } } - defer func() { - w.Sync() // Sync before close. - w.Close() - }() var e error if fileSize > 0 { @@ -1208,76 +1205,26 @@ func (s *posix) CreateFile(volume, path string, fileSize int64, r io.Reader) (er return err } + defer w.Close() + bufp := s.pool.Get().(*[]byte) defer s.pool.Put(bufp) - buf := *bufp - - // Writes remaining bytes in the buffer. - writeRemaining := func(w *os.File, buf []byte) (remainingWritten int, err error) { - var n int - remaining := len(buf) - // The following logic writes the remainging data such that it writes whatever best is possible (aligned buffer) - // in O_DIRECT mode and remaining (unaligned buffer) in non-O_DIRECT mode. - remainingAligned := (remaining / directioAlignSize) * directioAlignSize - remainingAlignedBuf := buf[:remainingAligned] - remainingUnalignedBuf := buf[remainingAligned:] - if len(remainingAlignedBuf) > 0 { - n, err = w.Write(remainingAlignedBuf) - if err != nil { - return 0, err - } - remainingWritten += n - } - if len(remainingUnalignedBuf) > 0 { - // Write on O_DIRECT fds fail if buffer is not 4K aligned, hence disable O_DIRECT. - if err = disk.DisableDirectIO(w); err != nil { - return 0, err - } - n, err = w.Write(remainingUnalignedBuf) - if err != nil { - return 0, err - } - remainingWritten += n - } - return remainingWritten, nil + written, err := xioutil.CopyAligned(w, r, *bufp) + if err != nil { + return err } - var written int - for { - var n int - n, err = io.ReadFull(r, buf) - switch err { - case nil: - n, err = w.Write(buf) - if err != nil { - return err - } - written += n - case io.ErrUnexpectedEOF: - n, err = writeRemaining(w, buf[:n]) - if err != nil { - return err - } - written += n - fallthrough - case io.EOF: - if fileSize != -1 { - if written < int(fileSize) { - return errLessData - } - if written > int(fileSize) { - return errMoreData - } - } - return nil - default: - return err - } + if written < fileSize { + return errLessData + } else if written > fileSize { + return errMoreData } + + return nil } -func (s *posix) WriteAll(volume, path string, buf []byte) (err error) { +func (s *posix) WriteAll(volume, path string, reader io.Reader) (err error) { defer func() { if err == errFaultyDisk { atomic.AddInt32(&s.ioErrCount, 1) @@ -1295,11 +1242,13 @@ func (s *posix) WriteAll(volume, path string, buf []byte) (err error) { return err } - if _, err = w.Write(buf); err != nil { - return err - } + defer w.Close() - return w.Close() + bufp := s.pool.Get().(*[]byte) + defer s.pool.Put(bufp) + + _, err = io.CopyBuffer(w, reader, *bufp) + return err } // AppendFile - append a byte array at path, if file doesn't exist at diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 0502c6952..74197f3de 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -54,7 +54,7 @@ type StorageAPI interface { DeleteFileBulk(volume string, paths []string) (errs []error, err error) // Write all data, syncs the data to disk. - WriteAll(volume string, path string, buf []byte) (err error) + WriteAll(volume string, path string, reader io.Reader) (err error) // Read all. ReadAll(volume string, path string) (buf []byte, err error) diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index ede29de94..dbb98f500 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -256,11 +256,10 @@ func (client *storageRESTClient) CreateFile(volume, path string, length int64, r } // WriteAll - write all data to a file. -func (client *storageRESTClient) WriteAll(volume, path string, buffer []byte) error { +func (client *storageRESTClient) WriteAll(volume, path string, reader io.Reader) error { values := make(url.Values) values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) - reader := bytes.NewBuffer(buffer) respBody, err := client.call(storageRESTMethodWriteAll, values, reader, -1) defer http.DrainBody(respBody) return err diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 797c34875..1ecc79987 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -245,14 +245,7 @@ func (s *storageRESTServer) WriteAllHandler(w http.ResponseWriter, r *http.Reque return } - buf := make([]byte, r.ContentLength) - _, err := io.ReadFull(r.Body, buf) - if err != nil { - s.writeErrorResponse(w, err) - return - } - - err = s.storage.WriteAll(volume, filePath, buf) + err := s.storage.WriteAll(volume, filePath, io.LimitReader(r.Body, r.ContentLength)) if err != nil { s.writeErrorResponse(w, err) } diff --git a/cmd/xl-v1-metadata.go b/cmd/xl-v1-metadata.go index 734e20718..810c85056 100644 --- a/cmd/xl-v1-metadata.go +++ b/cmd/xl-v1-metadata.go @@ -17,6 +17,7 @@ package cmd import ( + "bytes" "context" "crypto/sha256" "encoding/hex" @@ -437,7 +438,7 @@ func writeXLMetadata(ctx context.Context, disk StorageAPI, bucket, prefix string } // Persist marshaled data. - err = disk.WriteAll(bucket, jsonFile, metadataBytes) + err = disk.WriteAll(bucket, jsonFile, bytes.NewReader(metadataBytes)) logger.LogIf(ctx, err) return err } diff --git a/go.mod b/go.mod index 2e569502b..8a49b86af 100644 --- a/go.mod +++ b/go.mod @@ -50,6 +50,7 @@ require ( github.com/klauspost/compress v1.4.1 // indirect github.com/klauspost/cpuid v1.2.0 // indirect github.com/klauspost/pgzip v1.2.1 + github.com/klauspost/readahead v1.3.0 github.com/klauspost/reedsolomon v1.9.1 github.com/lib/pq v1.0.0 github.com/mattn/go-isatty v0.0.7 diff --git a/go.sum b/go.sum index f9d0d5515..cf10cdb4c 100644 --- a/go.sum +++ b/go.sum @@ -337,6 +337,8 @@ github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPR github.com/klauspost/pgzip v0.0.0-20180606150939-90b2c57fba35/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/klauspost/pgzip v1.2.1 h1:oIPZROsWuPHpOdMVWLuJZXwgjhrW8r1yEX8UqMyeNHM= github.com/klauspost/pgzip v1.2.1/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= +github.com/klauspost/readahead v1.3.0 h1:ur57scQa1RS6oQgdq+6mylmP2u0iR1LFw1zy3Xwqacg= +github.com/klauspost/readahead v1.3.0/go.mod h1:AH9juHzNH7xqdqFHrMRSHeH2Ps+vFf+kblDqzPFiLJg= github.com/klauspost/reedsolomon v0.0.0-20190210214925-2b210cf0866d/go.mod h1:CwCi+NUr9pqSVktrkN+Ondf06rkhYZ/pcNv7fu+8Un4= github.com/klauspost/reedsolomon v1.9.1 h1:kYrT1MlR4JH6PqOpC+okdb9CDTcwEC/BqpzK4WFyXL8= github.com/klauspost/reedsolomon v1.9.1/go.mod h1:CwCi+NUr9pqSVktrkN+Ondf06rkhYZ/pcNv7fu+8Un4= diff --git a/pkg/ioutil/ioutil.go b/pkg/ioutil/ioutil.go index c4feb918b..9094698d3 100644 --- a/pkg/ioutil/ioutil.go +++ b/pkg/ioutil/ioutil.go @@ -20,8 +20,10 @@ package ioutil import ( "io" + "os" humanize "github.com/dustin/go-humanize" + "github.com/minio/minio/pkg/disk" ) // defaultAppendBufferSize - Default buffer size for the AppendFile @@ -157,3 +159,93 @@ func (s *SkipReader) Read(p []byte) (int, error) { func NewSkipReader(r io.Reader, n int64) io.Reader { return &SkipReader{r, n} } + +// DirectIO alignment needs to be 4K. Defined here as +// directio.AlignSize is defined as 0 in MacOS causing divide by 0 error. +const directioAlignSize = 4096 + +// CopyAligned - copies from reader to writer using the aligned input +// buffer, it is expected that input buffer is page aligned to +// 4K page boundaries. Without passing aligned buffer may cause +// this function to return error. +// +// This code is similar in spirit to io.CopyBuffer but it is only to be +// used with DIRECT I/O based file descriptor and it is expected that +// input writer *os.File not a generic io.Writer. Make sure to have +// the file opened for writes with syscall.O_DIRECT flag. +func CopyAligned(w *os.File, r io.Reader, alignedBuf []byte) (int64, error) { + // Writes remaining bytes in the buffer. + writeUnaligned := func(w *os.File, buf []byte) (remainingWritten int, err error) { + var n int + remaining := len(buf) + // The following logic writes the remainging data such that it writes whatever best is possible (aligned buffer) + // in O_DIRECT mode and remaining (unaligned buffer) in non-O_DIRECT mode. + remainingAligned := (remaining / directioAlignSize) * directioAlignSize + remainingAlignedBuf := buf[:remainingAligned] + remainingUnalignedBuf := buf[remainingAligned:] + if len(remainingAlignedBuf) > 0 { + n, err = w.Write(remainingAlignedBuf) + if err != nil { + return remainingWritten, err + } + remainingWritten += n + } + if len(remainingUnalignedBuf) > 0 { + // Write on O_DIRECT fds fail if buffer is not 4K aligned, hence disable O_DIRECT. + if err = disk.DisableDirectIO(w); err != nil { + return remainingWritten, err + } + n, err = w.Write(remainingUnalignedBuf) + if err != nil { + return remainingWritten, err + } + remainingWritten += n + } + return remainingWritten, nil + } + + var written int64 + var err error + for { + nr, er := r.Read(alignedBuf) + if nr == len(alignedBuf) { + // Buffer read is aligned with input buffer, proceed to write. + nw, ew := w.Write(alignedBuf) + if nw > 0 { + written += int64(nw) + } + if ew != nil { + err = ew + break + } + if nr != nw { + err = io.ErrShortWrite + break + } + } else if nr > 0 { + // Buffer read is not aligned with input buffer, proceed to write + // whatever possible as aligned and turn off direct I/O. + nw, ew := writeUnaligned(w, alignedBuf[:nr]) + if nw > 0 { + written += int64(nw) + } + if ew != nil { + err = ew + break + } + if nr != nw { + err = io.ErrShortWrite + break + } + } + // For any read errors break out and return error. + if er != nil { + if er != io.EOF { + err = er + } + break + } + } + + return written, err +}