|
|
|
@ -55,8 +55,8 @@ import ( |
|
|
|
|
const ( |
|
|
|
|
nullVersionID = "null" |
|
|
|
|
diskMinTotalSpace = 900 * humanize.MiByte // Min 900MiB total space.
|
|
|
|
|
writeBlockSize = 4 * humanize.MiByte // Default write block size 4MiB.
|
|
|
|
|
readBlockSize = 2 * humanize.MiByte // Default read block size 2MiB.
|
|
|
|
|
blockSizeLarge = 1 * humanize.MiByte // Default r/w block size for larger objects.
|
|
|
|
|
blockSizeSmall = 128 * humanize.KiByte // Default r/w block size for smaller objects.
|
|
|
|
|
|
|
|
|
|
// On regular files bigger than this;
|
|
|
|
|
readAheadSize = 16 << 20 |
|
|
|
@ -65,9 +65,10 @@ const ( |
|
|
|
|
// Size of each buffer.
|
|
|
|
|
readAheadBufSize = 1 << 20 |
|
|
|
|
|
|
|
|
|
// Small file threshold below which data accompanies metadata
|
|
|
|
|
// from storage layer.
|
|
|
|
|
smallFileThreshold = 32 * humanize.KiByte |
|
|
|
|
// Small file threshold below which data accompanies metadata from storage layer.
|
|
|
|
|
smallFileThreshold = 128 * humanize.KiByte // Optimized for NVMe/SSDs
|
|
|
|
|
// For hardrives it is possible to set this to a lower value to avoid any
|
|
|
|
|
// spike in latency. But currently we are simply keeping it optimal for SSDs.
|
|
|
|
|
|
|
|
|
|
// XL metadata file carries per object metadata.
|
|
|
|
|
xlStorageFormatFile = "xl.meta" |
|
|
|
@ -97,8 +98,8 @@ type xlStorage struct { |
|
|
|
|
|
|
|
|
|
globalSync bool |
|
|
|
|
|
|
|
|
|
wpool sync.Pool |
|
|
|
|
rpool sync.Pool |
|
|
|
|
poolLarge sync.Pool |
|
|
|
|
poolSmall sync.Pool |
|
|
|
|
|
|
|
|
|
rootDisk bool |
|
|
|
|
|
|
|
|
@ -261,15 +262,15 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) { |
|
|
|
|
p := &xlStorage{ |
|
|
|
|
diskPath: path, |
|
|
|
|
endpoint: ep, |
|
|
|
|
wpool: sync.Pool{ |
|
|
|
|
poolLarge: sync.Pool{ |
|
|
|
|
New: func() interface{} { |
|
|
|
|
b := disk.AlignedBlock(writeBlockSize) |
|
|
|
|
b := disk.AlignedBlock(blockSizeLarge) |
|
|
|
|
return &b |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
rpool: sync.Pool{ |
|
|
|
|
poolSmall: sync.Pool{ |
|
|
|
|
New: func() interface{} { |
|
|
|
|
b := disk.AlignedBlock(readBlockSize) |
|
|
|
|
b := disk.AlignedBlock(blockSizeSmall) |
|
|
|
|
return &b |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
@ -1180,7 +1181,7 @@ func (s *xlStorage) readAllData(volumeDir, filePath string) (buf []byte, err err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
atomic.AddInt32(&s.activeIOCount, 1) |
|
|
|
|
rd := &odirectReader{f, nil, nil, true, s, nil} |
|
|
|
|
rd := &odirectReader{f, nil, nil, true, true, s, nil} |
|
|
|
|
defer rd.Close() |
|
|
|
|
|
|
|
|
|
return ioutil.ReadAll(rd) |
|
|
|
@ -1410,6 +1411,7 @@ type odirectReader struct { |
|
|
|
|
buf []byte |
|
|
|
|
bufp *[]byte |
|
|
|
|
freshRead bool |
|
|
|
|
smallFile bool |
|
|
|
|
s *xlStorage |
|
|
|
|
err error |
|
|
|
|
} |
|
|
|
@ -1420,7 +1422,11 @@ func (o *odirectReader) Read(buf []byte) (n int, err error) { |
|
|
|
|
return 0, o.err |
|
|
|
|
} |
|
|
|
|
if o.buf == nil { |
|
|
|
|
o.bufp = o.s.rpool.Get().(*[]byte) |
|
|
|
|
if o.smallFile { |
|
|
|
|
o.bufp = o.s.poolSmall.Get().(*[]byte) |
|
|
|
|
} else { |
|
|
|
|
o.bufp = o.s.poolLarge.Get().(*[]byte) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if o.freshRead { |
|
|
|
|
o.buf = *o.bufp |
|
|
|
@ -1449,8 +1455,14 @@ func (o *odirectReader) Read(buf []byte) (n int, err error) { |
|
|
|
|
|
|
|
|
|
// Close - Release the buffer and close the file.
|
|
|
|
|
func (o *odirectReader) Close() error { |
|
|
|
|
o.s.rpool.Put(o.bufp) |
|
|
|
|
atomic.AddInt32(&o.s.activeIOCount, -1) |
|
|
|
|
if o.smallFile { |
|
|
|
|
o.s.poolSmall.Put(o.bufp) |
|
|
|
|
} else { |
|
|
|
|
o.s.poolLarge.Put(o.bufp) |
|
|
|
|
} |
|
|
|
|
defer func() { |
|
|
|
|
atomic.AddInt32(&o.s.activeIOCount, -1) |
|
|
|
|
}() |
|
|
|
|
return o.f.Close() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -1472,6 +1484,7 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var file *os.File |
|
|
|
|
// O_DIRECT only supported if offset is zero
|
|
|
|
|
if offset == 0 && globalStorageClass.GetDMA() == storageclass.DMAReadWrite { |
|
|
|
|
file, err = disk.OpenFileDirectIO(filePath, os.O_RDONLY, 0666) |
|
|
|
|
} else { |
|
|
|
@ -1512,7 +1525,10 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off |
|
|
|
|
|
|
|
|
|
atomic.AddInt32(&s.activeIOCount, 1) |
|
|
|
|
if offset == 0 && globalStorageClass.GetDMA() == storageclass.DMAReadWrite { |
|
|
|
|
return &odirectReader{file, nil, nil, true, s, nil}, nil |
|
|
|
|
if length <= smallFileThreshold { |
|
|
|
|
return &odirectReader{file, nil, nil, true, true, s, nil}, nil |
|
|
|
|
} |
|
|
|
|
return &odirectReader{file, nil, nil, true, false, s, nil}, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if offset > 0 { |
|
|
|
@ -1525,7 +1541,9 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off |
|
|
|
|
io.Reader |
|
|
|
|
io.Closer |
|
|
|
|
}{Reader: io.LimitReader(file, length), Closer: closeWrapper(func() error { |
|
|
|
|
atomic.AddInt32(&s.activeIOCount, -1) |
|
|
|
|
defer func() { |
|
|
|
|
atomic.AddInt32(&s.activeIOCount, -1) |
|
|
|
|
}() |
|
|
|
|
return file.Close() |
|
|
|
|
})} |
|
|
|
|
|
|
|
|
@ -1646,8 +1664,14 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz |
|
|
|
|
w.Close() |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
bufp := s.wpool.Get().(*[]byte) |
|
|
|
|
defer s.wpool.Put(bufp) |
|
|
|
|
var bufp *[]byte |
|
|
|
|
if fileSize <= smallFileThreshold { |
|
|
|
|
bufp = s.poolSmall.Get().(*[]byte) |
|
|
|
|
defer s.poolSmall.Put(bufp) |
|
|
|
|
} else { |
|
|
|
|
bufp = s.poolLarge.Get().(*[]byte) |
|
|
|
|
defer s.poolLarge.Put(bufp) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
written, err := xioutil.CopyAligned(w, r, *bufp, fileSize) |
|
|
|
|
if err != nil { |
|
|
|
|