fix: slow down auto healing more aggressively (#10730)

Bonus fixes

- logging improvements: don't invoke `logger.LogIf` via `go`, since running
  it as the top frame of a new goroutine makes runtime.Caller miss the
  calling function name; log where necessary inside the spawned work instead
  (see the sketch below).
- remove unused code in erasure sets
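
A minimal, self-contained sketch of the logging point above; the `logIf` and `load` helpers here are illustrative stand-ins, not MinIO's actual `logger.LogIf` or loader:

```go
package main

import (
	"errors"
	"fmt"
	"runtime"
	"time"
)

// logIf mimics a logger that resolves its caller via runtime.Caller, the way
// a call-site-aware LogIf attaches the calling function to a log entry.
func logIf(err error) {
	if err == nil {
		return
	}
	pc, _, _, _ := runtime.Caller(1)
	fmt.Printf("caller=%s err=%v\n", runtime.FuncForPC(pc).Name(), err)
}

// load stands in for a background loader such as bucket metadata loading.
func load() error { return errors.New("load failed") }

func main() {
	// Old pattern: load() is evaluated synchronously (goroutine arguments are
	// evaluated before the goroutine starts), and logIf then runs as the top
	// frame of the new goroutine, so runtime.Caller reports runtime.goexit
	// instead of a meaningful function name.
	go logIf(load())

	// New pattern used in this commit: run the loader in the goroutine and
	// log at the point of failure, where a real function is on the stack.
	go func() {
		if err := load(); err != nil {
			logIf(err)
		}
	}()

	time.Sleep(100 * time.Millisecond) // give both goroutines time to print
}
```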
Branch: master
Author: Harshavardhana, 4 years ago, committed by GitHub
Parent: 0e0c53bba4
Commit: 734f258878
Files changed:

1. cmd/bucket-metadata-sys.go (16 changed lines)
2. cmd/data-crawler.go (14 changed lines)
3. cmd/erasure-server-sets.go (35 changed lines)
4. cmd/erasure-sets.go (178 changed lines)
5. cmd/notification.go (8 changed lines)

cmd/bucket-metadata-sys.go

@@ -433,12 +433,12 @@ func (sys *BucketMetadataSys) Init(ctx context.Context, buckets []BucketInfo, ob
     }

     // Load bucket metadata sys in background
-    go logger.LogIf(ctx, sys.load(ctx, buckets, objAPI))
+    go sys.load(ctx, buckets, objAPI)
     return nil
 }

 // concurrently load bucket metadata to speed up loading bucket metadata.
-func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error {
+func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) {
     g := errgroup.WithNErrs(len(buckets))
     for index := range buckets {
         index := index

@@ -455,22 +455,20 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck
     }

     for _, err := range g.Wait() {
         if err != nil {
-            return err
+            logger.LogIf(ctx, err)
         }
     }
-    return nil
 }

 // Loads bucket metadata for all buckets into BucketMetadataSys.
-func (sys *BucketMetadataSys) load(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error {
+func (sys *BucketMetadataSys) load(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) {
     count := 100 // load 100 bucket metadata at a time.
     for {
         if len(buckets) < count {
-            return sys.concurrentLoad(ctx, buckets, objAPI)
-        }
-        if err := sys.concurrentLoad(ctx, buckets[:count], objAPI); err != nil {
-            return err
+            sys.concurrentLoad(ctx, buckets, objAPI)
+            return
         }
+        sys.concurrentLoad(ctx, buckets[:count], objAPI)
         buckets = buckets[count:]
     }
 }
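
The hunk above changes `load`/`concurrentLoad` to log per-bucket failures instead of aborting startup, while still loading 100 buckets at a time. A rough sketch of that shape, using the stdlib `sync.WaitGroup` rather than MinIO's internal errgroup helper; the `loadAll` and `loadOne` names are illustrative:

```go
package main

import (
	"fmt"
	"sync"
)

// loadAll mirrors the batching in BucketMetadataSys.load: process buckets
// 100 at a time, load each batch concurrently, and log (rather than return)
// per-bucket errors so one bad bucket cannot abort the whole load.
func loadAll(buckets []string, loadOne func(string) error) {
	const count = 100 // load 100 bucket metadata at a time
	for len(buckets) > 0 {
		batch := buckets
		if len(batch) > count {
			batch = buckets[:count]
		}

		var wg sync.WaitGroup
		for _, b := range batch {
			wg.Add(1)
			go func(b string) {
				defer wg.Done()
				if err := loadOne(b); err != nil {
					fmt.Println("load error (logged, not fatal):", b, err)
				}
			}(b)
		}
		wg.Wait()

		buckets = buckets[len(batch):]
	}
}

func main() {
	loadAll([]string{"photos", "logs", "backups"}, func(b string) error {
		fmt.Println("loaded metadata for", b)
		return nil
	})
}
```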

cmd/data-crawler.go

@@ -421,6 +421,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
             }
             return nil
         }
+
         // Dynamic time delay.
         t := UTCNow()

@@ -480,22 +481,29 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
     // If that doesn't bring it back we remove the folder and assume it was deleted.
     // This means that the next run will not look for it.
     for k := range existing {
+        // Dynamic time delay.
+        t := UTCNow()
+
         bucket, prefix := path2BucketObject(k)
         if f.dataUsageCrawlDebug {
             logger.Info(color.Green("folder-scanner:")+" checking disappeared folder: %v/%v", bucket, prefix)
         }

-        // Dynamic time delay.
-        t := UTCNow()
-
         err = objAPI.HealObjects(ctx, bucket, prefix, madmin.HealOpts{Recursive: true, Remove: healDeleteDangling},
             func(bucket, object, versionID string) error {
+                // Wait for each heal as per crawler frequency.
+                sleepDuration(time.Since(t), f.dataUsageCrawlMult)
+
+                defer func() {
+                    t = UTCNow()
+                }()
+
                 return bgSeq.queueHealTask(healSource{
                     bucket:    bucket,
                     object:    object,
                     versionID: versionID,
                 }, madmin.HealItemObject)
             })

         sleepDuration(time.Since(t), f.dataUsageCrawlMult)
         if f.dataUsageCrawlDebug && err != nil {
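
The data-crawler hunk above moves the dynamic delay into the heal callback, so the crawler waits before each queued heal in proportion to elapsed time, rather than only once per folder. A minimal sketch of that throttling shape, with hypothetical `sleepFor` and `healObjects` stand-ins for `sleepDuration` and `objAPI.HealObjects`:

```go
package main

import (
	"fmt"
	"time"
)

// sleepFor mirrors the crawler's sleepDuration idea: sleep for the elapsed
// time scaled by a multiplier, so slower cycles space heals out further.
func sleepFor(elapsed time.Duration, mult float64) {
	time.Sleep(time.Duration(float64(elapsed) * mult))
}

// healObjects is a stand-in for objAPI.HealObjects: it walks some objects
// and invokes the callback once per object, like the diff's inner closure.
func healObjects(heal func(object string)) {
	for _, obj := range []string{"a", "b", "c"} {
		heal(obj)
	}
}

func main() {
	const crawlMult = 2.0 // illustrative multiplier, not the real config value

	t := time.Now()
	healObjects(func(object string) {
		// Wait for each heal as per crawler frequency, the same shape as
		// the callback added in the diff above.
		sleepFor(time.Since(t), crawlMult)
		defer func() { t = time.Now() }() // restart the clock once this heal is queued

		fmt.Println("queued heal for", object)
		time.Sleep(10 * time.Millisecond) // pretend queueing takes some time
	})
}
```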

cmd/erasure-server-sets.go

@@ -228,9 +228,6 @@ func (z *erasureServerSets) getZoneIdx(ctx context.Context, bucket, object strin
 func (z *erasureServerSets) Shutdown(ctx context.Context) error {
     defer z.shutdown()

-    if z.SingleZone() {
-        return z.serverSets[0].Shutdown(ctx)
-    }
     g := errgroup.WithNErrs(len(z.serverSets))

@@ -251,11 +248,8 @@ func (z *erasureServerSets) Shutdown(ctx context.Context) error {
 }

 func (z *erasureServerSets) StorageInfo(ctx context.Context, local bool) (StorageInfo, []error) {
-    if z.SingleZone() {
-        return z.serverSets[0].StorageInfo(ctx, local)
-    }
     var storageInfo StorageInfo
+    storageInfo.Backend.Type = BackendErasure

     storageInfos := make([]StorageInfo, len(z.serverSets))
     storageInfosErrs := make([][]error, len(z.serverSets))

@@ -277,11 +271,16 @@ func (z *erasureServerSets) StorageInfo(ctx context.Context, local bool) (Storag
         storageInfo.Backend.OfflineDisks = storageInfo.Backend.OfflineDisks.Merge(lstorageInfo.Backend.OfflineDisks)
     }

-    storageInfo.Backend.Type = storageInfos[0].Backend.Type
-    storageInfo.Backend.StandardSCData = storageInfos[0].Backend.StandardSCData
-    storageInfo.Backend.StandardSCParity = storageInfos[0].Backend.StandardSCParity
-    storageInfo.Backend.RRSCData = storageInfos[0].Backend.RRSCData
-    storageInfo.Backend.RRSCParity = storageInfos[0].Backend.RRSCParity
+    scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD)
+    if scParity == 0 {
+        scParity = z.SetDriveCount() / 2
+    }
+    storageInfo.Backend.StandardSCData = z.SetDriveCount() - scParity
+    storageInfo.Backend.StandardSCParity = scParity
+
+    rrSCParity := globalStorageClass.GetParityForSC(storageclass.RRS)
+    storageInfo.Backend.RRSCData = z.SetDriveCount() - rrSCParity
+    storageInfo.Backend.RRSCParity = rrSCParity

     var errs []error
     for i := range z.serverSets {
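
The StorageInfo hunk above derives the advertised data/parity drive counts from the configured storage class instead of copying them from the first set: if no STANDARD parity is configured it defaults to half the drives in a set, and data drives are the remainder. A small sketch of that arithmetic (function and variable names here are illustrative):

```go
package main

import "fmt"

// deriveCounts mirrors the arithmetic in the new StorageInfo code: when no
// STANDARD storage-class parity is configured, default to half the drives in
// a set; data drives are whatever is left over.
func deriveCounts(setDriveCount, configuredParity int) (data, parity int) {
	parity = configuredParity
	if parity == 0 {
		parity = setDriveCount / 2
	}
	return setDriveCount - parity, parity
}

func main() {
	// A 16-drive set with no configured parity: 8 data + 8 parity.
	fmt.Println(deriveCounts(16, 0)) // 8 8
	// The same set with EC:4 configured: 12 data + 4 parity.
	fmt.Println(deriveCounts(16, 4)) // 12 4
}
```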
@@ -1896,8 +1895,6 @@ func (z *erasureServerSets) HealObjects(ctx context.Context, bucket, prefix stri
             }

             for _, version := range entry.Versions {
-                // Wait and proceed if there are active requests
-                waitForLowHTTPReq(int32(zoneDrivesPerSet[zoneIndex]), time.Second)
                 if err := healObject(bucket, version.Name, version.VersionID); err != nil {
                     return toObjectErr(err, bucket, version.Name)
                 }

@@ -1926,9 +1923,6 @@ func (z *erasureServerSets) HealObject(ctx context.Context, bucket, object, vers
         defer lk.RUnlock()
     }

-    if z.SingleZone() {
-        return z.serverSets[0].HealObject(ctx, bucket, object, versionID, opts)
-    }
     for _, zone := range z.serverSets {
         result, err := zone.HealObject(ctx, bucket, object, versionID, opts)
         if err != nil {

@@ -1939,6 +1933,13 @@ func (z *erasureServerSets) HealObject(ctx context.Context, bucket, object, vers
         }
         return result, nil
     }
+    if versionID != "" {
+        return madmin.HealResultItem{}, VersionNotFound{
+            Bucket:    bucket,
+            Object:    object,
+            VersionID: versionID,
+        }
+    }
     return madmin.HealResultItem{}, ObjectNotFound{
         Bucket: bucket,
         Object: object,
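
The HealObject hunks above drop the single-zone shortcut and, when no zone has the object, return VersionNotFound if a specific version was requested and ObjectNotFound otherwise. A simplified sketch of that fall-through, with stand-in types and error values rather than MinIO's real ones:

```go
package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("not found")

type healResult struct{ zone int }

// healAcrossZones tries each zone in turn, returns the first successful
// result, and only when every zone reports "not found" picks the error type
// based on whether a specific version was requested (as in the diff above).
func healAcrossZones(zones []func() (healResult, error), versionID string) (healResult, error) {
	for _, heal := range zones {
		res, err := heal()
		if err != nil {
			if errors.Is(err, errNotFound) {
				continue // try the next zone
			}
			return healResult{}, err
		}
		return res, nil
	}
	if versionID != "" {
		return healResult{}, fmt.Errorf("version %q: %w", versionID, errNotFound)
	}
	return healResult{}, fmt.Errorf("object: %w", errNotFound)
}

func main() {
	zones := []func() (healResult, error){
		func() (healResult, error) { return healResult{}, errNotFound },
		func() (healResult, error) { return healResult{zone: 1}, nil },
	}
	res, err := healAcrossZones(zones, "")
	fmt.Println(res, err) // {1} <nil>
}
```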

cmd/erasure-sets.go

@@ -32,7 +32,6 @@ import (
     "github.com/google/uuid"
     "github.com/minio/minio-go/v7/pkg/tags"
     "github.com/minio/minio/cmd/config"
-    "github.com/minio/minio/cmd/config/storageclass"
     "github.com/minio/minio/cmd/logger"
     "github.com/minio/minio/pkg/bpool"
     "github.com/minio/minio/pkg/dsync"

@@ -227,7 +226,7 @@ func (s *erasureSets) connectDisks() {
             if err != nil {
                 if endpoint.IsLocal {
                     globalBackgroundHealState.pushHealLocalDisks(endpoint)
-                    logger.Info(fmt.Sprintf("Found inconsistent drive %s with format.json, attempting to heal...", endpoint))
+                    logger.Info(fmt.Sprintf("Found inconsistent drive %s with format.json, attempting to heal... (%s)", endpoint, err))
                 } else {
                     printEndpointError(endpoint, err, false)
                 }

@@ -483,7 +482,6 @@ func (s *erasureSets) StorageInfo(ctx context.Context, local bool) (StorageInfo,
     storageInfos := make([]StorageInfo, len(s.sets))
     storageInfoErrs := make([][]error, len(s.sets))
-    storageInfo.Backend.Type = BackendErasure

     g := errgroup.WithNErrs(len(s.sets))
     for index := range s.sets {

@@ -503,17 +501,6 @@ func (s *erasureSets) StorageInfo(ctx context.Context, local bool) (StorageInfo,
         storageInfo.Backend.OfflineDisks = storageInfo.Backend.OfflineDisks.Merge(lstorageInfo.Backend.OfflineDisks)
     }

-    scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD)
-    if scParity == 0 {
-        scParity = s.setDriveCount / 2
-    }
-    storageInfo.Backend.StandardSCData = s.setDriveCount - scParity
-    storageInfo.Backend.StandardSCParity = scParity
-
-    rrSCParity := globalStorageClass.GetParityForSC(storageclass.RRS)
-    storageInfo.Backend.RRSCData = s.setDriveCount - rrSCParity
-    storageInfo.Backend.RRSCParity = rrSCParity
-
     if local {
         // if local is true, we are not interested in the drive UUID info.
         // this is called primarily by prometheus
@@ -878,70 +865,6 @@ func (f *FileInfoCh) Push(fi FileInfo) {
     f.Valid = true
 }

-// Calculate lexically least entry across multiple FileInfo channels,
-// returns the lexically common entry and the total number of times
-// we found this entry. Additionally also returns a boolean
-// to indicate if the caller needs to call this function
-// again to list the next entry. It is callers responsibility
-// if the caller wishes to list N entries to call lexicallySortedEntry
-// N times until this boolean is 'false'.
-func lexicallySortedEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) (FileInfo, int, bool) {
-    for j := range entryChs {
-        entries[j], entriesValid[j] = entryChs[j].Pop()
-    }
-
-    var isTruncated = false
-    for _, valid := range entriesValid {
-        if !valid {
-            continue
-        }
-        isTruncated = true
-        break
-    }
-
-    var lentry FileInfo
-    var found bool
-    for i, valid := range entriesValid {
-        if !valid {
-            continue
-        }
-        if !found {
-            lentry = entries[i]
-            found = true
-            continue
-        }
-        if entries[i].Name < lentry.Name {
-            lentry = entries[i]
-        }
-    }
-
-    // We haven't been able to find any lexically least entry,
-    // this would mean that we don't have valid entry.
-    if !found {
-        return lentry, 0, isTruncated
-    }
-
-    lexicallySortedEntryCount := 0
-    for i, valid := range entriesValid {
-        if !valid {
-            continue
-        }
-
-        // Entries are duplicated across disks,
-        // we should simply skip such entries.
-        if lentry.Name == entries[i].Name && lentry.ModTime.Equal(entries[i].ModTime) {
-            lexicallySortedEntryCount++
-            continue
-        }
-
-        // Push all entries which are lexically higher
-        // and will be returned later in Pop()
-        entryChs[i].Push(entries[i])
-    }
-
-    return lentry, lexicallySortedEntryCount, isTruncated
-}
-
 // Calculate lexically least entry across multiple FileInfo channels,
 // returns the lexically common entry and the total number of times
 // we found this entry. Additionally also returns a boolean
@@ -1521,105 +1444,6 @@ func (s *erasureSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error)
     return listBuckets, nil
 }

-// Walk a bucket, optionally prefix recursively, until we have returned
-// all the content to objectInfo channel, it is callers responsibility
-// to allocate a receive channel for ObjectInfo, upon any unhandled
-// error walker returns error. Optionally if context.Done() is received
-// then Walk() stops the walker.
-func (s *erasureSets) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error {
-    if err := checkListObjsArgs(ctx, bucket, prefix, "", s); err != nil {
-        // Upon error close the channel.
-        close(results)
-        return err
-    }
-
-    if opts.WalkVersions {
-        entryChs := s.startMergeWalksVersions(ctx, bucket, prefix, "", true, ctx.Done())
-
-        entriesValid := make([]bool, len(entryChs))
-        entries := make([]FileInfoVersions, len(entryChs))
-
-        go func() {
-            defer close(results)
-
-            for {
-                entry, quorumCount, ok := lexicallySortedEntryVersions(entryChs, entries, entriesValid)
-                if !ok {
-                    return
-                }
-
-                if quorumCount >= s.setDriveCount/2 {
-                    // Read quorum exists proceed
-                    for _, version := range entry.Versions {
-                        results <- version.ToObjectInfo(bucket, version.Name)
-                    }
-                }
-                // skip entries which do not have quorum
-            }
-        }()
-
-        return nil
-    }
-
-    entryChs := s.startMergeWalks(ctx, bucket, prefix, "", true, ctx.Done())
-
-    entriesValid := make([]bool, len(entryChs))
-    entries := make([]FileInfo, len(entryChs))
-
-    go func() {
-        defer close(results)
-
-        for {
-            entry, quorumCount, ok := lexicallySortedEntry(entryChs, entries, entriesValid)
-            if !ok {
-                return
-            }
-
-            if quorumCount >= s.setDriveCount/2 {
-                // Read quorum exists proceed
-                results <- entry.ToObjectInfo(bucket, entry.Name)
-            }
-            // skip entries which do not have quorum
-        }
-    }()
-
-    return nil
-}
-
-// HealObjects - Heal all objects recursively at a specified prefix, any
-// dangling objects deleted as well automatically.
-func (s *erasureSets) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObject HealObjectFn) error {
-    endWalkCh := make(chan struct{})
-    defer close(endWalkCh)
-
-    entryChs := s.startMergeWalksVersions(ctx, bucket, prefix, "", true, endWalkCh)
-
-    entriesValid := make([]bool, len(entryChs))
-    entries := make([]FileInfoVersions, len(entryChs))
-    for {
-        entry, quorumCount, ok := lexicallySortedEntryVersions(entryChs, entries, entriesValid)
-        if !ok {
-            break
-        }
-
-        if quorumCount == s.setDriveCount && opts.ScanMode == madmin.HealNormalScan {
-            // Skip good entries.
-            continue
-        }
-
-        for _, version := range entry.Versions {
-            // Wait and proceed if there are active requests
-            waitForLowHTTPReq(int32(s.setDriveCount), time.Second)
-            if err := healObject(bucket, version.Name, version.VersionID); err != nil {
-                return toObjectErr(err, bucket, version.Name)
-            }
-        }
-    }
-
-    return nil
-}
-
 // PutObjectTags - replace or add tags to an existing object
 func (s *erasureSets) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) error {
     return s.getHashedSet(object).PutObjectTags(ctx, bucket, object, tags, opts)

cmd/notification.go

@@ -590,12 +590,13 @@ func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName
 }

 // Loads notification policies for all buckets into NotificationSys.
-func (sys *NotificationSys) load(buckets []BucketInfo) error {
+func (sys *NotificationSys) load(buckets []BucketInfo) {
     for _, bucket := range buckets {
         ctx := logger.SetReqInfo(GlobalContext, &logger.ReqInfo{BucketName: bucket.Name})
         config, err := globalBucketMetadataSys.GetNotificationConfig(bucket.Name)
         if err != nil {
-            return err
+            logger.LogIf(ctx, err)
+            continue
         }
         config.SetRegion(globalServerRegion)
         if err = config.Validate(globalServerRegion, globalNotificationSys.targetList); err != nil {

@@ -606,7 +607,6 @@ func (sys *NotificationSys) load(buckets []BucketInfo) error {
         }
         sys.AddRulesMap(bucket.Name, config.ToRulesMap())
     }
-    return nil
 }

 // Init - initializes notification system from notification.xml and listenxl.meta of all buckets.

@@ -632,7 +632,7 @@ func (sys *NotificationSys) Init(ctx context.Context, buckets []BucketInfo, objA
         }
     }()

-    go logger.LogIf(ctx, sys.load(buckets))
+    go sys.load(buckets)
     return nil
 }
