fix: use buffers only when necessary for io.Copy() (#11229)

Use separate sync.Pool for writes/reads

Avoid passing buffers for io.CopyBuffer()
if the writer or reader implement io.WriteTo or io.ReadFrom
respectively then its useless for sync.Pool to allocate
buffers on its own since that will be completely ignored
by the io.CopyBuffer Go implementation.

Improve this wherever we see this to be optimal.

This allows us to be more efficient on memory usage.
```
   385  // copyBuffer is the actual implementation of Copy and CopyBuffer.
   386  // if buf is nil, one is allocated.
   387  func copyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) {
   388  	// If the reader has a WriteTo method, use it to do the copy.
   389  	// Avoids an allocation and a copy.
   390  	if wt, ok := src.(WriterTo); ok {
   391  		return wt.WriteTo(dst)
   392  	}
   393  	// Similarly, if the writer has a ReadFrom method, use it to do the copy.
   394  	if rt, ok := dst.(ReaderFrom); ok {
   395  		return rt.ReadFrom(src)
   396  	}
```

From readahead package
```
// WriteTo writes data to w until there's no more data to write or when an error occurs.
// The return value n is the number of bytes written.
// Any error encountered during the write is also returned.
func (a *reader) WriteTo(w io.Writer) (n int64, err error) {
	if a.err != nil {
		return 0, a.err
	}
	n = 0
	for {
		err = a.fill()
		if err != nil {
			return n, err
		}
		n2, err := w.Write(a.cur.buffer())
		a.cur.inc(n2)
		n += int64(n2)
		if err != nil {
			return n, err
		}
```
master
Harshavardhana 4 years ago committed by GitHub
parent b5d291ea88
commit 76e2713ffe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      buildscripts/race.sh
  2. 23
      cmd/fs-v1-helpers.go
  3. 19
      cmd/fs-v1-helpers_test.go
  4. 8
      cmd/fs-v1-multipart.go
  5. 19
      cmd/fs-v1.go
  6. 2
      cmd/handler-api.go
  7. 3
      cmd/object-api-common.go
  8. 10
      cmd/storage-rest-server.go
  9. 126
      cmd/xl-storage.go
  10. 9
      cmd/xl-storage_test.go
  11. 2
      docs/shared-backend/DESIGN.md
  12. 2
      docs/zh_CN/shared-backend/DESIGN.md
  13. 4
      pkg/ioutil/append-file_nix.go
  14. 4
      pkg/ioutil/append-file_windows.go
  15. 6
      pkg/ioutil/ioutil.go

@ -3,5 +3,5 @@
set -e
for d in $(go list ./... | grep -v browser); do
CGO_ENABLED=1 go test -v -race --timeout 50m "$d"
CGO_ENABLED=1 go test -v -race --timeout 100m "$d"
done

@ -271,8 +271,8 @@ func fsOpenFile(ctx context.Context, readPath string, offset int64) (io.ReadClos
return fr, st.Size(), nil
}
// Creates a file and copies data from incoming reader. Staging buffer is used by io.CopyBuffer.
func fsCreateFile(ctx context.Context, filePath string, reader io.Reader, buf []byte, fallocSize int64) (int64, error) {
// Creates a file and copies data from incoming reader.
func fsCreateFile(ctx context.Context, filePath string, reader io.Reader, fallocSize int64) (int64, error) {
if filePath == "" || reader == nil {
logger.LogIf(ctx, errInvalidArgument)
return 0, errInvalidArgument
@ -317,21 +317,10 @@ func fsCreateFile(ctx context.Context, filePath string, reader io.Reader, buf []
}
}
var bytesWritten int64
if buf != nil {
bytesWritten, err = io.CopyBuffer(writer, reader, buf)
if err != nil {
if err != io.ErrUnexpectedEOF {
logger.LogIf(ctx, err)
}
return 0, err
}
} else {
bytesWritten, err = io.Copy(writer, reader)
if err != nil {
logger.LogIf(ctx, err)
return 0, err
}
bytesWritten, err := io.Copy(writer, reader)
if err != nil {
logger.LogIf(ctx, err)
return 0, err
}
return bytesWritten, nil

@ -75,7 +75,7 @@ func TestFSStats(t *testing.T) {
}
var reader = bytes.NewReader([]byte("Hello, world"))
if _, err = fsCreateFile(GlobalContext, pathJoin(path, "success-vol", "success-file"), reader, nil, 0); err != nil {
if _, err = fsCreateFile(GlobalContext, pathJoin(path, "success-vol", "success-file"), reader, 0); err != nil {
t.Fatalf("Unable to create file, %s", err)
}
// Seek back.
@ -85,7 +85,7 @@ func TestFSStats(t *testing.T) {
t.Fatal("Unexpected error", err)
}
if _, err = fsCreateFile(GlobalContext, pathJoin(path, "success-vol", "path/to/success-file"), reader, nil, 0); err != nil {
if _, err = fsCreateFile(GlobalContext, pathJoin(path, "success-vol", "path/to/success-file"), reader, 0); err != nil {
t.Fatalf("Unable to create file, %s", err)
}
// Seek back.
@ -192,7 +192,7 @@ func TestFSCreateAndOpen(t *testing.T) {
t.Fatalf("Unable to create directory, %s", err)
}
if _, err = fsCreateFile(GlobalContext, "", nil, nil, 0); err != errInvalidArgument {
if _, err = fsCreateFile(GlobalContext, "", nil, 0); err != errInvalidArgument {
t.Fatal("Unexpected error", err)
}
@ -201,7 +201,7 @@ func TestFSCreateAndOpen(t *testing.T) {
}
var reader = bytes.NewReader([]byte("Hello, world"))
if _, err = fsCreateFile(GlobalContext, pathJoin(path, "success-vol", "success-file"), reader, nil, 0); err != nil {
if _, err = fsCreateFile(GlobalContext, pathJoin(path, "success-vol", "success-file"), reader, 0); err != nil {
t.Fatalf("Unable to create file, %s", err)
}
// Seek back.
@ -229,7 +229,7 @@ func TestFSCreateAndOpen(t *testing.T) {
}
for i, testCase := range testCases {
_, err = fsCreateFile(GlobalContext, pathJoin(path, testCase.srcVol, testCase.srcPath), reader, nil, 0)
_, err = fsCreateFile(GlobalContext, pathJoin(path, testCase.srcVol, testCase.srcPath), reader, 0)
if err != testCase.expectedErr {
t.Errorf("Test case %d: Expected: \"%s\", got: \"%s\"", i+1, testCase.expectedErr, err)
}
@ -258,9 +258,8 @@ func TestFSDeletes(t *testing.T) {
t.Fatalf("Unable to create directory, %s", err)
}
var buf = make([]byte, 4096)
var reader = bytes.NewReader([]byte("Hello, world"))
if _, err = fsCreateFile(GlobalContext, pathJoin(path, "success-vol", "success-file"), reader, buf, reader.Size()); err != nil {
if _, err = fsCreateFile(GlobalContext, pathJoin(path, "success-vol", "success-file"), reader, reader.Size()); err != nil {
t.Fatalf("Unable to create file, %s", err)
}
// Seek back.
@ -396,13 +395,13 @@ func TestFSRemoves(t *testing.T) {
}
var reader = bytes.NewReader([]byte("Hello, world"))
if _, err = fsCreateFile(GlobalContext, pathJoin(path, "success-vol", "success-file"), reader, nil, 0); err != nil {
if _, err = fsCreateFile(GlobalContext, pathJoin(path, "success-vol", "success-file"), reader, 0); err != nil {
t.Fatalf("Unable to create file, %s", err)
}
// Seek back.
reader.Seek(0, 0)
if _, err = fsCreateFile(GlobalContext, pathJoin(path, "success-vol", "success-file-new"), reader, nil, 0); err != nil {
if _, err = fsCreateFile(GlobalContext, pathJoin(path, "success-vol", "success-file-new"), reader, 0); err != nil {
t.Fatalf("Unable to create file, %s", err)
}
// Seek back.
@ -515,7 +514,7 @@ func TestFSRemoveMeta(t *testing.T) {
filePath := pathJoin(fsPath, "success-vol", "success-file")
var reader = bytes.NewReader([]byte("Hello, world"))
if _, err = fsCreateFile(GlobalContext, filePath, reader, nil, 0); err != nil {
if _, err = fsCreateFile(GlobalContext, filePath, reader, 0); err != nil {
t.Fatalf("Unable to create file, %s", err)
}

@ -313,14 +313,8 @@ func (fs *FSObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
return pi, toObjectErr(err, bucket, object)
}
bufSize := int64(readSizeV1)
if size := data.Size(); size > 0 && bufSize > size {
bufSize = size
}
buf := make([]byte, bufSize)
tmpPartPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, uploadID+"."+mustGetUUID()+"."+strconv.Itoa(partID))
bytesWritten, err := fsCreateFile(ctx, tmpPartPath, data, buf, data.Size())
bytesWritten, err := fsCreateFile(ctx, tmpPartPath, data, data.Size())
// Delete temporary part in case of failure. If
// PutObjectPart succeeds then there would be nothing to

@ -857,11 +857,6 @@ func (fs *FSObjects) getObject(ctx context.Context, bucket, object string, offse
}
defer reader.Close()
bufSize := int64(readSizeV1)
if length > 0 && bufSize > length {
bufSize = length
}
// For negative length we read everything.
if length < 0 {
length = size - offset
@ -874,10 +869,7 @@ func (fs *FSObjects) getObject(ctx context.Context, bucket, object string, offse
return err
}
// Allocate a staging buffer.
buf := make([]byte, int(bufSize))
_, err = io.CopyBuffer(writer, io.LimitReader(reader, length), buf)
_, err = io.Copy(writer, io.LimitReader(reader, length))
// The writer will be closed incase of range queries, which will emit ErrClosedPipe.
if err == io.ErrClosedPipe {
err = nil
@ -1199,15 +1191,8 @@ func (fs *FSObjects) putObject(ctx context.Context, bucket string, object string
// so that cleaning it up will be easy if the server goes down.
tempObj := mustGetUUID()
// Allocate a buffer to Read() from request body
bufSize := int64(readSizeV1)
if size := data.Size(); size > 0 && bufSize > size {
bufSize = size
}
buf := make([]byte, int(bufSize))
fsTmpObjPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, tempObj)
bytesWritten, err := fsCreateFile(ctx, fsTmpObjPath, data, buf, data.Size())
bytesWritten, err := fsCreateFile(ctx, fsTmpObjPath, data, data.Size())
// Delete the temporary object in the case of a
// failure. If PutObject succeeds, then there would be

@ -57,7 +57,7 @@ func (t *apiConfig) init(cfg api.Config, setDriveCount int) {
// max requests per node is calculated as
// total_ram / ram_per_request
// ram_per_request is 4MiB * setDriveCount + 2 * 10MiB (default erasure block size)
apiRequestsMaxPerNode = int(stats.TotalRAM / uint64(setDriveCount*readBlockSize+blockSizeV1*2))
apiRequestsMaxPerNode = int(stats.TotalRAM / uint64(setDriveCount*(writeBlockSize+readBlockSize)+blockSizeV1*2))
} else {
apiRequestsMaxPerNode = cfg.RequestsMax
if len(globalEndpoints.Hostnames()) > 0 {

@ -31,9 +31,6 @@ const (
// Block size used for all internal operations version 1.
blockSizeV1 = 10 * humanize.MiByte
// Staging buffer read size for all internal operations version 1.
readSizeV1 = 1 * humanize.MiByte
// Buckets meta prefix.
bucketMetaPrefix = "buckets"

@ -515,15 +515,7 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http
defer rc.Close()
w.Header().Set(xhttp.ContentLength, strconv.Itoa(length))
if length >= readBlockSize {
bufp := s.storage.pool.Get().(*[]byte)
defer s.storage.pool.Put(bufp)
io.CopyBuffer(w, rc, *bufp)
} else {
io.Copy(w, rc)
}
io.Copy(w, rc)
w.(http.Flusher).Flush()
}

@ -55,7 +55,8 @@ import (
const (
nullVersionID = "null"
diskMinTotalSpace = 900 * humanize.MiByte // Min 900MiB total space.
readBlockSize = 4 * humanize.MiByte // Default read block size 4MiB.
writeBlockSize = 4 * humanize.MiByte // Default write block size 4MiB.
readBlockSize = 2 * humanize.MiByte // Default read block size 2MiB.
// On regular files bigger than this;
readAheadSize = 16 << 20
@ -90,10 +91,11 @@ type xlStorage struct {
diskPath string
endpoint Endpoint
pool sync.Pool
globalSync bool
wpool sync.Pool
rpool sync.Pool
rootDisk bool
diskID string
@ -254,7 +256,13 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) {
p := &xlStorage{
diskPath: path,
endpoint: ep,
pool: sync.Pool{
wpool: sync.Pool{
New: func() interface{} {
b := disk.AlignedBlock(writeBlockSize)
return &b
},
},
rpool: sync.Pool{
New: func() interface{} {
b := disk.AlignedBlock(readBlockSize)
return &b
@ -671,12 +679,16 @@ func (s *xlStorage) StatVol(ctx context.Context, volume string) (vol VolInfo, er
var st os.FileInfo
st, err = os.Stat(volumeDir)
if err != nil {
if osIsNotExist(err) {
switch {
case osIsNotExist(err):
return VolInfo{}, errVolumeNotFound
} else if isSysErrIO(err) {
case osIsPermission(err):
return VolInfo{}, errDiskAccessDenied
case isSysErrIO(err):
return VolInfo{}, errFaultyDisk
default:
return VolInfo{}, err
}
return VolInfo{}, err
}
// As os.Stat() doesn't carry other than ModTime(), use ModTime()
// as CreatedTime.
@ -723,58 +735,6 @@ func (s *xlStorage) DeleteVol(ctx context.Context, volume string, forceDelete bo
return nil
}
const guidSplunk = "guidSplunk"
// ListDirSplunk - return all the entries at the given directory path.
// If an entry is a directory it will be returned with a trailing SlashSeparator.
func (s *xlStorage) ListDirSplunk(volume, dirPath string, count int) (entries []string, err error) {
guidIndex := strings.Index(dirPath, guidSplunk)
if guidIndex != -1 {
return nil, nil
}
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
}()
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(volume)
if err != nil {
return nil, err
}
if _, err = os.Stat(volumeDir); err != nil {
if osIsNotExist(err) {
return nil, errVolumeNotFound
} else if isSysErrIO(err) {
return nil, errFaultyDisk
}
return nil, err
}
dirPathAbs := pathJoin(volumeDir, dirPath)
if count > 0 {
entries, err = readDirN(dirPathAbs, count)
} else {
entries, err = readDir(dirPathAbs)
}
if err != nil {
return nil, err
}
return entries, nil
}
func (s *xlStorage) isLeafSplunk(volume string, leafPath string) bool {
const receiptJSON = "receipt.json"
if path.Base(leafPath) != receiptJSON {
return false
}
return s.isLeaf(volume, leafPath)
}
func (s *xlStorage) isLeaf(volume string, leafPath string) bool {
volumeDir, err := s.getVolDir(volume)
if err != nil {
@ -903,15 +863,6 @@ func (s *xlStorage) ListDir(ctx context.Context, volume, dirPath string, count i
return nil, err
}
if _, err = os.Stat(volumeDir); err != nil {
if osIsNotExist(err) {
return nil, errVolumeNotFound
} else if isSysErrIO(err) {
return nil, errFaultyDisk
}
return nil, err
}
dirPathAbs := pathJoin(volumeDir, dirPath)
if count > 0 {
entries, err = readDirN(dirPathAbs, count)
@ -919,6 +870,15 @@ func (s *xlStorage) ListDir(ctx context.Context, volume, dirPath string, count i
entries, err = readDir(dirPathAbs)
}
if err != nil {
if err == errFileNotFound {
if _, verr := os.Stat(volumeDir); verr != nil {
if osIsNotExist(verr) {
return nil, errVolumeNotFound
} else if isSysErrIO(verr) {
return nil, errFaultyDisk
}
}
}
return nil, err
}
@ -1122,8 +1082,6 @@ func (s *xlStorage) renameLegacyMetadata(volume, path string) error {
}
// ReadVersion - reads metadata and returns FileInfo at path `xl.meta`
// for all objects less than `128KiB` this call returns data as well
// along with metadata.
func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
if err != nil {
@ -1300,11 +1258,8 @@ func (s *xlStorage) ReadFile(ctx context.Context, volume string, path string, of
return int64(n), err
}
bufp := s.pool.Get().(*[]byte)
defer s.pool.Put(bufp)
h := verifier.algorithm.New()
if _, err = io.CopyBuffer(h, io.LimitReader(file, offset), *bufp); err != nil {
if _, err = io.Copy(h, io.LimitReader(file, offset)); err != nil {
return 0, err
}
@ -1316,7 +1271,7 @@ func (s *xlStorage) ReadFile(ctx context.Context, volume string, path string, of
return 0, err
}
if _, err = io.CopyBuffer(h, file, *bufp); err != nil {
if _, err = io.Copy(h, file); err != nil {
return 0, err
}
@ -1384,7 +1339,7 @@ func (s *xlStorage) openFile(volume, path string, mode int) (f *os.File, err err
}
// To support O_DIRECT reads for erasure backends.
type odirectreader struct {
type odirectReader struct {
f *os.File
buf []byte
bufp *[]byte
@ -1394,12 +1349,12 @@ type odirectreader struct {
}
// Read - Implements Reader interface.
func (o *odirectreader) Read(buf []byte) (n int, err error) {
func (o *odirectReader) Read(buf []byte) (n int, err error) {
if o.err != nil {
return 0, o.err
}
if o.buf == nil {
o.bufp = o.s.pool.Get().(*[]byte)
o.bufp = o.s.rpool.Get().(*[]byte)
}
if o.freshRead {
o.buf = *o.bufp
@ -1427,8 +1382,8 @@ func (o *odirectreader) Read(buf []byte) (n int, err error) {
}
// Close - Release the buffer and close the file.
func (o *odirectreader) Close() error {
o.s.pool.Put(o.bufp)
func (o *odirectReader) Close() error {
o.s.rpool.Put(o.bufp)
atomic.AddInt32(&o.s.activeIOCount, -1)
return o.f.Close()
}
@ -1474,7 +1429,7 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
}
atomic.AddInt32(&s.activeIOCount, 1)
return &odirectreader{file, nil, nil, true, s, nil}, nil
return &odirectReader{file, nil, nil, true, s, nil}, nil
}
// Open the file for reading.
@ -1640,8 +1595,8 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz
w.Close()
}()
bufp := s.pool.Get().(*[]byte)
defer s.pool.Put(bufp)
bufp := s.wpool.Get().(*[]byte)
defer s.wpool.Put(bufp)
written, err := xioutil.CopyAligned(w, r, *bufp, fileSize)
if err != nil {
@ -2271,11 +2226,8 @@ func (s *xlStorage) bitrotVerify(partPath string, partSize int64, algo BitrotAlg
defer file.Close()
if algo != HighwayHash256S {
bufp := s.pool.Get().(*[]byte)
defer s.pool.Put(bufp)
h := algo.New()
if _, err = io.CopyBuffer(h, file, *bufp); err != nil {
if _, err = io.Copy(h, file); err != nil {
// Premature failure in reading the object,file is corrupt.
return errFileCorrupt
}

@ -703,8 +703,8 @@ func TestXLStorageListVols(t *testing.T) {
}
}
// TestXLStorageXlStorageListDir - TestXLStorages validate the directory listing functionality provided by xlStorage.ListDir .
func TestXLStorageXlStorageListDir(t *testing.T) {
// TestXLStorageListDir - TestXLStorages validate the directory listing functionality provided by xlStorage.ListDir .
func TestXLStorageListDir(t *testing.T) {
// create xlStorage test setup
xlStorage, path, err := newXLStorageTestSetup()
if err != nil {
@ -1692,8 +1692,9 @@ func TestXLStorageVerifyFile(t *testing.T) {
w := newStreamingBitrotWriter(xlStorage, volName, fileName, size, algo, shardSize)
reader := bytes.NewReader(data)
for {
// Using io.CopyBuffer instead of this loop will not work for us as io.CopyBuffer
// will use bytes.Buffer.ReadConfig() which will not do shardSize'ed writes causing error.
// Using io.Copy instead of this loop will not work for us as io.Copy
// will use bytes.Reader.WriteTo() which will not do shardSize'ed writes
// causing error.
n, err := reader.Read(shard)
w.Write(shard[:n])
if err == nil {

@ -90,7 +90,7 @@ GetObject() holds a read lock on `fs.json`.
... you can perform other operations here ...
_, err = io.CopyBuffer(writer, reader, buf)
_, err = io.Copy(writer, reader)
... after successful copy operation unlocks the read lock ...
```

@ -94,7 +94,7 @@ GetObject()持有`fs.json`的一个读锁。
... you can perform other operations here ...
_, err = io.CopyBuffer(writer, reader, buf)
_, err = io.Copy(writer, reader)
... after successful copy operation unlocks the read lock ...
```

@ -40,8 +40,6 @@ func AppendFile(dst string, src string, osync bool) error {
return err
}
defer srcFile.Close()
// Allocate staging buffer.
var buf = make([]byte, defaultAppendBufferSize)
_, err = io.CopyBuffer(appendFile, srcFile, buf)
_, err = io.Copy(appendFile, srcFile)
return err
}

@ -36,8 +36,6 @@ func AppendFile(dst string, src string, osync bool) error {
return err
}
defer srcFile.Close()
// Allocate staging buffer.
var buf = make([]byte, defaultAppendBufferSize)
_, err = io.CopyBuffer(appendFile, srcFile, buf)
_, err = io.Copy(appendFile, srcFile)
return err
}

@ -22,13 +22,9 @@ import (
"io"
"os"
humanize "github.com/dustin/go-humanize"
"github.com/minio/minio/pkg/disk"
)
// defaultAppendBufferSize - Default buffer size for the AppendFile
const defaultAppendBufferSize = humanize.MiByte
// WriteOnCloser implements io.WriteCloser and always
// executes at least one write operation if it is closed.
//
@ -186,7 +182,7 @@ const directioAlignSize = 4096
// 4K page boundaries. Without passing aligned buffer may cause
// this function to return error.
//
// This code is similar in spirit to io.CopyBuffer but it is only to be
// This code is similar in spirit to io.Copy but it is only to be
// used with DIRECT I/O based file descriptor and it is expected that
// input writer *os.File not a generic io.Writer. Make sure to have
// the file opened for writes with syscall.O_DIRECT flag.

Loading…
Cancel
Save