From c872c30ea3c925bdae4f96b71b4fc4a1ca631f9f Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 10 May 2018 16:53:42 -0700 Subject: [PATCH] fix: introduce isLeafDir in healing to fix the crash (#5920) This PR also supports healing directories. Fixes #5917 --- cmd/tree-walk.go | 8 +++++- cmd/xl-sets.go | 21 +++++---------- cmd/xl-v1-healing.go | 62 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+), 16 deletions(-) diff --git a/cmd/tree-walk.go b/cmd/tree-walk.go index 5e3c003de..75de800ea 100644 --- a/cmd/tree-walk.go +++ b/cmd/tree-walk.go @@ -153,6 +153,12 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker return err } } + + // When isleaf check is delayed, make sure that it is set correctly here. + if delayIsLeaf && isLeaf == nil { + return errInvalidArgument + } + // For an empty list return right here. if len(entries) == 0 { return nil @@ -169,6 +175,7 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker if len(entries) == 0 { return nil } + for i, entry := range entries { var leaf, leafDir bool @@ -187,7 +194,6 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker } isDir := !leafDir && !leaf - if i == 0 && markerDir == entry { if !recursive { // Skip as the marker would already be listed in the previous listing. diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 44e9f4af6..870acc552 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -1360,13 +1360,17 @@ func (s *xlSets) listObjectsHeal(ctx context.Context, bucket, prefix, marker, de return s.getHashedSet(entry).isObject(bucket, entry) } + isLeafDir := func(bucket, entry string) bool { + return s.getHashedSet(entry).isObjectDir(bucket, entry) + } + var setDisks = make([][]StorageAPI, len(s.sets)) for _, set := range s.sets { setDisks = append(setDisks, set.getLoadBalancedDisks()) } listDir := listDirSetsHealFactory(isLeaf, setDisks...) - walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, nil, nil, endWalkCh) + walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, nil, isLeafDir, endWalkCh) } var objInfos []ObjectInfo @@ -1403,20 +1407,7 @@ func (s *xlSets) listObjectsHeal(ctx context.Context, bucket, prefix, marker, de result := ListObjectsInfo{IsTruncated: !eof} for _, objInfo := range objInfos { result.NextMarker = objInfo.Name - if objInfo.IsDir { - result.Prefixes = append(result.Prefixes, objInfo.Name) - continue - } - - // Add each object seen to the result - objects are - // checked for healing later. - result.Objects = append(result.Objects, ObjectInfo{ - Bucket: bucket, - Name: objInfo.Name, - ModTime: objInfo.ModTime, - Size: objInfo.Size, - IsDir: false, - }) + result.Objects = append(result.Objects, objInfo) } return result, nil } diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index 286979412..eb589f790 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -539,12 +539,74 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o return result, nil } +// healObjectDir - heals object directory specifically, this special call +// is needed since we do not have a special backend format for directories. +func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dryRun bool) (hr madmin.HealResultItem, err error) { + storageDisks := xl.getDisks() + + // Initialize heal result object + hr = madmin.HealResultItem{ + Type: madmin.HealItemObject, + Bucket: bucket, + Object: object, + DiskCount: len(storageDisks), + ParityBlocks: len(storageDisks) / 2, + DataBlocks: len(storageDisks) / 2, + ObjectSize: 0, + } + + // Prepare object creation in all disks + for _, disk := range storageDisks { + if disk == nil { + hr.Before.Drives = append(hr.Before.Drives, madmin.HealDriveInfo{ + UUID: "", + State: madmin.DriveStateOffline, + }) + hr.After.Drives = append(hr.After.Drives, madmin.HealDriveInfo{ + UUID: "", + State: madmin.DriveStateMissing, + }) + continue + } + + drive := disk.String() + hr.Before.Drives = append(hr.Before.Drives, madmin.HealDriveInfo{ + UUID: "", + Endpoint: drive, + State: madmin.DriveStateMissing, + }) + hr.After.Drives = append(hr.After.Drives, madmin.HealDriveInfo{ + UUID: "", + Endpoint: drive, + State: madmin.DriveStateMissing, + }) + + if !dryRun { + if err := disk.MakeVol(pathJoin(bucket, object)); err != nil && err != errVolumeExists { + return hr, toObjectErr(err, bucket, object) + } + + for i, v := range hr.Before.Drives { + if v.Endpoint == drive { + hr.After.Drives[i].State = madmin.DriveStateOk + } + } + } + } + return hr, nil +} + // HealObject - heal the given object. // // FIXME: If an object object was deleted and one disk was down, // and later the disk comes back up again, heal on the object // should delete it. func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRun bool) (hr madmin.HealResultItem, err error) { + // Healing directories handle it separately. + if hasSuffix(object, slashSeparator) { + return xl.healObjectDir(ctx, bucket, object, dryRun) + } + // FIXME: Metadata is read again in the healObject() call below. // Read metadata files from all the disks partsMetadata, errs := readAllXLMetadata(ctx, xl.getDisks(), bucket, object)