diff --git a/cmd/bucket-metadata-sys.go b/cmd/bucket-metadata-sys.go index 294eec773..ac84423f3 100644 --- a/cmd/bucket-metadata-sys.go +++ b/cmd/bucket-metadata-sys.go @@ -433,12 +433,12 @@ func (sys *BucketMetadataSys) Init(ctx context.Context, buckets []BucketInfo, ob } // Load bucket metadata sys in background - go logger.LogIf(ctx, sys.load(ctx, buckets, objAPI)) + go sys.load(ctx, buckets, objAPI) return nil } // concurrently load bucket metadata to speed up loading bucket metadata. -func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error { +func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) { g := errgroup.WithNErrs(len(buckets)) for index := range buckets { index := index @@ -455,22 +455,20 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck } for _, err := range g.Wait() { if err != nil { - return err + logger.LogIf(ctx, err) } } - return nil } // Loads bucket metadata for all buckets into BucketMetadataSys. -func (sys *BucketMetadataSys) load(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error { +func (sys *BucketMetadataSys) load(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) { count := 100 // load 100 bucket metadata at a time. for { if len(buckets) < count { - return sys.concurrentLoad(ctx, buckets, objAPI) - } - if err := sys.concurrentLoad(ctx, buckets[:count], objAPI); err != nil { - return err + sys.concurrentLoad(ctx, buckets, objAPI) + return } + sys.concurrentLoad(ctx, buckets[:count], objAPI) buckets = buckets[count:] } } diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go index 74ed4e253..b31e6d6a8 100644 --- a/cmd/data-crawler.go +++ b/cmd/data-crawler.go @@ -421,6 +421,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo } return nil } + // Dynamic time delay. t := UTCNow() @@ -480,22 +481,29 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo // If that doesn't bring it back we remove the folder and assume it was deleted. // This means that the next run will not look for it. for k := range existing { - // Dynamic time delay. - t := UTCNow() - bucket, prefix := path2BucketObject(k) if f.dataUsageCrawlDebug { logger.Info(color.Green("folder-scanner:")+" checking disappeared folder: %v/%v", bucket, prefix) } + // Dynamic time delay. + t := UTCNow() + err = objAPI.HealObjects(ctx, bucket, prefix, madmin.HealOpts{Recursive: true, Remove: healDeleteDangling}, func(bucket, object, versionID string) error { + // Wait for each heal as per crawler frequency. + sleepDuration(time.Since(t), f.dataUsageCrawlMult) + + defer func() { + t = UTCNow() + }() return bgSeq.queueHealTask(healSource{ bucket: bucket, object: object, versionID: versionID, }, madmin.HealItemObject) }) + sleepDuration(time.Since(t), f.dataUsageCrawlMult) if f.dataUsageCrawlDebug && err != nil { diff --git a/cmd/erasure-server-sets.go b/cmd/erasure-server-sets.go index 5b90ac44d..4481fd389 100644 --- a/cmd/erasure-server-sets.go +++ b/cmd/erasure-server-sets.go @@ -228,9 +228,6 @@ func (z *erasureServerSets) getZoneIdx(ctx context.Context, bucket, object strin func (z *erasureServerSets) Shutdown(ctx context.Context) error { defer z.shutdown() - if z.SingleZone() { - return z.serverSets[0].Shutdown(ctx) - } g := errgroup.WithNErrs(len(z.serverSets)) @@ -251,11 +248,8 @@ func (z *erasureServerSets) Shutdown(ctx context.Context) error { } func (z *erasureServerSets) StorageInfo(ctx context.Context, local bool) (StorageInfo, []error) { - if z.SingleZone() { - return z.serverSets[0].StorageInfo(ctx, local) - } - var storageInfo StorageInfo + storageInfo.Backend.Type = BackendErasure storageInfos := make([]StorageInfo, len(z.serverSets)) storageInfosErrs := make([][]error, len(z.serverSets)) @@ -277,11 +271,16 @@ func (z *erasureServerSets) StorageInfo(ctx context.Context, local bool) (Storag storageInfo.Backend.OfflineDisks = storageInfo.Backend.OfflineDisks.Merge(lstorageInfo.Backend.OfflineDisks) } - storageInfo.Backend.Type = storageInfos[0].Backend.Type - storageInfo.Backend.StandardSCData = storageInfos[0].Backend.StandardSCData - storageInfo.Backend.StandardSCParity = storageInfos[0].Backend.StandardSCParity - storageInfo.Backend.RRSCData = storageInfos[0].Backend.RRSCData - storageInfo.Backend.RRSCParity = storageInfos[0].Backend.RRSCParity + scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD) + if scParity == 0 { + scParity = z.SetDriveCount() / 2 + } + + storageInfo.Backend.StandardSCData = z.SetDriveCount() - scParity + storageInfo.Backend.StandardSCParity = scParity + rrSCParity := globalStorageClass.GetParityForSC(storageclass.RRS) + storageInfo.Backend.RRSCData = z.SetDriveCount() - rrSCParity + storageInfo.Backend.RRSCParity = rrSCParity var errs []error for i := range z.serverSets { @@ -1896,8 +1895,6 @@ func (z *erasureServerSets) HealObjects(ctx context.Context, bucket, prefix stri } for _, version := range entry.Versions { - // Wait and proceed if there are active requests - waitForLowHTTPReq(int32(zoneDrivesPerSet[zoneIndex]), time.Second) if err := healObject(bucket, version.Name, version.VersionID); err != nil { return toObjectErr(err, bucket, version.Name) } @@ -1926,9 +1923,6 @@ func (z *erasureServerSets) HealObject(ctx context.Context, bucket, object, vers defer lk.RUnlock() } - if z.SingleZone() { - return z.serverSets[0].HealObject(ctx, bucket, object, versionID, opts) - } for _, zone := range z.serverSets { result, err := zone.HealObject(ctx, bucket, object, versionID, opts) if err != nil { @@ -1939,6 +1933,13 @@ func (z *erasureServerSets) HealObject(ctx context.Context, bucket, object, vers } return result, nil } + if versionID != "" { + return madmin.HealResultItem{}, VersionNotFound{ + Bucket: bucket, + Object: object, + VersionID: versionID, + } + } return madmin.HealResultItem{}, ObjectNotFound{ Bucket: bucket, Object: object, diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 112b28fca..b94f45145 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -32,7 +32,6 @@ import ( "github.com/google/uuid" "github.com/minio/minio-go/v7/pkg/tags" "github.com/minio/minio/cmd/config" - "github.com/minio/minio/cmd/config/storageclass" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/bpool" "github.com/minio/minio/pkg/dsync" @@ -227,7 +226,7 @@ func (s *erasureSets) connectDisks() { if err != nil { if endpoint.IsLocal { globalBackgroundHealState.pushHealLocalDisks(endpoint) - logger.Info(fmt.Sprintf("Found inconsistent drive %s with format.json, attempting to heal...", endpoint)) + logger.Info(fmt.Sprintf("Found inconsistent drive %s with format.json, attempting to heal... (%s)", endpoint, err)) } else { printEndpointError(endpoint, err, false) } @@ -483,7 +482,6 @@ func (s *erasureSets) StorageInfo(ctx context.Context, local bool) (StorageInfo, storageInfos := make([]StorageInfo, len(s.sets)) storageInfoErrs := make([][]error, len(s.sets)) - storageInfo.Backend.Type = BackendErasure g := errgroup.WithNErrs(len(s.sets)) for index := range s.sets { @@ -503,17 +501,6 @@ func (s *erasureSets) StorageInfo(ctx context.Context, local bool) (StorageInfo, storageInfo.Backend.OfflineDisks = storageInfo.Backend.OfflineDisks.Merge(lstorageInfo.Backend.OfflineDisks) } - scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD) - if scParity == 0 { - scParity = s.setDriveCount / 2 - } - storageInfo.Backend.StandardSCData = s.setDriveCount - scParity - storageInfo.Backend.StandardSCParity = scParity - - rrSCParity := globalStorageClass.GetParityForSC(storageclass.RRS) - storageInfo.Backend.RRSCData = s.setDriveCount - rrSCParity - storageInfo.Backend.RRSCParity = rrSCParity - if local { // if local is true, we are not interested in the drive UUID info. // this is called primarily by prometheus @@ -878,70 +865,6 @@ func (f *FileInfoCh) Push(fi FileInfo) { f.Valid = true } -// Calculate lexically least entry across multiple FileInfo channels, -// returns the lexically common entry and the total number of times -// we found this entry. Additionally also returns a boolean -// to indicate if the caller needs to call this function -// again to list the next entry. It is callers responsibility -// if the caller wishes to list N entries to call lexicallySortedEntry -// N times until this boolean is 'false'. -func lexicallySortedEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) (FileInfo, int, bool) { - for j := range entryChs { - entries[j], entriesValid[j] = entryChs[j].Pop() - } - - var isTruncated = false - for _, valid := range entriesValid { - if !valid { - continue - } - isTruncated = true - break - } - - var lentry FileInfo - var found bool - for i, valid := range entriesValid { - if !valid { - continue - } - if !found { - lentry = entries[i] - found = true - continue - } - if entries[i].Name < lentry.Name { - lentry = entries[i] - } - } - - // We haven't been able to find any lexically least entry, - // this would mean that we don't have valid entry. - if !found { - return lentry, 0, isTruncated - } - - lexicallySortedEntryCount := 0 - for i, valid := range entriesValid { - if !valid { - continue - } - - // Entries are duplicated across disks, - // we should simply skip such entries. - if lentry.Name == entries[i].Name && lentry.ModTime.Equal(entries[i].ModTime) { - lexicallySortedEntryCount++ - continue - } - - // Push all entries which are lexically higher - // and will be returned later in Pop() - entryChs[i].Push(entries[i]) - } - - return lentry, lexicallySortedEntryCount, isTruncated -} - // Calculate lexically least entry across multiple FileInfo channels, // returns the lexically common entry and the total number of times // we found this entry. Additionally also returns a boolean @@ -1521,105 +1444,6 @@ func (s *erasureSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) return listBuckets, nil } -// Walk a bucket, optionally prefix recursively, until we have returned -// all the content to objectInfo channel, it is callers responsibility -// to allocate a receive channel for ObjectInfo, upon any unhandled -// error walker returns error. Optionally if context.Done() is received -// then Walk() stops the walker. -func (s *erasureSets) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error { - if err := checkListObjsArgs(ctx, bucket, prefix, "", s); err != nil { - // Upon error close the channel. - close(results) - return err - } - - if opts.WalkVersions { - entryChs := s.startMergeWalksVersions(ctx, bucket, prefix, "", true, ctx.Done()) - - entriesValid := make([]bool, len(entryChs)) - entries := make([]FileInfoVersions, len(entryChs)) - - go func() { - defer close(results) - - for { - entry, quorumCount, ok := lexicallySortedEntryVersions(entryChs, entries, entriesValid) - if !ok { - return - } - - if quorumCount >= s.setDriveCount/2 { - // Read quorum exists proceed - for _, version := range entry.Versions { - results <- version.ToObjectInfo(bucket, version.Name) - } - } - // skip entries which do not have quorum - } - }() - - return nil - } - - entryChs := s.startMergeWalks(ctx, bucket, prefix, "", true, ctx.Done()) - - entriesValid := make([]bool, len(entryChs)) - entries := make([]FileInfo, len(entryChs)) - - go func() { - defer close(results) - - for { - entry, quorumCount, ok := lexicallySortedEntry(entryChs, entries, entriesValid) - if !ok { - return - } - - if quorumCount >= s.setDriveCount/2 { - // Read quorum exists proceed - results <- entry.ToObjectInfo(bucket, entry.Name) - } - // skip entries which do not have quorum - } - }() - - return nil -} - -// HealObjects - Heal all objects recursively at a specified prefix, any -// dangling objects deleted as well automatically. -func (s *erasureSets) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObject HealObjectFn) error { - endWalkCh := make(chan struct{}) - defer close(endWalkCh) - - entryChs := s.startMergeWalksVersions(ctx, bucket, prefix, "", true, endWalkCh) - - entriesValid := make([]bool, len(entryChs)) - entries := make([]FileInfoVersions, len(entryChs)) - for { - entry, quorumCount, ok := lexicallySortedEntryVersions(entryChs, entries, entriesValid) - if !ok { - break - } - - if quorumCount == s.setDriveCount && opts.ScanMode == madmin.HealNormalScan { - // Skip good entries. - continue - } - - for _, version := range entry.Versions { - // Wait and proceed if there are active requests - waitForLowHTTPReq(int32(s.setDriveCount), time.Second) - - if err := healObject(bucket, version.Name, version.VersionID); err != nil { - return toObjectErr(err, bucket, version.Name) - } - } - } - - return nil -} - // PutObjectTags - replace or add tags to an existing object func (s *erasureSets) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) error { return s.getHashedSet(object).PutObjectTags(ctx, bucket, object, tags, opts) diff --git a/cmd/notification.go b/cmd/notification.go index aa7014fd9..8dc67e8f9 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -590,12 +590,13 @@ func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName } // Loads notification policies for all buckets into NotificationSys. -func (sys *NotificationSys) load(buckets []BucketInfo) error { +func (sys *NotificationSys) load(buckets []BucketInfo) { for _, bucket := range buckets { ctx := logger.SetReqInfo(GlobalContext, &logger.ReqInfo{BucketName: bucket.Name}) config, err := globalBucketMetadataSys.GetNotificationConfig(bucket.Name) if err != nil { - return err + logger.LogIf(ctx, err) + continue } config.SetRegion(globalServerRegion) if err = config.Validate(globalServerRegion, globalNotificationSys.targetList); err != nil { @@ -606,7 +607,6 @@ func (sys *NotificationSys) load(buckets []BucketInfo) error { } sys.AddRulesMap(bucket.Name, config.ToRulesMap()) } - return nil } // Init - initializes notification system from notification.xml and listenxl.meta of all buckets. @@ -632,7 +632,7 @@ func (sys *NotificationSys) Init(ctx context.Context, buckets []BucketInfo, objA } }() - go logger.LogIf(ctx, sys.load(buckets)) + go sys.load(buckets) return nil }