@ -53,10 +53,9 @@ import (
)
const (
nullVersionID = "null"
diskMinTotalSpace = 900 * humanize . MiByte // Min 900MiB total space.
blockSizeLarge = 1 * humanize . MiByte // Default r/w block size for larger objects.
blockSizeSmall = 128 * humanize . KiByte // Default r/w block size for smaller objects.
nullVersionID = "null"
blockSizeLarge = 1 * humanize . MiByte // Default r/w block size for larger objects.
blockSizeSmall = 128 * humanize . KiByte // Default r/w block size for smaller objects.
// On regular files bigger than this;
readAheadSize = 16 << 20
@ -103,6 +102,8 @@ type xlStorage struct {
rootDisk bool
readODirectSupported bool
diskID string
formatFileInfo os . FileInfo
@ -155,7 +156,7 @@ func checkPathLength(pathName string) error {
return nil
}
func getValidPath ( path string , requireDirectIO bool ) ( string , error ) {
func getValidPath ( path string ) ( string , error ) {
if path == "" {
return path , errInvalidArgument
}
@ -173,7 +174,7 @@ func getValidPath(path string, requireDirectIO bool) (string, error) {
}
if osIsNotExist ( err ) {
// Disk not found create it.
if err = os . MkdirAll ( path , 0777 ) ; err != nil {
if err = reliable MkdirAll( path , 0777 ) ; err != nil {
return path , err
}
}
@ -181,42 +182,6 @@ func getValidPath(path string, requireDirectIO bool) (string, error) {
return path , errDiskNotDir
}
// check if backend is writable.
var rnd [ 8 ] byte
_ , _ = rand . Read ( rnd [ : ] )
fn := pathJoin ( path , ".writable-check-" + hex . EncodeToString ( rnd [ : ] ) + ".tmp" )
defer os . Remove ( fn )
var file * os . File
if requireDirectIO {
// only erasure coding needs direct-io support
file , err = disk . OpenFileDirectIO ( fn , os . O_CREATE | os . O_EXCL , 0666 )
} else {
file , err = os . OpenFile ( fn , os . O_CREATE | os . O_EXCL , 0666 )
}
// open file in direct I/O and use default umask, this also verifies
// if direct i/o failed.
if err != nil {
if isSysErrInvalidArg ( err ) {
// O_DIRECT not supported
return path , errUnsupportedDisk
}
return path , osErrToFileErr ( err )
}
file . Close ( )
di , err := getDiskInfo ( path )
if err != nil {
return path , err
}
if err = checkDiskMinTotal ( di ) ; err != nil {
return path , err
}
return path , nil
}
@ -245,7 +210,7 @@ func newLocalXLStorage(path string) (*xlStorage, error) {
func newXLStorage ( ep Endpoint ) ( * xlStorage , error ) {
path := ep . Path
var err error
if path , err = getValidPath ( path , true ) ; err != nil {
if path , err = getValidPath ( path ) ; err != nil {
return nil , err
}
@ -279,6 +244,35 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) {
rootDisk : rootDisk ,
}
// Create all necessary bucket folders if possible.
if err = p . MakeVolBulk ( context . TODO ( ) , minioMetaBucket , minioMetaTmpBucket , minioMetaMultipartBucket , dataUsageBucket ) ; err != nil {
return nil , err
}
// Check if backend is writable and supports O_DIRECT
var rnd [ 8 ] byte
_ , _ = rand . Read ( rnd [ : ] )
tmpFile := ".writable-check-" + hex . EncodeToString ( rnd [ : ] ) + ".tmp"
if err = p . CreateFile ( GlobalContext , minioMetaTmpBucket , tmpFile , 1 , strings . NewReader ( "0" ) ) ; err != nil {
return p , err
}
defer os . Remove ( pathJoin ( p . diskPath , minioMetaTmpBucket , tmpFile ) )
volumeDir , err := p . getVolDir ( minioMetaTmpBucket )
if err != nil {
return p , err
}
// Check if backend is readable, and optionally supports O_DIRECT.
if _ , err = p . readAllData ( volumeDir , pathJoin ( volumeDir , tmpFile ) , true ) ; err != nil {
if err != errUnsupportedDisk {
return p , err
}
// error is unsupported disk, turn-off directIO for reads
logger . Info ( fmt . Sprintf ( "Drive %s does not support O_DIRECT for reads, proceeding to use the drive without O_DIRECT" , ep ) )
p . readODirectSupported = false
}
// Success.
return p , nil
}
@ -301,17 +295,6 @@ func getDiskInfo(diskPath string) (di disk.Info, err error) {
return di , err
}
// check if disk total has minimum required size.
func checkDiskMinTotal ( di disk . Info ) ( err error ) {
// Remove 5% from total space for cumulative disk space
// used for journalling, inodes etc.
totalDiskSpace := float64 ( di . Total ) * diskFillFraction
if int64 ( totalDiskSpace ) <= diskMinTotalSpace {
return errMinDiskSize
}
return nil
}
// Implements stringer compatible interface.
func ( s * xlStorage ) String ( ) string {
return s . diskPath
@ -583,11 +566,11 @@ func (s *xlStorage) SetDiskID(id string) {
// storage rest server for remote disks.
}
func ( s * xlStorage ) MakeVolBulk ( ctx context . Context , volumes ... string ) ( err error ) {
func ( s * xlStorage ) MakeVolBulk ( ctx context . Context , volumes ... string ) error {
for _ , volume := range volumes {
if err = s . MakeVol ( ctx , volume ) ; err != nil {
if osIsPermission ( err ) {
return errVolume AccessDenied
if err : = s . MakeVol ( ctx , volume ) ; err != nil {
if errors . Is ( err , errDiskAccessDenied ) {
return errDisk AccessDenied
}
}
}
@ -595,7 +578,7 @@ func (s *xlStorage) MakeVolBulk(ctx context.Context, volumes ...string) (err err
}
// Make a volume entry.
func ( s * xlStorage ) MakeVol ( ctx context . Context , volume string ) ( err error ) {
func ( s * xlStorage ) MakeVol ( ctx context . Context , volume string ) error {
if ! isValidVolname ( volume ) {
return errInvalidArgument
}
@ -614,7 +597,7 @@ func (s *xlStorage) MakeVol(ctx context.Context, volume string) (err error) {
// Volume does not exist we proceed to create.
if osIsNotExist ( err ) {
// Make a volume entry, with mode 0777 mkdir honors system umask.
err = os . MkdirAll ( volumeDir , 0777 )
err = reliable MkdirAll( volumeDir , 0777 )
}
if osIsPermission ( err ) {
return errDiskAccessDenied
@ -1138,7 +1121,10 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
// - object size lesser than 32KiB
// - object has maximum of 1 parts
if fi . TransitionStatus == "" && fi . DataDir != "" && fi . Size <= smallFileThreshold && len ( fi . Parts ) == 1 {
fi . Data , err = s . readAllData ( volumeDir , pathJoin ( volumeDir , path , fi . DataDir , fmt . Sprintf ( "part.%d" , fi . Parts [ 0 ] . Number ) ) )
// Enable O_DIRECT optionally only if drive supports it.
requireDirectIO := globalStorageClass . GetDMA ( ) == storageclass . DMAReadWrite && s . readODirectSupported
partPath := fmt . Sprintf ( "part.%d" , fi . Parts [ 0 ] . Number )
fi . Data , err = s . readAllData ( volumeDir , pathJoin ( volumeDir , path , fi . DataDir , partPath ) , requireDirectIO )
if err != nil {
return FileInfo { } , err
}
@ -1148,9 +1134,9 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
return fi , nil
}
func ( s * xlStorage ) readAllData ( volumeDir , filePath string ) ( buf [ ] byte , err error ) {
func ( s * xlStorage ) readAllData ( volumeDir , filePath string , requireDirectIO bool ) ( buf [ ] byte , err error ) {
var f * os . File
if globalStorageClass . GetDMA ( ) == storageclass . DMAReadWrite {
if requireDirectIO {
f , err = disk . OpenFileDirectIO ( filePath , os . O_RDONLY , 0666 )
} else {
f , err = os . Open ( filePath )
@ -1176,7 +1162,7 @@ func (s *xlStorage) readAllData(volumeDir, filePath string) (buf []byte, err err
} else if isSysErrTooManyFiles ( err ) {
return nil , errTooManyOpenFiles
} else if isSysErrInvalidArg ( err ) {
return nil , errFileNotFound
return nil , errUnsupportedDisk
}
return nil , err
}
@ -1185,7 +1171,11 @@ func (s *xlStorage) readAllData(volumeDir, filePath string) (buf []byte, err err
rd := & odirectReader { f , nil , nil , true , true , s , nil }
defer rd . Close ( )
return ioutil . ReadAll ( rd )
buf , err = ioutil . ReadAll ( rd )
if err != nil {
err = osErrToFileErr ( err )
}
return buf , err
}
// ReadAll reads from r until an error or EOF and returns the data it read.
@ -1233,7 +1223,7 @@ func (s *xlStorage) ReadAll(ctx context.Context, volume string, path string) (bu
} else if isSysErrTooManyFiles ( err ) {
return nil , errTooManyOpenFiles
} else if isSysErrInvalidArg ( err ) {
return nil , errFileNotFound
return nil , errUnsupportedDisk
}
return nil , err
}
@ -1486,7 +1476,7 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
var file * os . File
// O_DIRECT only supported if offset is zero
if offset == 0 && globalStorageClass . GetDMA ( ) == storageclass . DMAReadWrite {
if offset == 0 && globalStorageClass . GetDMA ( ) == storageclass . DMAReadWrite && s . readODirectSupported {
file , err = disk . OpenFileDirectIO ( filePath , os . O_RDONLY , 0666 )
} else {
// Open the file for reading.
@ -1508,6 +1498,8 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
return nil , errFaultyDisk
case isSysErrTooManyFiles ( err ) :
return nil , errTooManyOpenFiles
case isSysErrInvalidArg ( err ) :
return nil , errUnsupportedDisk
default :
return nil , err
}
@ -1525,7 +1517,7 @@ func (s *xlStorage) ReadFileStream(ctx context.Context, volume, path string, off
}
atomic . AddInt32 ( & s . activeIOCount , 1 )
if offset == 0 && globalStorageClass . GetDMA ( ) == storageclass . DMAReadWrite {
if offset == 0 && globalStorageClass . GetDMA ( ) == storageclass . DMAReadWrite && s . readODirectSupported {
if length <= smallFileThreshold {
return & odirectReader { file , nil , nil , true , true , s , nil } , nil
}
@ -2145,7 +2137,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
legacyDataPath := pathJoin ( dstVolumeDir , dstPath , legacyDataDir )
// legacy data dir means its old content, honor system umask.
if err = os . MkdirAll ( legacyDataPath , 0777 ) ; err != nil {
if err = reliable MkdirAll( legacyDataPath , 0777 ) ; err != nil {
return osErrToFileErr ( err )
}