XL and FS use different tree walk ignored errors (#2707)

master
Anis Elleuch 8 years ago committed by Harshavardhana
parent a1ff351f21
commit 7a549096de
  1. 4
      cmd/fs-v1-multipart.go
  2. 8
      cmd/fs-v1.go
  3. 13
      cmd/tree-walk.go
  4. 12
      cmd/tree-walk_test.go
  5. 2
      cmd/xl-v1-common.go
  6. 2
      cmd/xl-v1-list-objects.go
  7. 6
      cmd/xl-v1-multipart.go
  8. 9
      cmd/xl-v1.go

@ -77,7 +77,7 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
if walkResultCh == nil { if walkResultCh == nil {
endWalkCh = make(chan struct{}) endWalkCh = make(chan struct{})
isLeaf := fs.isMultipartUpload isLeaf := fs.isMultipartUpload
listDir := listDirFactory(isLeaf, fs.storage) listDir := listDirFactory(isLeaf, fsTreeWalkIgnoredErrs, fs.storage)
walkResultCh = startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, isLeaf, endWalkCh) walkResultCh = startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, isLeaf, endWalkCh)
} }
for maxUploads > 0 { for maxUploads > 0 {
@ -90,7 +90,7 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
// For any walk error return right away. // For any walk error return right away.
if walkResult.err != nil { if walkResult.err != nil {
// File not found or Disk not found is a valid case. // File not found or Disk not found is a valid case.
if isErrIgnored(walkResult.err, walkResultIgnoredErrs) { if isErrIgnored(walkResult.err, fsTreeWalkIgnoredErrs) {
eof = true eof = true
break break
} }

@ -38,6 +38,12 @@ type fsObjects struct {
listPool *treeWalkPool listPool *treeWalkPool
} }
// list of all errors that can be ignored in tree walk operation in FS
var fsTreeWalkIgnoredErrs = []error{
errFileNotFound,
errVolumeNotFound,
}
// creates format.json, the FS format info in minioMetaBucket. // creates format.json, the FS format info in minioMetaBucket.
func initFormatFS(storageDisk StorageAPI) error { func initFormatFS(storageDisk StorageAPI) error {
return writeFSFormatData(storageDisk, newFSFormatV1()) return writeFSFormatData(storageDisk, newFSFormatV1())
@ -602,7 +608,7 @@ func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKey
// object string does not end with "/". // object string does not end with "/".
return !strings.HasSuffix(object, slashSeparator) return !strings.HasSuffix(object, slashSeparator)
} }
listDir := listDirFactory(isLeaf, fs.storage) listDir := listDirFactory(isLeaf, fsTreeWalkIgnoredErrs, fs.storage)
walkResultCh = startTreeWalk(bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh) walkResultCh = startTreeWalk(bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh)
} }
var fileInfos []FileInfo var fileInfos []FileInfo

@ -21,15 +21,6 @@ import (
"strings" "strings"
) )
// list of all errors that can be ignored in tree walk operation.
var walkResultIgnoredErrs = []error{
errFileNotFound,
errVolumeNotFound,
errDiskNotFound,
errDiskAccessDenied,
errFaultyDisk,
}
// Tree walk result carries results of tree walking. // Tree walk result carries results of tree walking.
type treeWalkResult struct { type treeWalkResult struct {
entry string entry string
@ -107,7 +98,7 @@ type isLeafFunc func(string, string) bool
// Returns function "listDir" of the type listDirFunc. // Returns function "listDir" of the type listDirFunc.
// isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry. // isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry.
// disks - used for doing disk.ListDir(). FS passes single disk argument, XL passes a list of disks. // disks - used for doing disk.ListDir(). FS passes single disk argument, XL passes a list of disks.
func listDirFactory(isLeaf isLeafFunc, disks ...StorageAPI) listDirFunc { func listDirFactory(isLeaf isLeafFunc, treeWalkIgnoredErrs []error, disks ...StorageAPI) listDirFunc {
// listDir - lists all the entries at a given prefix and given entry in the prefix. // listDir - lists all the entries at a given prefix and given entry in the prefix.
listDir := func(bucket, prefixDir, prefixEntry string) (entries []string, delayIsLeaf bool, err error) { listDir := func(bucket, prefixDir, prefixEntry string) (entries []string, delayIsLeaf bool, err error) {
for _, disk := range disks { for _, disk := range disks {
@ -142,7 +133,7 @@ func listDirFactory(isLeaf isLeafFunc, disks ...StorageAPI) listDirFunc {
} }
// For any reason disk was deleted or goes offline, continue // For any reason disk was deleted or goes offline, continue
// and list from other disks if possible. // and list from other disks if possible.
if isErrIgnored(err, walkResultIgnoredErrs) { if isErrIgnored(err, treeWalkIgnoredErrs) {
continue continue
} }
break break

@ -185,7 +185,7 @@ func TestTreeWalk(t *testing.T) {
isLeaf := func(volume, prefix string) bool { isLeaf := func(volume, prefix string) bool {
return !strings.HasSuffix(prefix, slashSeparator) return !strings.HasSuffix(prefix, slashSeparator)
} }
listDir := listDirFactory(isLeaf, disk) listDir := listDirFactory(isLeaf, xlTreeWalkIgnoredErrs, disk)
// Simple test for prefix based walk. // Simple test for prefix based walk.
testTreeWalkPrefix(t, listDir, isLeaf) testTreeWalkPrefix(t, listDir, isLeaf)
// Simple test when marker is set. // Simple test when marker is set.
@ -219,7 +219,7 @@ func TestTreeWalkTimeout(t *testing.T) {
isLeaf := func(volume, prefix string) bool { isLeaf := func(volume, prefix string) bool {
return !strings.HasSuffix(prefix, slashSeparator) return !strings.HasSuffix(prefix, slashSeparator)
} }
listDir := listDirFactory(isLeaf, disk) listDir := listDirFactory(isLeaf, xlTreeWalkIgnoredErrs, disk)
// TreeWalk pool with 2 seconds timeout for tree-walk go routines. // TreeWalk pool with 2 seconds timeout for tree-walk go routines.
pool := newTreeWalkPool(2 * time.Second) pool := newTreeWalkPool(2 * time.Second)
@ -291,7 +291,7 @@ func TestListDir(t *testing.T) {
// create listDir function. // create listDir function.
listDir := listDirFactory(func(volume, prefix string) bool { listDir := listDirFactory(func(volume, prefix string) bool {
return !strings.HasSuffix(prefix, slashSeparator) return !strings.HasSuffix(prefix, slashSeparator)
}, disk1, disk2) }, xlTreeWalkIgnoredErrs, disk1, disk2)
// Create file1 in fsDir1 and file2 in fsDir2. // Create file1 in fsDir1 and file2 in fsDir2.
disks := []StorageAPI{disk1, disk2} disks := []StorageAPI{disk1, disk2}
@ -363,7 +363,7 @@ func TestRecursiveTreeWalk(t *testing.T) {
} }
// Create listDir function. // Create listDir function.
listDir := listDirFactory(isLeaf, disk1) listDir := listDirFactory(isLeaf, xlTreeWalkIgnoredErrs, disk1)
// Create the namespace. // Create the namespace.
var files = []string{ var files = []string{
@ -469,7 +469,7 @@ func TestSortedness(t *testing.T) {
return !strings.HasSuffix(prefix, slashSeparator) return !strings.HasSuffix(prefix, slashSeparator)
} }
// Create listDir function. // Create listDir function.
listDir := listDirFactory(isLeaf, disk1) listDir := listDirFactory(isLeaf, xlTreeWalkIgnoredErrs, disk1)
// Create the namespace. // Create the namespace.
var files = []string{ var files = []string{
@ -543,7 +543,7 @@ func TestTreeWalkIsEnd(t *testing.T) {
return !strings.HasSuffix(prefix, slashSeparator) return !strings.HasSuffix(prefix, slashSeparator)
} }
// Create listDir function. // Create listDir function.
listDir := listDirFactory(isLeaf, disk1) listDir := listDirFactory(isLeaf, xlTreeWalkIgnoredErrs, disk1)
// Create the namespace. // Create the namespace.
var files = []string{ var files = []string{

@ -62,7 +62,7 @@ func (xl xlObjects) isObject(bucket, prefix string) (ok bool) {
return true return true
} }
// Ignore for file not found, disk not found or faulty disk. // Ignore for file not found, disk not found or faulty disk.
if isErrIgnored(err, walkResultIgnoredErrs) { if isErrIgnored(err, xlTreeWalkIgnoredErrs) {
continue continue
} }
errorIf(err, "Unable to stat a file %s/%s/%s", bucket, prefix, xlMetaJSONFile) errorIf(err, "Unable to stat a file %s/%s/%s", bucket, prefix, xlMetaJSONFile)

@ -31,7 +31,7 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
if walkResultCh == nil { if walkResultCh == nil {
endWalkCh = make(chan struct{}) endWalkCh = make(chan struct{})
isLeaf := xl.isObject isLeaf := xl.isObject
listDir := listDirFactory(isLeaf, xl.getLoadBalancedDisks()...) listDir := listDirFactory(isLeaf, xlTreeWalkIgnoredErrs, xl.getLoadBalancedDisks()...)
walkResultCh = startTreeWalk(bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh) walkResultCh = startTreeWalk(bucket, prefix, marker, recursive, listDir, isLeaf, endWalkCh)
} }

@ -95,7 +95,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
if walkerCh == nil { if walkerCh == nil {
walkerDoneCh = make(chan struct{}) walkerDoneCh = make(chan struct{})
isLeaf := xl.isMultipartUpload isLeaf := xl.isMultipartUpload
listDir := listDirFactory(isLeaf, xl.getLoadBalancedDisks()...) listDir := listDirFactory(isLeaf, xlTreeWalkIgnoredErrs, xl.getLoadBalancedDisks()...)
walkerCh = startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, isLeaf, walkerDoneCh) walkerCh = startTreeWalk(minioMetaBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, isLeaf, walkerDoneCh)
} }
// Collect uploads until we have reached maxUploads count to 0. // Collect uploads until we have reached maxUploads count to 0.
@ -109,7 +109,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
// For any walk error return right away. // For any walk error return right away.
if walkResult.err != nil { if walkResult.err != nil {
// File not found or Disk not found is a valid case. // File not found or Disk not found is a valid case.
if isErrIgnored(walkResult.err, walkResultIgnoredErrs) { if isErrIgnored(walkResult.err, xlTreeWalkIgnoredErrs) {
continue continue
} }
return ListMultipartsInfo{}, err return ListMultipartsInfo{}, err
@ -154,7 +154,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
} }
nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry), opsID) nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry), opsID)
if err != nil { if err != nil {
if isErrIgnored(err, walkResultIgnoredErrs) { if isErrIgnored(err, xlTreeWalkIgnoredErrs) {
continue continue
} }
return ListMultipartsInfo{}, err return ListMultipartsInfo{}, err

@ -67,6 +67,15 @@ type xlObjects struct {
objCacheEnabled bool objCacheEnabled bool
} }
// list of all errors that can be ignored in tree walk operation in XL
var xlTreeWalkIgnoredErrs = []error{
errFileNotFound,
errVolumeNotFound,
errDiskNotFound,
errDiskAccessDenied,
errFaultyDisk,
}
func repairDiskMetadata(storageDisks []StorageAPI) error { func repairDiskMetadata(storageDisks []StorageAPI) error {
// Attempt to load all `format.json`. // Attempt to load all `format.json`.
formatConfigs, sErrs := loadAllFormats(storageDisks) formatConfigs, sErrs := loadAllFormats(storageDisks)

Loading…
Cancel
Save