XL/fs: Optimize calling isBucketExist() (#1656)

* posix: Avoid using getAllVolumeInfo() in getVolumeDir()

This is necessary compromise to avoid significant slowness this
causes under load. The compromise is also substantial in a way
so that to avoid penalizing common cases v/s special cases.

For buckets with Caps on Unixes, we filter buckets based on the
latest anyways, so this is completely acceptable.

* XL/fs: Change the usage of verification of existance of buckets.

Optimize calling isBucketExists, it is not needed for all call
paths. isBucketExist should be called only for calls which use
temporary volume location for operations, for the rest rely on
the errors returned on their original call path.

Remove usage of filtering as well across all volume names.
master
Harshavardhana 9 years ago committed by Anand Babu (AB) Periasamy
parent 4214da65af
commit 4bc923e63b
  1. 13
      fs-objects.go
  2. 3
      object-common-multipart.go
  3. 6
      object-common.go
  4. 146
      posix.go
  5. 23
      xl-erasure-v1-common.go
  6. 6
      xl-erasure-v1-createfile.go
  7. 38
      xl-erasure-v1.go
  8. 12
      xl-objects.go

@ -94,9 +94,6 @@ func (fs fsObjects) GetObject(bucket, object string, startOffset int64) (io.Read
if !IsValidBucketName(bucket) {
return nil, BucketNameInvalid{Bucket: bucket}
}
if !isBucketExist(fs.storage, bucket) {
return nil, BucketNotFound{Bucket: bucket}
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return nil, ObjectNameInvalid{Bucket: bucket, Object: object}
@ -114,9 +111,6 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
if !IsValidBucketName(bucket) {
return ObjectInfo{}, (BucketNameInvalid{Bucket: bucket})
}
if !isBucketExist(fs.storage, bucket) {
return ObjectInfo{}, BucketNotFound{Bucket: bucket}
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return ObjectInfo{}, (ObjectNameInvalid{Bucket: bucket, Object: object})
@ -149,10 +143,6 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
}
// Check whether the bucket exists.
if !isBucketExist(fs.storage, bucket) {
return "", BucketNotFound{Bucket: bucket}
}
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{
Bucket: bucket,
@ -219,9 +209,6 @@ func (fs fsObjects) DeleteObject(bucket, object string) error {
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
}
if !isBucketExist(fs.storage, bucket) {
return BucketNotFound{Bucket: bucket}
}
if !IsValidObjectName(object) {
return ObjectNameInvalid{Bucket: bucket, Object: object}
}

@ -80,7 +80,6 @@ func newMultipartUploadCommon(storage StorageAPI, bucket string, object string)
if !isBucketExist(storage, bucket) {
return "", BucketNotFound{Bucket: bucket}
}
// Verify if object name is valid.
if !IsValidObjectName(object) {
return "", ObjectNameInvalid{Bucket: bucket, Object: object}
@ -229,7 +228,6 @@ func abortMultipartUploadCommon(storage StorageAPI, bucket, object, uploadID str
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
}
// Verify whether the bucket exists.
if !isBucketExist(storage, bucket) {
return BucketNotFound{Bucket: bucket}
}
@ -371,7 +369,6 @@ func listMultipartUploadsCommon(layer ObjectLayer, bucket, prefix, keyMarker, up
case fsObjects:
storage = l.storage
}
result := ListMultipartsInfo{}
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {

@ -144,17 +144,14 @@ func listObjectsCommon(layer ObjectLayer, bucket, prefix, marker, delimiter stri
case fsObjects:
storage = l.storage
}
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket}
}
// Verify whether the bucket exists.
// Verify if bucket exists.
if !isBucketExist(storage, bucket) {
return ListObjectsInfo{}, BucketNotFound{Bucket: bucket}
}
if !IsValidObjectPrefix(prefix) {
return ListObjectsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix}
}
@ -174,6 +171,7 @@ func listObjectsCommon(layer ObjectLayer, bucket, prefix, marker, delimiter stri
}
}
// With max keys of zero we have reached eof, return right here.
if maxKeys == 0 {
return ListObjectsInfo{}, nil
}

@ -128,28 +128,8 @@ func checkDiskFree(diskPath string, minFreeDisk int64) (err error) {
return nil
}
func removeDuplicateVols(volsInfo []VolInfo) []VolInfo {
// Use map to record duplicates as we find them.
result := []VolInfo{}
m := make(map[string]VolInfo)
for _, v := range volsInfo {
if _, found := m[v.Name]; !found {
m[v.Name] = v
}
}
result = make([]VolInfo, 0, len(m))
for _, v := range m {
result = append(result, v)
}
// Return the new slice.
return result
}
// gets all the unique directories from diskPath.
func getAllUniqueVols(dirPath string) ([]VolInfo, error) {
// List all the volumes from diskPath.
func listVols(dirPath string) ([]VolInfo, error) {
if err := checkPathLength(dirPath); err != nil {
return nil, err
}
@ -179,14 +159,14 @@ func getAllUniqueVols(dirPath string) ([]VolInfo, error) {
Created: fi.ModTime(),
})
}
return removeDuplicateVols(volsInfo), nil
return volsInfo, nil
}
// getVolumeDir - will convert incoming volume names to
// getVolDir - will convert incoming volume names to
// corresponding valid volume names on the backend in a platform
// compatible way for all operating systems. If volume is not found
// an error is generated.
func (s fsStorage) getVolumeDir(volume string) (string, error) {
func (s fsStorage) getVolDir(volume string) (string, error) {
if !isValidVolname(volume) {
return "", errInvalidArgument
}
@ -194,30 +174,7 @@ func (s fsStorage) getVolumeDir(volume string) (string, error) {
return "", err
}
volumeDir := pathJoin(s.diskPath, volume)
_, err := os.Stat(volumeDir)
if err == nil {
return volumeDir, nil
}
if os.IsNotExist(err) {
var volsInfo []VolInfo
volsInfo, err = getAllUniqueVols(s.diskPath)
if err != nil {
// For any errors, treat it as disk not found.
return volumeDir, err
}
for _, vol := range volsInfo {
// Verify if lowercase version of the volume is equal to
// the incoming volume, then use the proper name.
if strings.ToLower(vol.Name) == volume {
volumeDir = pathJoin(s.diskPath, vol.Name)
return volumeDir, nil
}
}
return volumeDir, errVolumeNotFound
} else if os.IsPermission(err) {
return volumeDir, errVolumeAccessDenied
}
return volumeDir, err
return volumeDir, nil
}
// Make a volume entry.
@ -227,20 +184,17 @@ func (s fsStorage) MakeVol(volume string) (err error) {
return err
}
volumeDir, err := s.getVolumeDir(volume)
if err == nil {
// Volume already exists, return error.
return errVolumeExists
volumeDir, err := s.getVolDir(volume)
if err != nil {
return err
}
// If volume not found create it.
if err == errVolumeNotFound {
// Make a volume entry.
return os.Mkdir(volumeDir, 0700)
// Make a volume entry.
err = os.Mkdir(volumeDir, 0700)
if err != nil && os.IsExist(err) {
return errVolumeExists
}
// For all other errors return here.
return err
// Success
return nil
}
// ListVols - list volumes.
@ -251,7 +205,7 @@ func (s fsStorage) ListVols() (volsInfo []VolInfo, err error) {
if err != nil {
return nil, err
}
volsInfo, err = getAllUniqueVols(s.diskPath)
volsInfo, err = listVols(s.diskPath)
if err != nil {
return nil, err
}
@ -275,7 +229,7 @@ func (s fsStorage) ListVols() (volsInfo []VolInfo, err error) {
// StatVol - get volume info.
func (s fsStorage) StatVol(volume string) (volInfo VolInfo, err error) {
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolumeDir(volume)
volumeDir, err := s.getVolDir(volume)
if err != nil {
return VolInfo{}, err
}
@ -309,7 +263,7 @@ func (s fsStorage) StatVol(volume string) (volInfo VolInfo, err error) {
// DeleteVol - delete a volume.
func (s fsStorage) DeleteVol(volume string) error {
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolumeDir(volume)
volumeDir, err := s.getVolDir(volume)
if err != nil {
return err
}
@ -321,8 +275,7 @@ func (s fsStorage) DeleteVol(volume string) error {
// On windows the string is slightly different, handle it here.
return errVolumeNotEmpty
} else if strings.Contains(err.Error(), "directory not empty") {
// Hopefully for all other
// operating systems, this is
// Hopefully for all other operating systems, this is
// assumed to be consistent.
return errVolumeNotEmpty
}
@ -335,7 +288,7 @@ func (s fsStorage) DeleteVol(volume string) error {
// If an entry is a directory it will be returned with a trailing "/".
func (s fsStorage) ListDir(volume, dirPath string) ([]string, error) {
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolumeDir(volume)
volumeDir, err := s.getVolDir(volume)
if err != nil {
return nil, err
}
@ -352,8 +305,16 @@ func (s fsStorage) ListDir(volume, dirPath string) ([]string, error) {
// ReadFile - read a file at a given offset.
func (s fsStorage) ReadFile(volume string, path string, offset int64) (readCloser io.ReadCloser, err error) {
volumeDir, err := s.getVolumeDir(volume)
volumeDir, err := s.getVolDir(volume)
if err != nil {
return nil, err
}
// Stat a volume entry.
_, err = os.Stat(volumeDir)
if err != nil {
if os.IsNotExist(err) {
return nil, errVolumeNotFound
}
return nil, err
}
@ -388,10 +349,18 @@ func (s fsStorage) ReadFile(volume string, path string, offset int64) (readClose
// CreateFile - create a file at path.
func (s fsStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) {
volumeDir, err := s.getVolumeDir(volume)
volumeDir, err := s.getVolDir(volume)
if err != nil {
return nil, err
}
// Stat a volume entry.
_, err = os.Stat(volumeDir)
if err != nil {
if os.IsNotExist(err) {
return nil, errVolumeNotFound
}
return nil, err
}
if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil {
return nil, err
}
@ -419,8 +388,16 @@ func (s fsStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser,
// StatFile - get file info.
func (s fsStorage) StatFile(volume, path string) (file FileInfo, err error) {
volumeDir, err := s.getVolumeDir(volume)
volumeDir, err := s.getVolDir(volume)
if err != nil {
return FileInfo{}, err
}
// Stat a volume entry.
_, err = os.Stat(volumeDir)
if err != nil {
if os.IsNotExist(err) {
return FileInfo{}, errVolumeNotFound
}
return FileInfo{}, err
}
@ -489,8 +466,16 @@ func deleteFile(basePath, deletePath string) error {
// DeleteFile - delete a file at path.
func (s fsStorage) DeleteFile(volume, path string) error {
volumeDir, err := s.getVolumeDir(volume)
volumeDir, err := s.getVolDir(volume)
if err != nil {
return err
}
// Stat a volume entry.
_, err = os.Stat(volumeDir)
if err != nil {
if os.IsNotExist(err) {
return errVolumeNotFound
}
return err
}
@ -507,14 +492,29 @@ func (s fsStorage) DeleteFile(volume, path string) error {
// RenameFile - rename file.
func (s fsStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error {
srcVolumeDir, err := s.getVolumeDir(srcVolume)
srcVolumeDir, err := s.getVolDir(srcVolume)
if err != nil {
return err
}
dstVolumeDir, err := s.getVolDir(dstVolume)
if err != nil {
return err
}
dstVolumeDir, err := s.getVolumeDir(dstVolume)
// Stat a volume entry.
_, err = os.Stat(srcVolumeDir)
if err != nil {
if os.IsNotExist(err) {
return errVolumeNotFound
}
return err
}
_, err = os.Stat(dstVolumeDir)
if err != nil {
if os.IsNotExist(err) {
return errVolumeNotFound
}
}
srcIsDir := strings.HasSuffix(srcPath, slashSeparator)
dstIsDir := strings.HasSuffix(dstPath, slashSeparator)
// Either src and dst have to be directories or files, else return error.

@ -46,25 +46,32 @@ func listFileVersions(partsMetadata []xlMetaV1, errs []error) (versions []int64)
return versions
}
// errsToStorageErr - convert collection of errors into a single
// reduceError - convert collection of errors into a single
// error based on total errors and read quorum.
func (xl XL) errsToStorageErr(errs []error) error {
notFoundCount := 0
func (xl XL) reduceError(errs []error) error {
fileNotFoundCount := 0
diskNotFoundCount := 0
volumeNotFoundCount := 0
diskAccessDeniedCount := 0
for _, err := range errs {
if err == errFileNotFound {
notFoundCount++
fileNotFoundCount++
} else if err == errDiskNotFound {
diskNotFoundCount++
} else if err == errVolumeAccessDenied {
diskAccessDeniedCount++
} else if err == errVolumeNotFound {
volumeNotFoundCount++
}
}
// If we have errors with 'file not found' greater than
// readQuroum, return as errFileNotFound.
if notFoundCount > len(xl.storageDisks)-xl.readQuorum {
// readQuorum, return as errFileNotFound.
// else if we have errors with 'volume not found' greater than
// readQuorum, return as errVolumeNotFound.
if fileNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
return errFileNotFound
} else if volumeNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
return errVolumeNotFound
}
// If we have errors with disk not found equal to the
// number of disks, return as errDiskNotFound.
@ -72,7 +79,7 @@ func (xl XL) errsToStorageErr(errs []error) error {
return errDiskNotFound
} else if diskNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
// If we have errors with 'disk not found' greater than
// readQuroum, return as errFileNotFound.
// readQuorum, return as errFileNotFound.
return errFileNotFound
}
// If we have errors with disk not found equal to the
@ -90,7 +97,7 @@ func (xl XL) errsToStorageErr(errs []error) error {
// - error if any.
func (xl XL) listOnlineDisks(volume, path string) (onlineDisks []StorageAPI, mdata xlMetaV1, heal bool, err error) {
partsMetadata, errs := xl.getPartsMetadata(volume, path)
if err = xl.errsToStorageErr(errs); err != nil {
if err = xl.reduceError(errs); err != nil {
return nil, xlMetaV1{}, false, err
}
highestVersion := int64(0)

@ -60,7 +60,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
// Convert errs into meaningful err to be sent upwards if possible
// based on total number of errors and read quorum.
err := xl.errsToStorageErr(errs)
err := xl.reduceError(errs)
if err != nil && err != errFileNotFound {
reader.CloseWithError(err)
return
@ -92,6 +92,10 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
reader.CloseWithError(err)
return
} else if err == errVolumeNotFound {
xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
reader.CloseWithError(err)
return
}
createFileError++

@ -290,9 +290,9 @@ func (xl XL) ListVols() (volsInfo []VolInfo, err error) {
return volsInfo, nil
}
// getAllVolumeInfo - get bucket volume info from all disks.
// getAllVolInfo - list bucket volume info from all disks.
// Returns error slice indicating the failed volume stat operations.
func (xl XL) getAllVolumeInfo(volume string) (volsInfo []VolInfo, errs []error) {
func (xl XL) getAllVolInfo(volume string) (volsInfo []VolInfo, errs []error) {
// Create errs and volInfo slices of storageDisks size.
errs = make([]error, len(xl.storageDisks))
volsInfo = make([]VolInfo, len(xl.storageDisks))
@ -320,13 +320,14 @@ func (xl XL) getAllVolumeInfo(volume string) (volsInfo []VolInfo, errs []error)
return volsInfo, errs
}
// listAllVolumeInfo - list all stat volume info from all disks.
// listAllVolInfo - list all stat volume info from all disks.
// Returns
// - stat volume info for all online disks.
// - boolean to indicate if healing is necessary.
// - error if any.
func (xl XL) listAllVolumeInfo(volume string) ([]VolInfo, bool, error) {
volsInfo, errs := xl.getAllVolumeInfo(volume)
func (xl XL) listAllVolInfo(volume string) ([]VolInfo, bool, error) {
volsInfo, errs := xl.getAllVolInfo(volume)
volsInfo = removeDuplicateVols(volsInfo)
notFoundCount := 0
for _, err := range errs {
if err == errVolumeNotFound {
@ -373,7 +374,7 @@ func (xl XL) healVolume(volume string) error {
defer nsMutex.RUnlock(volume, "")
// Lists volume info for all online disks.
volsInfo, heal, err := xl.listAllVolumeInfo(volume)
volsInfo, heal, err := xl.listAllVolInfo(volume)
if err != nil {
return err
}
@ -393,6 +394,26 @@ func (xl XL) healVolume(volume string) error {
return nil
}
// Removes any duplicate vols.
func removeDuplicateVols(volsInfo []VolInfo) []VolInfo {
// Use map to record duplicates as we find them.
result := []VolInfo{}
m := make(map[string]VolInfo)
for _, v := range volsInfo {
if _, found := m[v.Name]; !found {
m[v.Name] = v
}
}
result = make([]VolInfo, 0, len(m))
for _, v := range m {
result = append(result, v)
}
// Return the new slice.
return result
}
// StatVol - get volume stat info.
func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) {
if !isValidVolname(volume) {
@ -401,7 +422,7 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) {
// Acquire a read lock before reading.
nsMutex.RLock(volume, "")
volsInfo, heal, err := xl.listAllVolumeInfo(volume)
volsInfo, heal, err := xl.listAllVolInfo(volume)
nsMutex.RUnlock(volume, "")
if err != nil {
return VolInfo{}, err
@ -421,7 +442,7 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) {
total += volInfo.Total
}
// Filter volsInfo and update the volInfo.
volInfo = removeDuplicateVols(volsInfo)[0]
volInfo = volsInfo[0]
volInfo.Free = free
volInfo.Total = total
return volInfo, nil
@ -590,7 +611,6 @@ func (xl XL) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error {
if errCount <= len(xl.storageDisks)-xl.writeQuorum {
continue
}
return err
}
}

@ -146,9 +146,6 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64) (io.Read
if !IsValidBucketName(bucket) {
return nil, BucketNameInvalid{Bucket: bucket}
}
if !isBucketExist(xl.storage, bucket) {
return nil, BucketNotFound{Bucket: bucket}
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return nil, ObjectNameInvalid{Bucket: bucket, Object: object}
@ -261,10 +258,6 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
if !IsValidBucketName(bucket) {
return ObjectInfo{}, BucketNameInvalid{Bucket: bucket}
}
// Check whether the bucket exists.
if !isBucketExist(xl.storage, bucket) {
return ObjectInfo{}, BucketNotFound{Bucket: bucket}
}
// Verify if object is valid.
if !IsValidObjectName(object) {
return ObjectInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object}
@ -282,7 +275,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket}
}
// Check whether the bucket exists.
// Verify bucket exists.
if !isBucketExist(xl.storage, bucket) {
return "", BucketNotFound{Bucket: bucket}
}
@ -433,9 +426,6 @@ func (xl xlObjects) DeleteObject(bucket, object string) error {
if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket}
}
if !isBucketExist(xl.storage, bucket) {
return BucketNotFound{Bucket: bucket}
}
if !IsValidObjectName(object) {
return ObjectNameInvalid{Bucket: bucket, Object: object}
}

Loading…
Cancel
Save