storage/xl: Return errVolumeAccessDenied if disks cannot be accessed. (#1621)

Fixes #1614
master
Harshavardhana 9 years ago
parent d6e0f3ab33
commit b62774d32f
  1. 2
      object-common-multipart.go
  2. 59
      xl-erasure-v1-common.go
  3. 26
      xl-erasure-v1-createfile.go
  4. 14
      xl-objects.go

@ -312,7 +312,7 @@ func listMetaBucketMultipartFiles(layer ObjectLayer, prefixPath string, markerPa
"marker": markerPath, "marker": markerPath,
"recursive": recursive, "recursive": recursive,
}).Debugf("Walk resulted in an error %s", walkResult.err) }).Debugf("Walk resulted in an error %s", walkResult.err)
// 'File not found' or 'Disk not found' is a valid case. // File not found or Disk not found is a valid case.
if walkResult.err == errFileNotFound || walkResult.err == errDiskNotFound { if walkResult.err == errFileNotFound || walkResult.err == errDiskNotFound {
return nil, true, nil return nil, true, nil
} }

@ -48,34 +48,53 @@ func listFileVersions(partsMetadata []xlMetaV1, errs []error) (versions []int64)
return versions return versions
} }
// Returns slice of online disks needed. // errsToStorageErr - convert collection of errors into a single
// - slice returing readable disks. // error based on total errors and read quorum.
// - xlMetaV1 func (xl XL) errsToStorageErr(errs []error) error {
// - bool value indicating if healing is needed.
// - error if any.
func (xl XL) listOnlineDisks(volume, path string) (onlineDisks []StorageAPI, mdata xlMetaV1, heal bool, err error) {
partsMetadata, errs := xl.getPartsMetadata(volume, path)
notFoundCount := 0 notFoundCount := 0
diskNotFoundCount := 0 diskNotFoundCount := 0
diskAccessDeniedCount := 0
for _, err := range errs { for _, err := range errs {
if err == errFileNotFound || err == errDiskNotFound { if err == errFileNotFound {
notFoundCount++ notFoundCount++
// If we have errors with 'file not found' or 'disk not found' greater than } else if err == errDiskNotFound {
// writeQuroum, return as errFileNotFound.
if notFoundCount > len(xl.storageDisks)-xl.readQuorum {
return nil, xlMetaV1{}, false, errFileNotFound
}
}
if err == errDiskNotFound {
diskNotFoundCount++ diskNotFoundCount++
// If we have errors with disk not found equal to the } else if err == errVolumeAccessDenied {
// number of disks, return as errDiskNotFound. diskAccessDeniedCount++
if diskNotFoundCount == len(xl.storageDisks) {
return nil, xlMetaV1{}, false, errDiskNotFound
}
} }
} }
// If we have errors with 'file not found' greater than
// readQuroum, return as errFileNotFound.
if notFoundCount > len(xl.storageDisks)-xl.readQuorum {
return errFileNotFound
}
// If we have errors with disk not found equal to the
// number of disks, return as errDiskNotFound.
if diskNotFoundCount == len(xl.storageDisks) {
return errDiskNotFound
} else if diskNotFoundCount > len(xl.storageDisks)-xl.readQuorum {
// If we have errors with 'disk not found' greater than
// readQuroum, return as errFileNotFound.
return errFileNotFound
}
// If we have errors with disk not found equal to the
// number of disks, return as errDiskNotFound.
if diskAccessDeniedCount == len(xl.storageDisks) {
return errVolumeAccessDenied
}
return nil
}
// Returns slice of online disks needed.
// - slice returing readable disks.
// - xlMetaV1
// - bool value indicating if healing is needed.
// - error if any.
func (xl XL) listOnlineDisks(volume, path string) (onlineDisks []StorageAPI, mdata xlMetaV1, heal bool, err error) {
partsMetadata, errs := xl.getPartsMetadata(volume, path)
if err = xl.errsToStorageErr(errs); err != nil {
return nil, xlMetaV1{}, false, err
}
highestVersion := int64(0) highestVersion := int64(0)
onlineDisks = make([]StorageAPI, len(xl.storageDisks)) onlineDisks = make([]StorageAPI, len(xl.storageDisks))
// List all the file versions from partsMetadata list. // List all the file versions from partsMetadata list.

@ -62,24 +62,18 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *w
partsMetadata, errs := xl.getPartsMetadata(volume, path) partsMetadata, errs := xl.getPartsMetadata(volume, path)
nsMutex.RUnlock(volume, path) nsMutex.RUnlock(volume, path)
// Count errors other than fileNotFound, bigger than the allowed // Convert errs into meaningful err to be sent upwards if possible
// readQuorum, if yes throw an error. // based on total number of errors and read quorum.
metadataReadErrCount := 0 err := xl.errsToStorageErr(errs)
for _, err := range errs { if err != nil && err != errFileNotFound {
if err != nil && err != errFileNotFound { log.WithFields(logrus.Fields{
metadataReadErrCount++ "volume": volume,
if metadataReadErrCount > xl.readQuorum { "path": path,
log.WithFields(logrus.Fields{ }).Errorf("%s", err)
"volume": volume, reader.CloseWithError(err)
"path": path, return
}).Errorf("%s", err)
reader.CloseWithError(err)
return
}
}
} }
var err error
// List all the file versions on existing files. // List all the file versions on existing files.
versions := listFileVersions(partsMetadata, errs) versions := listFileVersions(partsMetadata, errs)
// Get highest file version. // Get highest file version.

@ -93,12 +93,14 @@ func newXLObjects(exportPaths ...string) (ObjectLayer, error) {
} else { } else {
log.Errorf("Unable to check backend format %s", err) log.Errorf("Unable to check backend format %s", err)
if err == errReadQuorum { if err == errReadQuorum {
errMsg := fmt.Sprintf("Not all disks %s on command line are available to meet the read quroum.", exportPaths) errMsg := fmt.Sprintf("Not all disks %s are available, did not meet read quroum.", exportPaths)
return nil, errors.New(errMsg) err = errors.New(errMsg)
} } else if err == errDiskNotFound {
if err == errDiskNotFound { errMsg := fmt.Sprintf("Disks %s not found.", exportPaths)
errMsg := fmt.Sprintf("All disks %s on command line are not available.", exportPaths) err = errors.New(errMsg)
return nil, errors.New(errMsg) } else if err == errVolumeAccessDenied {
errMsg := fmt.Sprintf("Disks %s access permission denied.", exportPaths)
err = errors.New(errMsg)
} }
return nil, err return nil, err
} }

Loading…
Cancel
Save