From 39b3e4f9b3587f7e8ee735eb61307e440a202921 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 22 May 2019 13:47:15 -0700 Subject: [PATCH] Avoid using io.ReadFull() for WriteAll and CreateFile (#7676) With these changes we are now able to peak performances for all Write() operations across disks HDD and NVMe. Also adds readahead for disk reads, which also increases performance for reads by 3x. --- cmd/naughty-disk_test.go | 4 +- cmd/posix.go | 111 ++++++++++--------------------------- cmd/storage-interface.go | 2 +- cmd/storage-rest-client.go | 3 +- cmd/storage-rest-server.go | 9 +-- cmd/xl-v1-metadata.go | 3 +- go.mod | 1 + go.sum | 2 + pkg/ioutil/ioutil.go | 92 ++++++++++++++++++++++++++++++ 9 files changed, 132 insertions(+), 95 deletions(-) diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 4a48c9791..2a6f1f1cc 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -182,11 +182,11 @@ func (d *naughtyDisk) DeleteFileBulk(volume string, paths []string) ([]error, er return errs, nil } -func (d *naughtyDisk) WriteAll(volume string, path string, buf []byte) (err error) { +func (d *naughtyDisk) WriteAll(volume string, path string, reader io.Reader) (err error) { if err := d.calcError(); err != nil { return err } - return d.disk.WriteAll(volume, path, buf) + return d.disk.WriteAll(volume, path, reader) } func (d *naughtyDisk) ReadAll(volume string, path string) (buf []byte, err error) { diff --git a/cmd/posix.go b/cmd/posix.go index 6fe37f8f4..78ae3afce 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -35,20 +35,19 @@ import ( "bytes" humanize "github.com/dustin/go-humanize" + "github.com/klauspost/readahead" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/disk" + xioutil "github.com/minio/minio/pkg/ioutil" "github.com/minio/minio/pkg/mountinfo" "github.com/ncw/directio" ) const ( - diskMinFreeSpace = 900 * humanize.MiByte // Min 900MiB free space. - diskMinTotalSpace = diskMinFreeSpace // Min 900MiB total space. - maxAllowedIOError = 5 - posixWriteBlockSize = 4 * humanize.MiByte - // DirectIO alignment needs to be 4K. Defined here as - // directio.AlignSize is defined as 0 in MacOS causing divide by 0 error. - directioAlignSize = 4096 + diskMinFreeSpace = 900 * humanize.MiByte // Min 900MiB free space. + diskMinTotalSpace = diskMinFreeSpace // Min 900MiB total space. + maxAllowedIOError = 5 + readBlockSize = humanize.KiByte * 32 // Default read block size 32KiB. ) // isValidVolname verifies a volname name in accordance with object @@ -190,10 +189,9 @@ func newPosix(path string) (*posix, error) { p := &posix{ connected: true, diskPath: path, - // 4MiB buffer pool for posix internal operations. pool: sync.Pool{ New: func() interface{} { - b := directio.AlignedBlock(posixWriteBlockSize) + b := directio.AlignedBlock(readBlockSize) return &b }, }, @@ -1110,10 +1108,13 @@ func (s *posix) ReadFileStream(volume, path string, offset, length int64) (io.Re if _, err = file.Seek(offset, io.SeekStart); err != nil { return nil, err } - return struct { + + r := struct { io.Reader io.Closer - }{Reader: io.LimitReader(file, length), Closer: file}, nil + }{Reader: io.LimitReader(file, length), Closer: file} + + return readahead.NewReadCloser(r), nil } // CreateFile - creates the file. @@ -1182,10 +1183,6 @@ func (s *posix) CreateFile(volume, path string, fileSize int64, r io.Reader) (er return err } } - defer func() { - w.Sync() // Sync before close. - w.Close() - }() var e error if fileSize > 0 { @@ -1208,76 +1205,26 @@ func (s *posix) CreateFile(volume, path string, fileSize int64, r io.Reader) (er return err } + defer w.Close() + bufp := s.pool.Get().(*[]byte) defer s.pool.Put(bufp) - buf := *bufp - - // Writes remaining bytes in the buffer. - writeRemaining := func(w *os.File, buf []byte) (remainingWritten int, err error) { - var n int - remaining := len(buf) - // The following logic writes the remainging data such that it writes whatever best is possible (aligned buffer) - // in O_DIRECT mode and remaining (unaligned buffer) in non-O_DIRECT mode. - remainingAligned := (remaining / directioAlignSize) * directioAlignSize - remainingAlignedBuf := buf[:remainingAligned] - remainingUnalignedBuf := buf[remainingAligned:] - if len(remainingAlignedBuf) > 0 { - n, err = w.Write(remainingAlignedBuf) - if err != nil { - return 0, err - } - remainingWritten += n - } - if len(remainingUnalignedBuf) > 0 { - // Write on O_DIRECT fds fail if buffer is not 4K aligned, hence disable O_DIRECT. - if err = disk.DisableDirectIO(w); err != nil { - return 0, err - } - n, err = w.Write(remainingUnalignedBuf) - if err != nil { - return 0, err - } - remainingWritten += n - } - return remainingWritten, nil + written, err := xioutil.CopyAligned(w, r, *bufp) + if err != nil { + return err } - var written int - for { - var n int - n, err = io.ReadFull(r, buf) - switch err { - case nil: - n, err = w.Write(buf) - if err != nil { - return err - } - written += n - case io.ErrUnexpectedEOF: - n, err = writeRemaining(w, buf[:n]) - if err != nil { - return err - } - written += n - fallthrough - case io.EOF: - if fileSize != -1 { - if written < int(fileSize) { - return errLessData - } - if written > int(fileSize) { - return errMoreData - } - } - return nil - default: - return err - } + if written < fileSize { + return errLessData + } else if written > fileSize { + return errMoreData } + + return nil } -func (s *posix) WriteAll(volume, path string, buf []byte) (err error) { +func (s *posix) WriteAll(volume, path string, reader io.Reader) (err error) { defer func() { if err == errFaultyDisk { atomic.AddInt32(&s.ioErrCount, 1) @@ -1295,11 +1242,13 @@ func (s *posix) WriteAll(volume, path string, buf []byte) (err error) { return err } - if _, err = w.Write(buf); err != nil { - return err - } + defer w.Close() - return w.Close() + bufp := s.pool.Get().(*[]byte) + defer s.pool.Put(bufp) + + _, err = io.CopyBuffer(w, reader, *bufp) + return err } // AppendFile - append a byte array at path, if file doesn't exist at diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 0502c6952..74197f3de 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -54,7 +54,7 @@ type StorageAPI interface { DeleteFileBulk(volume string, paths []string) (errs []error, err error) // Write all data, syncs the data to disk. - WriteAll(volume string, path string, buf []byte) (err error) + WriteAll(volume string, path string, reader io.Reader) (err error) // Read all. ReadAll(volume string, path string) (buf []byte, err error) diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index ede29de94..dbb98f500 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -256,11 +256,10 @@ func (client *storageRESTClient) CreateFile(volume, path string, length int64, r } // WriteAll - write all data to a file. -func (client *storageRESTClient) WriteAll(volume, path string, buffer []byte) error { +func (client *storageRESTClient) WriteAll(volume, path string, reader io.Reader) error { values := make(url.Values) values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) - reader := bytes.NewBuffer(buffer) respBody, err := client.call(storageRESTMethodWriteAll, values, reader, -1) defer http.DrainBody(respBody) return err diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 797c34875..1ecc79987 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -245,14 +245,7 @@ func (s *storageRESTServer) WriteAllHandler(w http.ResponseWriter, r *http.Reque return } - buf := make([]byte, r.ContentLength) - _, err := io.ReadFull(r.Body, buf) - if err != nil { - s.writeErrorResponse(w, err) - return - } - - err = s.storage.WriteAll(volume, filePath, buf) + err := s.storage.WriteAll(volume, filePath, io.LimitReader(r.Body, r.ContentLength)) if err != nil { s.writeErrorResponse(w, err) } diff --git a/cmd/xl-v1-metadata.go b/cmd/xl-v1-metadata.go index 734e20718..810c85056 100644 --- a/cmd/xl-v1-metadata.go +++ b/cmd/xl-v1-metadata.go @@ -17,6 +17,7 @@ package cmd import ( + "bytes" "context" "crypto/sha256" "encoding/hex" @@ -437,7 +438,7 @@ func writeXLMetadata(ctx context.Context, disk StorageAPI, bucket, prefix string } // Persist marshaled data. - err = disk.WriteAll(bucket, jsonFile, metadataBytes) + err = disk.WriteAll(bucket, jsonFile, bytes.NewReader(metadataBytes)) logger.LogIf(ctx, err) return err } diff --git a/go.mod b/go.mod index 2e569502b..8a49b86af 100644 --- a/go.mod +++ b/go.mod @@ -50,6 +50,7 @@ require ( github.com/klauspost/compress v1.4.1 // indirect github.com/klauspost/cpuid v1.2.0 // indirect github.com/klauspost/pgzip v1.2.1 + github.com/klauspost/readahead v1.3.0 github.com/klauspost/reedsolomon v1.9.1 github.com/lib/pq v1.0.0 github.com/mattn/go-isatty v0.0.7 diff --git a/go.sum b/go.sum index f9d0d5515..cf10cdb4c 100644 --- a/go.sum +++ b/go.sum @@ -337,6 +337,8 @@ github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPR github.com/klauspost/pgzip v0.0.0-20180606150939-90b2c57fba35/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/klauspost/pgzip v1.2.1 h1:oIPZROsWuPHpOdMVWLuJZXwgjhrW8r1yEX8UqMyeNHM= github.com/klauspost/pgzip v1.2.1/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= +github.com/klauspost/readahead v1.3.0 h1:ur57scQa1RS6oQgdq+6mylmP2u0iR1LFw1zy3Xwqacg= +github.com/klauspost/readahead v1.3.0/go.mod h1:AH9juHzNH7xqdqFHrMRSHeH2Ps+vFf+kblDqzPFiLJg= github.com/klauspost/reedsolomon v0.0.0-20190210214925-2b210cf0866d/go.mod h1:CwCi+NUr9pqSVktrkN+Ondf06rkhYZ/pcNv7fu+8Un4= github.com/klauspost/reedsolomon v1.9.1 h1:kYrT1MlR4JH6PqOpC+okdb9CDTcwEC/BqpzK4WFyXL8= github.com/klauspost/reedsolomon v1.9.1/go.mod h1:CwCi+NUr9pqSVktrkN+Ondf06rkhYZ/pcNv7fu+8Un4= diff --git a/pkg/ioutil/ioutil.go b/pkg/ioutil/ioutil.go index c4feb918b..9094698d3 100644 --- a/pkg/ioutil/ioutil.go +++ b/pkg/ioutil/ioutil.go @@ -20,8 +20,10 @@ package ioutil import ( "io" + "os" humanize "github.com/dustin/go-humanize" + "github.com/minio/minio/pkg/disk" ) // defaultAppendBufferSize - Default buffer size for the AppendFile @@ -157,3 +159,93 @@ func (s *SkipReader) Read(p []byte) (int, error) { func NewSkipReader(r io.Reader, n int64) io.Reader { return &SkipReader{r, n} } + +// DirectIO alignment needs to be 4K. Defined here as +// directio.AlignSize is defined as 0 in MacOS causing divide by 0 error. +const directioAlignSize = 4096 + +// CopyAligned - copies from reader to writer using the aligned input +// buffer, it is expected that input buffer is page aligned to +// 4K page boundaries. Without passing aligned buffer may cause +// this function to return error. +// +// This code is similar in spirit to io.CopyBuffer but it is only to be +// used with DIRECT I/O based file descriptor and it is expected that +// input writer *os.File not a generic io.Writer. Make sure to have +// the file opened for writes with syscall.O_DIRECT flag. +func CopyAligned(w *os.File, r io.Reader, alignedBuf []byte) (int64, error) { + // Writes remaining bytes in the buffer. + writeUnaligned := func(w *os.File, buf []byte) (remainingWritten int, err error) { + var n int + remaining := len(buf) + // The following logic writes the remainging data such that it writes whatever best is possible (aligned buffer) + // in O_DIRECT mode and remaining (unaligned buffer) in non-O_DIRECT mode. + remainingAligned := (remaining / directioAlignSize) * directioAlignSize + remainingAlignedBuf := buf[:remainingAligned] + remainingUnalignedBuf := buf[remainingAligned:] + if len(remainingAlignedBuf) > 0 { + n, err = w.Write(remainingAlignedBuf) + if err != nil { + return remainingWritten, err + } + remainingWritten += n + } + if len(remainingUnalignedBuf) > 0 { + // Write on O_DIRECT fds fail if buffer is not 4K aligned, hence disable O_DIRECT. + if err = disk.DisableDirectIO(w); err != nil { + return remainingWritten, err + } + n, err = w.Write(remainingUnalignedBuf) + if err != nil { + return remainingWritten, err + } + remainingWritten += n + } + return remainingWritten, nil + } + + var written int64 + var err error + for { + nr, er := r.Read(alignedBuf) + if nr == len(alignedBuf) { + // Buffer read is aligned with input buffer, proceed to write. + nw, ew := w.Write(alignedBuf) + if nw > 0 { + written += int64(nw) + } + if ew != nil { + err = ew + break + } + if nr != nw { + err = io.ErrShortWrite + break + } + } else if nr > 0 { + // Buffer read is not aligned with input buffer, proceed to write + // whatever possible as aligned and turn off direct I/O. + nw, ew := writeUnaligned(w, alignedBuf[:nr]) + if nw > 0 { + written += int64(nw) + } + if ew != nil { + err = ew + break + } + if nr != nw { + err = io.ErrShortWrite + break + } + } + // For any read errors break out and return error. + if er != nil { + if er != io.EOF { + err = er + } + break + } + } + + return written, err +}