Deprecate listDirFactory in HealObjects, rely on ListObjectsHeal (#8419)

master
Harshavardhana 5 years ago committed by kannappanr
parent 1b74ce3924
commit 40fcd3dc48
  1. 111
      cmd/xl-sets.go

@ -1,5 +1,5 @@
/* /*
* MinIO Cloud Storage, (C) 2018 MinIO, Inc. * MinIO Cloud Storage, (C) 2018-2019 MinIO, Inc.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -22,7 +22,6 @@ import (
"hash/crc32" "hash/crc32"
"io" "io"
"net/http" "net/http"
"sort"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -741,78 +740,6 @@ func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, destBucke
return destSet.putObject(ctx, destBucket, destObject, srcInfo.PutObjReader, putOpts) return destSet.putObject(ctx, destBucket, destObject, srcInfo.PutObjReader, putOpts)
} }
// Returns function "listDir" of the type listDirFunc.
// disks - used for doing disk.ListDir(). Sets passes set of disks.
func listDirSetsFactory(ctx context.Context, sets ...*xlObjects) ListDirFunc {
listDirInternal := func(bucket, prefixDir, prefixEntry string, disks []StorageAPI) (mergedEntries []string) {
var diskEntries = make([][]string, len(disks))
g := errgroup.WithNErrs(len(disks))
for index, disk := range disks {
if disk == nil {
continue
}
index := index
g.Go(func() error {
var err error
diskEntries[index], err = disks[index].ListDir(bucket, prefixDir, -1, xlMetaJSONFile)
return err
}, index)
}
for _, err := range g.Wait() {
if err != nil {
logger.LogIf(ctx, err)
}
}
// Find elements in entries which are not in mergedEntries
for _, entries := range diskEntries {
var newEntries []string
for _, entry := range entries {
idx := sort.SearchStrings(mergedEntries, entry)
// if entry is already present in mergedEntries don't add.
if idx < len(mergedEntries) && mergedEntries[idx] == entry {
continue
}
newEntries = append(newEntries, entry)
}
if len(newEntries) > 0 {
// Merge the entries and sort it.
mergedEntries = append(mergedEntries, newEntries...)
sort.Strings(mergedEntries)
}
}
return mergedEntries
}
// listDir - lists all the entries at a given prefix and given entry in the prefix.
listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string) {
for _, set := range sets {
var newEntries []string
// Find elements in entries which are not in mergedEntries
for _, entry := range listDirInternal(bucket, prefixDir, prefixEntry, set.getLoadBalancedDisks()) {
idx := sort.SearchStrings(mergedEntries, entry)
// if entry is already present in mergedEntries don't add.
if idx < len(mergedEntries) && mergedEntries[idx] == entry {
continue
}
newEntries = append(newEntries, entry)
}
if len(newEntries) > 0 {
// Merge the entries and sort it.
mergedEntries = append(mergedEntries, newEntries...)
sort.Strings(mergedEntries)
}
}
return filterMatchingPrefix(mergedEntries, prefixEntry)
}
return listDir
}
// FileInfoCh - file info channel // FileInfoCh - file info channel
type FileInfoCh struct { type FileInfoCh struct {
Ch chan FileInfo Ch chan FileInfo
@ -1711,23 +1638,37 @@ func (s *xlSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) {
// HealObjects - Heal all objects recursively at a specified prefix, any // HealObjects - Heal all objects recursively at a specified prefix, any
// dangling objects deleted as well automatically. // dangling objects deleted as well automatically.
func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObjectFn func(string, string) error) (err error) { func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObjectFn func(string, string) error) error {
recursive := true
endWalkCh := make(chan struct{}) marker := ""
listDir := listDirSetsFactory(ctx, s.sets...)
walkResultCh := startTreeWalk(ctx, bucket, prefix, "", recursive, listDir, endWalkCh)
for { for {
walkResult, ok := <-walkResultCh if globalHTTPServer != nil {
if !ok { // Wait at max 10 minute for an inprogress request before proceeding to heal
break waitCount := 600
// Any requests in progress, delay the heal.
for (globalHTTPServer.GetRequestCount() >= int32(globalXLSetCount*globalXLSetDriveCount)) &&
waitCount > 0 {
waitCount--
time.Sleep(1 * time.Second)
}
}
res, err := s.ListObjectsHeal(ctx, bucket, prefix, marker, "", 1000)
if err != nil {
continue
} }
if err := healObjectFn(bucket, walkResult.entry); err != nil {
return toObjectErr(err, bucket, walkResult.entry) for _, obj := range res.Objects {
if err = healObjectFn(bucket, obj.Name); err != nil {
return toObjectErr(err, bucket, obj.Name)
}
} }
if walkResult.end {
if !res.IsTruncated {
break break
} }
marker = res.NextMarker
} }
return nil return nil

Loading…
Cancel
Save