From a896125490f260a233de49a7c24880a9d905006d Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Fri, 4 Dec 2020 09:32:35 -0800 Subject: [PATCH] Add crawler delay config + dynamic config values (#11018) --- cmd/admin-handlers-config-kv.go | 19 +++- cmd/admin-handlers.go | 5 +- cmd/admin-heal-ops.go | 2 + cmd/common-main.go | 2 + cmd/config-current.go | 95 ++++++++++++++-- cmd/config/config.go | 52 ++++++--- cmd/config/crawler/crawler.go | 86 ++++++++++++++ cmd/data-crawler.go | 185 ++++++++++++++++++++++++------- cmd/data-usage.go | 1 - cmd/globals.go | 4 +- cmd/handler-api.go | 19 +++- cmd/object-api-utils.go | 5 +- cmd/peer-rest-server.go | 11 ++ cmd/server-main.go | 2 +- cmd/service.go | 5 +- cmd/xl-storage.go | 7 +- cmd/xl-storage_test.go | 2 +- pkg/env/env.go | 9 ++ pkg/madmin/config-kv-commands.go | 18 ++- 19 files changed, 440 insertions(+), 89 deletions(-) create mode 100644 cmd/config/crawler/crawler.go diff --git a/cmd/admin-handlers-config-kv.go b/cmd/admin-handlers-config-kv.go index 30eb8e5ab..bd15d58af 100644 --- a/cmd/admin-handlers-config-kv.go +++ b/cmd/admin-handlers-config-kv.go @@ -130,8 +130,8 @@ func (a adminAPIHandlers) SetConfigKVHandler(w http.ResponseWriter, r *http.Requ writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } - - if _, err = cfg.ReadFrom(bytes.NewReader(kvBytes)); err != nil { + dynamic, err := cfg.ReadConfig(bytes.NewReader(kvBytes)) + if err != nil { writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } @@ -158,6 +158,17 @@ func (a adminAPIHandlers) SetConfigKVHandler(w http.ResponseWriter, r *http.Requ saveConfig(GlobalContext, objectAPI, backendEncryptedFile, backendEncryptedMigrationComplete) } + // Apply dynamic values. + if err := applyDynamicConfig(GlobalContext, cfg); err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + globalNotificationSys.SignalService(serviceReloadDynamic) + + // If all values were dynamic, tell the client. + if dynamic { + w.Header().Set(madmin.ConfigAppliedHeader, madmin.ConfigAppliedTrue) + } writeSuccessResponseHeadersOnly(w) } @@ -266,7 +277,7 @@ func (a adminAPIHandlers) RestoreConfigHistoryKVHandler(w http.ResponseWriter, r return } - if _, err = cfg.ReadFrom(bytes.NewReader(kvBytes)); err != nil { + if _, err = cfg.ReadConfig(bytes.NewReader(kvBytes)); err != nil { writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } @@ -378,7 +389,7 @@ func (a adminAPIHandlers) SetConfigHandler(w http.ResponseWriter, r *http.Reques } cfg := newServerConfig() - if _, err = cfg.ReadFrom(bytes.NewReader(kvBytes)); err != nil { + if _, err = cfg.ReadConfig(bytes.NewReader(kvBytes)); err != nil { writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 64f45e966..6b06b122d 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -205,9 +205,10 @@ func (a adminAPIHandlers) ServiceHandler(w http.ResponseWriter, r *http.Request) } var objectAPI ObjectLayer - if serviceSig == serviceRestart { + switch serviceSig { + case serviceRestart: objectAPI, _ = validateAdminReq(ctx, w, r, iampolicy.ServiceRestartAdminAction) - } else { + case serviceStop: objectAPI, _ = validateAdminReq(ctx, w, r, iampolicy.ServiceStopAdminAction) } if objectAPI == nil { diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index 840758bb3..d2acaed18 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -658,7 +658,9 @@ func (h *healSequence) healSequenceStart(objAPI ObjectLayer) { } func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItemType) error { + globalHealConfigMu.Lock() opts := globalHealConfig + globalHealConfigMu.Unlock() // Send heal request task := healTask{ diff --git a/cmd/common-main.go b/cmd/common-main.go index c08f45628..59e602317 100644 --- a/cmd/common-main.go +++ b/cmd/common-main.go @@ -64,10 +64,12 @@ func verifyObjectLayerFeatures(name string, objAPI ObjectLayer) { } } + globalCompressConfigMu.Lock() if globalCompressConfig.Enabled && !objAPI.IsCompressionSupported() { logger.Fatal(errInvalidArgument, "Compression support is requested but '%s' does not support compression", name) } + globalCompressConfigMu.Unlock() } // Check for updates and print a notification message diff --git a/cmd/config-current.go b/cmd/config-current.go index aa26bebe5..e2d303ddd 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -17,6 +17,7 @@ package cmd import ( + "context" "fmt" "strings" "sync" @@ -25,6 +26,7 @@ import ( "github.com/minio/minio/cmd/config/api" "github.com/minio/minio/cmd/config/cache" "github.com/minio/minio/cmd/config/compress" + "github.com/minio/minio/cmd/config/crawler" "github.com/minio/minio/cmd/config/dns" "github.com/minio/minio/cmd/config/etcd" "github.com/minio/minio/cmd/config/heal" @@ -57,6 +59,7 @@ func initHelp() { config.LoggerWebhookSubSys: logger.DefaultKVS, config.AuditWebhookSubSys: logger.DefaultAuditKVS, config.HealSubSys: heal.DefaultKVS, + config.CrawlerSubSys: crawler.DefaultKVS, } for k, v := range notify.DefaultNotificationKVS { kvs[k] = v @@ -112,6 +115,10 @@ func initHelp() { Key: config.HealSubSys, Description: "manage object healing frequency and bitrot verification checks", }, + config.HelpKV{ + Key: config.CrawlerSubSys, + Description: "manage crawling for usage calculation, lifecycle, healing and more", + }, config.HelpKV{ Key: config.LoggerWebhookSubSys, Description: "send server logs to webhook endpoints", @@ -192,6 +199,7 @@ func initHelp() { config.CacheSubSys: cache.Help, config.CompressionSubSys: compress.Help, config.HealSubSys: heal.Help, + config.CrawlerSubSys: crawler.Help, config.IdentityOpenIDSubSys: openid.Help, config.IdentityLDAPSubSys: xldap.Help, config.PolicyOPASubSys: opa.Help, @@ -221,6 +229,9 @@ var ( ) func validateConfig(s config.Config, setDriveCount int) error { + // We must have a global lock for this so nobody else modifies env while we do. + defer env.LockSetEnv()() + // Disable merging env values with config for validation. env.SetEnvOff() @@ -249,14 +260,25 @@ func validateConfig(s config.Config, setDriveCount int) error { return err } - if _, err := compress.LookupConfig(s[config.CompressionSubSys][config.Default]); err != nil { + compCfg, err := compress.LookupConfig(s[config.CompressionSubSys][config.Default]) + if err != nil { return err } + objAPI := newObjectLayerFn() + if objAPI != nil { + if compCfg.Enabled && !objAPI.IsCompressionSupported() { + return fmt.Errorf("Backend does not support compression") + } + } if _, err := heal.LookupConfig(s[config.HealSubSys][config.Default]); err != nil { return err } + if _, err := crawler.LookupConfig(s[config.CrawlerSubSys][config.Default]); err != nil { + return err + } + { etcdCfg, err := etcd.LookupConfig(s[config.EtcdSubSys][config.Default], globalRootCAs) if err != nil { @@ -438,10 +460,6 @@ func lookupConfigs(s config.Config, setDriveCount int) { } } } - globalHealConfig, err = heal.LookupConfig(s[config.HealSubSys][config.Default]) - if err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to read heal config: %w", err)) - } kmsCfg, err := crypto.LookupConfig(s, globalCertsCADir.Get(), NewGatewayHTTPTransport()) if err != nil { @@ -459,11 +477,6 @@ func lookupConfigs(s config.Config, setDriveCount int) { logger.LogIf(ctx, fmt.Errorf("%s env is deprecated please migrate to using `mc encrypt` at bucket level", crypto.EnvKMSAutoEncryption)) } - globalCompressConfig, err = compress.LookupConfig(s[config.CompressionSubSys][config.Default]) - if err != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to setup Compression: %w", err)) - } - globalOpenIDConfig, err = openid.LookupConfig(s[config.IdentityOpenIDSubSys][config.Default], NewGatewayHTTPTransport(), xhttp.DrainBody) if err != nil { @@ -538,6 +551,68 @@ func lookupConfigs(s config.Config, setDriveCount int) { if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to initialize notification target(s): %w", err)) } + + // Apply dynamic config values + logger.LogIf(ctx, applyDynamicConfig(ctx, s)) +} + +// applyDynamicConfig will apply dynamic config values. +// Dynamic systems should be in config.SubSystemsDynamic as well. +func applyDynamicConfig(ctx context.Context, s config.Config) error { + // Read all dynamic configs. + // API + apiConfig, err := api.LookupConfig(s[config.APISubSys][config.Default]) + if err != nil { + logger.LogIf(ctx, fmt.Errorf("Invalid api configuration: %w", err)) + } + + // Compression + cmpCfg, err := compress.LookupConfig(s[config.CompressionSubSys][config.Default]) + if err != nil { + return fmt.Errorf("Unable to setup Compression: %w", err) + } + objAPI := newObjectLayerFn() + if objAPI != nil { + if cmpCfg.Enabled && !objAPI.IsCompressionSupported() { + return fmt.Errorf("Backend does not support compression") + } + } + + // Heal + healCfg, err := heal.LookupConfig(s[config.HealSubSys][config.Default]) + if err != nil { + logger.LogIf(ctx, fmt.Errorf("Unable to apply heal config: %w", err)) + } + + // Crawler + crawlerCfg, err := crawler.LookupConfig(s[config.CrawlerSubSys][config.Default]) + if err != nil { + return fmt.Errorf("Unable to apply crawler config: %w", err) + } + + // Apply configurations. + // We should not fail after this. + globalAPIConfig.init(apiConfig, globalAPIConfig.setDriveCount) + + globalCompressConfigMu.Lock() + globalCompressConfig = cmpCfg + globalCompressConfigMu.Unlock() + + globalHealConfigMu.Lock() + globalHealConfig = healCfg + globalHealConfigMu.Unlock() + + logger.LogIf(ctx, crawlerSleeper.Update(crawlerCfg.Delay, crawlerCfg.MaxWait)) + + // Update all dynamic config values in memory. + globalServerConfigMu.Lock() + defer globalServerConfigMu.Unlock() + if globalServerConfig != nil { + for k := range config.SubSystemsDynamic { + globalServerConfig[k] = s[k] + } + } + return nil } // Help - return sub-system level help diff --git a/cmd/config/config.go b/cmd/config/config.go index 1f3a877b2..7b43cc263 100644 --- a/cmd/config/config.go +++ b/cmd/config/config.go @@ -77,6 +77,7 @@ const ( LoggerWebhookSubSys = "logger_webhook" AuditWebhookSubSys = "audit_webhook" HealSubSys = "heal" + CrawlerSubSys = "crawler" // Add new constants here if you add new fields to config. ) @@ -98,7 +99,7 @@ const ( ) // SubSystems - all supported sub-systems -var SubSystems = set.CreateStringSet([]string{ +var SubSystems = set.CreateStringSet( CredentialsSubSys, RegionSubSys, EtcdSubSys, @@ -113,6 +114,7 @@ var SubSystems = set.CreateStringSet([]string{ PolicyOPASubSys, IdentityLDAPSubSys, IdentityOpenIDSubSys, + CrawlerSubSys, HealSubSys, NotifyAMQPSubSys, NotifyESSubSys, @@ -124,7 +126,15 @@ var SubSystems = set.CreateStringSet([]string{ NotifyPostgresSubSys, NotifyRedisSubSys, NotifyWebhookSubSys, -}...) +) + +// SubSystemsDynamic - all sub-systems that have dynamic config. +var SubSystemsDynamic = set.CreateStringSet( + APISubSys, + CompressionSubSys, + CrawlerSubSys, + HealSubSys, +) // SubSystemsSingleTargets - subsystems which only support single target. var SubSystemsSingleTargets = set.CreateStringSet([]string{ @@ -141,6 +151,7 @@ var SubSystemsSingleTargets = set.CreateStringSet([]string{ IdentityLDAPSubSys, IdentityOpenIDSubSys, HealSubSys, + CrawlerSubSys, }...) // Constant separators @@ -309,25 +320,29 @@ func (c Config) DelFrom(r io.Reader) error { return nil } -// ReadFrom - implements io.ReaderFrom interface -func (c Config) ReadFrom(r io.Reader) (int64, error) { +// ReadConfig - read content from input and write into c. +// Returns whether all parameters were dynamic. +func (c Config) ReadConfig(r io.Reader) (dynOnly bool, err error) { var n int scanner := bufio.NewScanner(r) + dynOnly = true for scanner.Scan() { // Skip any empty lines, or comment like characters text := scanner.Text() if text == "" || strings.HasPrefix(text, KvComment) { continue } - if err := c.SetKVS(text, DefaultKVS); err != nil { - return 0, err + dynamic, err := c.SetKVS(text, DefaultKVS) + if err != nil { + return false, err } + dynOnly = dynOnly && dynamic n += len(text) } if err := scanner.Err(); err != nil { - return 0, err + return false, err } - return int64(n), nil + return dynOnly, nil } type configWriteTo struct { @@ -618,26 +633,27 @@ func (c Config) Clone() Config { } // SetKVS - set specific key values per sub-system. -func (c Config) SetKVS(s string, defaultKVS map[string]KVS) error { +func (c Config) SetKVS(s string, defaultKVS map[string]KVS) (dynamic bool, err error) { if len(s) == 0 { - return Errorf("input arguments cannot be empty") + return false, Errorf("input arguments cannot be empty") } inputs := strings.SplitN(s, KvSpaceSeparator, 2) if len(inputs) <= 1 { - return Errorf("invalid number of arguments '%s'", s) + return false, Errorf("invalid number of arguments '%s'", s) } subSystemValue := strings.SplitN(inputs[0], SubSystemSeparator, 2) if len(subSystemValue) == 0 { - return Errorf("invalid number of arguments %s", s) + return false, Errorf("invalid number of arguments %s", s) } if !SubSystems.Contains(subSystemValue[0]) { - return Errorf("unknown sub-system %s", s) + return false, Errorf("unknown sub-system %s", s) } if SubSystemsSingleTargets.Contains(subSystemValue[0]) && len(subSystemValue) == 2 { - return Errorf("sub-system '%s' only supports single target", subSystemValue[0]) + return false, Errorf("sub-system '%s' only supports single target", subSystemValue[0]) } + dynamic = SubSystemsDynamic.Contains(subSystemValue[0]) tgt := Default subSys := subSystemValue[0] @@ -647,7 +663,7 @@ func (c Config) SetKVS(s string, defaultKVS map[string]KVS) error { fields := madmin.KvFields(inputs[1], defaultKVS[subSys].Keys()) if len(fields) == 0 { - return Errorf("sub-system '%s' cannot have empty keys", subSys) + return false, Errorf("sub-system '%s' cannot have empty keys", subSys) } var kvs = KVS{} @@ -670,7 +686,7 @@ func (c Config) SetKVS(s string, defaultKVS map[string]KVS) error { kvs.Set(prevK, madmin.SanitizeValue(kv[1])) continue } - return Errorf("key '%s', cannot have empty value", kv[0]) + return false, Errorf("key '%s', cannot have empty value", kv[0]) } _, ok := kvs.Lookup(Enable) @@ -720,11 +736,11 @@ func (c Config) SetKVS(s string, defaultKVS map[string]KVS) error { // Return error only if the // key is enabled, for state=off // let it be empty. - return Errorf( + return false, Errorf( "'%s' is not optional for '%s' sub-system, please check '%s' documentation", hkv.Key, subSys, subSys) } } c[subSys][tgt] = currKVS - return nil + return dynamic, nil } diff --git a/cmd/config/crawler/crawler.go b/cmd/config/crawler/crawler.go new file mode 100644 index 000000000..cc1386b42 --- /dev/null +++ b/cmd/config/crawler/crawler.go @@ -0,0 +1,86 @@ +/* + * MinIO Cloud Storage, (C) 2020 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package crawler + +import ( + "strconv" + "time" + + "github.com/minio/minio/cmd/config" +) + +// Compression environment variables +const ( + Delay = "delay" + MaxWait = "max_wait" +) + +// Config represents the heal settings. +type Config struct { + // Delay is the sleep multiplier. + Delay float64 `json:"delay"` + // MaxWait is maximum wait time between operations + MaxWait time.Duration +} + +var ( + // DefaultKVS - default KV config for heal settings + DefaultKVS = config.KVS{ + config.KV{ + Key: Delay, + Value: "10", + }, + config.KV{ + Key: MaxWait, + Value: "15s", + }, + } + + // Help provides help for config values + Help = config.HelpKVS{ + config.HelpKV{ + Key: Delay, + Description: `crawler delay multiplier, default 10`, + Optional: true, + Type: "float", + }, + config.HelpKV{ + Key: MaxWait, + Description: `maximum wait time between operations, default 15s`, + Optional: true, + Type: "duration", + }, + } +) + +// LookupConfig - lookup config and override with valid environment settings if any. +func LookupConfig(kvs config.KVS) (cfg Config, err error) { + if err = config.CheckValidKeys(config.CrawlerSubSys, kvs, DefaultKVS); err != nil { + return cfg, err + } + delay := kvs.Get(Delay) + cfg.Delay, err = strconv.ParseFloat(delay, 64) + if err != nil { + return cfg, err + } + wait := kvs.Get(MaxWait) + cfg.MaxWait, err = time.ParseDuration(wait) + if err != nil { + return cfg, err + } + return cfg, nil +} diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go index 74a64ac0a..9092b1573 100644 --- a/cmd/data-crawler.go +++ b/cmd/data-crawler.go @@ -21,11 +21,12 @@ import ( "context" "encoding/binary" "errors" + "math" "math/rand" "os" "path" - "strconv" "strings" + "sync" "time" "github.com/minio/minio/cmd/config" @@ -43,7 +44,6 @@ import ( const ( dataCrawlSleepPerFolder = time.Millisecond // Time to wait between folders. - dataCrawlSleepDefMult = 10.0 // Default multiplier for waits between operations. dataCrawlStartDelay = 5 * time.Minute // Time to wait on startup and between cycles. dataUsageUpdateDirCycles = 16 // Visit all folders every n cycles. @@ -53,8 +53,12 @@ const ( ) var ( - globalHealConfig heal.Config + globalHealConfig heal.Config + globalHealConfigMu sync.Mutex + dataCrawlerLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second) + // Sleeper values are updated when config is loaded. + crawlerSleeper = newDynamicSleeper(10, 10*time.Second) ) // initDataCrawler will start the crawler unless disabled. @@ -141,7 +145,6 @@ type folderScanner struct { newCache dataUsageCache withFilter *bloomFilter - dataUsageCrawlMult float64 dataUsageCrawlDebug bool healFolderInclude uint32 // Include a clean folder one in n cycles. healObjectSelect uint32 // Do a heal check on an object once every n cycles. Must divide into healFolderInclude @@ -172,11 +175,6 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache, return cache, errors.New("internal error: root scan attempted") } - delayMult, err := strconv.ParseFloat(env.Get(envDataUsageCrawlDelay, "10.0"), 64) - if err != nil { - logger.LogIf(ctx, err) - delayMult = dataCrawlSleepDefMult - } s := folderScanner{ root: basePath, getSize: getSize, @@ -184,7 +182,6 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache, newCache: dataUsageCache{Info: cache.Info}, newFolders: nil, existingFolders: nil, - dataUsageCrawlMult: delayMult, dataUsageCrawlDebug: intDataUpdateTracker.debug, healFolderInclude: 0, healObjectSelect: 0, @@ -386,7 +383,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo } } } - sleepDuration(dataCrawlSleepPerFolder, f.dataUsageCrawlMult) + crawlerSleeper.Sleep(ctx, dataCrawlSleepPerFolder) cache := dataUsageEntry{} @@ -435,7 +432,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo } // Dynamic time delay. - t := UTCNow() + wait := crawlerSleeper.Timer(ctx) // Get file size, ignore errors. item := crawlItem{ @@ -450,7 +447,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo } size, err := f.getSize(item) - sleepDuration(time.Since(t), f.dataUsageCrawlMult) + wait() if err == errSkipFile { return nil } @@ -506,8 +503,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo } // Dynamic time delay. - t := UTCNow() - + wait := crawlerSleeper.Timer(ctx) resolver.bucket = bucket foundObjs := false @@ -535,8 +531,8 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo logger.Info(color.Green("healObjects:")+" got partial, %d agreed, errs: %v", nAgreed, errs) } // Sleep and reset. - sleepDuration(time.Since(t), f.dataUsageCrawlMult) - t = UTCNow() + wait() + wait = crawlerSleeper.Timer(ctx) entry, ok := entries.resolve(&resolver) if !ok { for _, err := range errs { @@ -572,8 +568,8 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo } for _, ver := range fiv.Versions { // Sleep and reset. - sleepDuration(time.Since(t), f.dataUsageCrawlMult) - t = UTCNow() + wait() + wait = crawlerSleeper.Timer(ctx) err := bgSeq.queueHealTask(healSource{ bucket: bucket, object: fiv.Name, @@ -605,6 +601,9 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo // If we have quorum, found directories, but no objects, issue heal to delete the dangling. objAPI.HealObjects(ctx, bucket, prefix, madmin.HealOpts{Recursive: true, Remove: true}, func(bucket, object, versionID string) error { + // Wait for each heal as per crawler frequency. + wait() + wait = crawlerSleeper.Timer(ctx) return bgSeq.queueHealTask(healSource{ bucket: bucket, object: object, @@ -613,7 +612,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo }) } - sleepDuration(time.Since(t), f.dataUsageCrawlMult) + wait() // Add unless healing returned an error. if foundObjs { @@ -651,12 +650,12 @@ func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder) dirStack = append(dirStack, entName) err := readDirFn(path.Join(dirStack...), addDir) dirStack = dirStack[:len(dirStack)-1] - sleepDuration(dataCrawlSleepPerFolder, f.dataUsageCrawlMult) + crawlerSleeper.Sleep(ctx, dataCrawlSleepPerFolder) return err } // Dynamic time delay. - t := UTCNow() + wait := crawlerSleeper.Timer(ctx) // Get file size, ignore errors. dirStack = append(dirStack, entName) @@ -686,8 +685,8 @@ func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder) heal: hashPath(path.Join(prefix, entName)).mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv), }) - // Don't sleep for really small amount of time - sleepDuration(time.Since(t), f.dataUsageCrawlMult) + // Wait to throttle IO + wait() if err == errSkipFile { return nil @@ -910,21 +909,6 @@ func (i *crawlItem) objectPath() string { return path.Join(i.prefix, i.objectName) } -// sleepDuration multiplies the duration d by x -// and sleeps if is more than 100 micro seconds. -// Sleep is limited to max 15 seconds. -func sleepDuration(d time.Duration, x float64) { - const maxWait = 15 * time.Second - const minWait = 100 * time.Microsecond - // Don't sleep for really small amount of time - if d := time.Duration(float64(d) * x); d > minWait { - if d > maxWait { - d = maxWait - } - time.Sleep(d) - } -} - // healReplication will heal a scanned item that has failed replication. func (i *crawlItem) healReplication(ctx context.Context, o ObjectLayer, meta actionMeta) { if meta.oi.DeleteMarker || !meta.oi.VersionPurgeStatus.Empty() { @@ -967,3 +951,126 @@ func (i *crawlItem) healReplicationDeletes(ctx context.Context, o ObjectLayer, m }) } } + +type dynamicSleeper struct { + mu sync.RWMutex + + // Sleep factor + factor float64 + + // maximum sleep cap, + // set to <= 0 to disable. + maxSleep time.Duration + + // Don't sleep at all, if time taken is below this value. + // This is to avoid too small costly sleeps. + minSleep time.Duration + + // cycle will be closed + cycle chan struct{} +} + +// newDynamicSleeper +func newDynamicSleeper(factor float64, maxWait time.Duration) *dynamicSleeper { + return &dynamicSleeper{ + factor: factor, + cycle: make(chan struct{}), + maxSleep: maxWait, + minSleep: 100 * time.Microsecond, + } +} + +// Timer returns a timer that has started. +// When the returned function is called it will wait. +func (d *dynamicSleeper) Timer(ctx context.Context) func() { + t := time.Now() + return func() { + doneAt := time.Now() + for { + // Grab current values + d.mu.RLock() + minWait, maxWait := d.minSleep, d.maxSleep + factor := d.factor + cycle := d.cycle + d.mu.RUnlock() + elapsed := doneAt.Sub(t) + // Don't sleep for really small amount of time + wantSleep := time.Duration(float64(elapsed) * factor) + if wantSleep <= minWait { + return + } + if maxWait > 0 && wantSleep > maxWait { + wantSleep = maxWait + } + timer := time.NewTimer(wantSleep) + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return + case <-timer.C: + return + case <-cycle: + if !timer.Stop() { + // We expired. + <-timer.C + return + } + } + } + } +} + +// Sleep sleeps the specified time multiplied by the sleep factor. +// If the factor is updated the sleep will be done again with the new factor. +func (d *dynamicSleeper) Sleep(ctx context.Context, base time.Duration) { + for { + // Grab current values + d.mu.RLock() + minWait, maxWait := d.minSleep, d.maxSleep + factor := d.factor + cycle := d.cycle + d.mu.RUnlock() + // Don't sleep for really small amount of time + wantSleep := time.Duration(float64(base) * factor) + if wantSleep <= minWait { + return + } + if maxWait > 0 && wantSleep > maxWait { + wantSleep = maxWait + } + timer := time.NewTimer(wantSleep) + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return + case <-timer.C: + return + case <-cycle: + if !timer.Stop() { + // We expired. + <-timer.C + return + } + } + } +} + +// Update the current settings and cycle all waiting. +// Parameters are the same as in the contructor. +func (d *dynamicSleeper) Update(factor float64, maxWait time.Duration) error { + d.mu.Lock() + defer d.mu.Unlock() + if math.Abs(d.factor-factor) < 1e-10 && d.maxSleep == maxWait { + return nil + } + // Update values and cycle waiting. + close(d.cycle) + d.factor = factor + d.maxSleep = maxWait + d.cycle = make(chan struct{}) + return nil +} diff --git a/cmd/data-usage.go b/cmd/data-usage.go index bcec9acb9..7387759ee 100644 --- a/cmd/data-usage.go +++ b/cmd/data-usage.go @@ -28,7 +28,6 @@ import ( const ( envDataUsageCrawlConf = "MINIO_DISK_USAGE_CRAWL_ENABLE" - envDataUsageCrawlDelay = "MINIO_DISK_USAGE_CRAWL_DELAY" envDataUsageCrawlDebug = "MINIO_DISK_USAGE_CRAWL_DEBUG" dataUsageRoot = SlashSeparator diff --git a/cmd/globals.go b/cmd/globals.go index 1f87f46e0..a2cbfc5c7 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -20,6 +20,7 @@ import ( "crypto/x509" "net/http" "os" + "sync" "time" "github.com/minio/minio-go/v7/pkg/set" @@ -245,7 +246,8 @@ var ( globalAutoEncryption bool // Is compression enabled? - globalCompressConfig compress.Config + globalCompressConfigMu sync.Mutex + globalCompressConfig compress.Config // Some standard object extensions which we strictly dis-allow for compression. standardExcludeCompressExtensions = []string{".gz", ".bz2", ".rar", ".zip", ".7z", ".xz", ".mp4", ".mkv", ".mov"} diff --git a/cmd/handler-api.go b/cmd/handler-api.go index 8a97e5023..849ea2986 100644 --- a/cmd/handler-api.go +++ b/cmd/handler-api.go @@ -35,6 +35,7 @@ type apiConfig struct { listQuorum int extendListLife time.Duration corsAllowOrigins []string + setDriveCount int } func (t *apiConfig) init(cfg api.Config, setDriveCount int) { @@ -43,6 +44,7 @@ func (t *apiConfig) init(cfg api.Config, setDriveCount int) { t.clusterDeadline = cfg.ClusterDeadline t.corsAllowOrigins = cfg.CorsAllowOrigin + t.setDriveCount = setDriveCount var apiRequestsMaxPerNode int if cfg.RequestsMax <= 0 { @@ -62,8 +64,14 @@ func (t *apiConfig) init(cfg api.Config, setDriveCount int) { apiRequestsMaxPerNode /= len(globalEndpoints.Hostnames()) } } - - t.requestsPool = make(chan struct{}, apiRequestsMaxPerNode) + if cap(t.requestsPool) < apiRequestsMaxPerNode { + // Only replace if needed. + // Existing requests will use the previous limit, + // but new requests will use the new limit. + // There will be a short overlap window, + // but this shouldn't last long. + t.requestsPool = make(chan struct{}, apiRequestsMaxPerNode) + } t.requestsDeadline = cfg.RequestsDeadline t.listQuorum = cfg.GetListQuorum() t.extendListLife = cfg.ExtendListLife @@ -76,6 +84,13 @@ func (t *apiConfig) getListQuorum() int { return t.listQuorum } +func (t *apiConfig) getSetDriveCount() int { + t.mu.RLock() + defer t.mu.RUnlock() + + return t.setDriveCount +} + func (t *apiConfig) getExtendListLife() time.Duration { t.mu.RLock() defer t.mu.RUnlock() diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index c7b889aeb..e892221cd 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -436,7 +436,10 @@ func (o ObjectInfo) GetActualSize() (int64, error) { // Using compression and encryption together enables room for side channel attacks. // Eliminate non-compressible objects by extensions/content-types. func isCompressible(header http.Header, object string) bool { - if crypto.IsRequested(header) || excludeForCompression(header, object, globalCompressConfig) { + globalCompressConfigMu.Lock() + cfg := globalCompressConfig + globalCompressConfigMu.Unlock() + if !cfg.Enabled || crypto.IsRequested(header) || excludeForCompression(header, object, cfg) { return false } return true diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index ac2b2c5bf..ea21774cd 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -786,6 +786,17 @@ func (s *peerRESTServer) SignalServiceHandler(w http.ResponseWriter, r *http.Req globalServiceSignalCh <- signal case serviceStop: globalServiceSignalCh <- signal + case serviceReloadDynamic: + srvCfg, err := getValidConfig(newObjectLayerFn()) + if err != nil { + s.writeErrorResponse(w, err) + return + } + err = applyDynamicConfig(r.Context(), srvCfg) + if err != nil { + s.writeErrorResponse(w, err) + } + return default: s.writeErrorResponse(w, errUnsupportedSignal) return diff --git a/cmd/server-main.go b/cmd/server-main.go index 3dca8c39c..7d882c8e3 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -475,7 +475,7 @@ func serverMain(ctx *cli.Context) { logger.SetDeploymentID(globalDeploymentID) - go initDataCrawler(GlobalContext, newObject) + initDataCrawler(GlobalContext, newObject) // Enable background operations for erasure coding if globalIsErasure { diff --git a/cmd/service.go b/cmd/service.go index 8f10eae16..4bf3e6a8c 100644 --- a/cmd/service.go +++ b/cmd/service.go @@ -27,8 +27,9 @@ import ( type serviceSignal int const ( - serviceRestart serviceSignal = iota // Restarts the server. - serviceStop // Stops the server. + serviceRestart serviceSignal = iota // Restarts the server. + serviceStop // Stops the server. + serviceReloadDynamic // Reload dynamic config values. // Add new service requests here. ) diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 1f828b22d..95c0fb322 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -336,7 +336,10 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac if objAPI == nil { return cache, errServerNotInitialized } - opts := globalHealConfig + + globalHealConfigMu.Lock() + healOpts := globalHealConfig + globalHealConfigMu.Unlock() dataUsageInfo, err := crawlDataFolder(ctx, s.diskPath, cache, func(item crawlItem) (int64, error) { // Look for `xl.meta/xl.json' at the leaf. @@ -375,7 +378,7 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac }) if !version.Deleted { // Bitrot check local data - if size > 0 && item.heal && opts.Bitrot { + if size > 0 && item.heal && healOpts.Bitrot { // HealObject verifies bitrot requirement internally res, err := objAPI.HealObject(ctx, item.bucket, item.objectPath(), oi.VersionID, madmin.HealOpts{ Remove: healDeleteDangling, diff --git a/cmd/xl-storage_test.go b/cmd/xl-storage_test.go index 29a129122..5abafe5cc 100644 --- a/cmd/xl-storage_test.go +++ b/cmd/xl-storage_test.go @@ -1693,7 +1693,7 @@ func TestXLStorageVerifyFile(t *testing.T) { reader := bytes.NewReader(data) for { // Using io.CopyBuffer instead of this loop will not work for us as io.CopyBuffer - // will use bytes.Buffer.ReadFrom() which will not do shardSize'ed writes causing error. + // will use bytes.Buffer.ReadConfig() which will not do shardSize'ed writes causing error. n, err := reader.Read(shard) w.Write(shard[:n]) if err == nil { diff --git a/pkg/env/env.go b/pkg/env/env.go index 820a4248a..6210dab53 100644 --- a/pkg/env/env.go +++ b/pkg/env/env.go @@ -24,10 +24,19 @@ import ( var ( privateMutex sync.RWMutex + lockEnvMutex sync.Mutex envOff bool ) +// LockSetEnv locks modifications to environment. +// Call returned function to unlock. +func LockSetEnv() func() { + lockEnvMutex.Lock() + return lockEnvMutex.Unlock +} + // SetEnvOff - turns off env lookup +// A global lock above this MUST ensure that func SetEnvOff() { privateMutex.Lock() defer privateMutex.Unlock() diff --git a/pkg/madmin/config-kv-commands.go b/pkg/madmin/config-kv-commands.go index 202b148d4..403fa12d1 100644 --- a/pkg/madmin/config-kv-commands.go +++ b/pkg/madmin/config-kv-commands.go @@ -50,11 +50,19 @@ func (adm *AdminClient) DelConfigKV(ctx context.Context, k string) (err error) { return nil } +const ( + // ConfigAppliedHeader is the header indicating whether the config was applied without requiring a restart. + ConfigAppliedHeader = "x-minio-config-applied" + + // ConfigAppliedTrue is the value set in header if the config was applied. + ConfigAppliedTrue = "true" +) + // SetConfigKV - set key value config to server. -func (adm *AdminClient) SetConfigKV(ctx context.Context, kv string) (err error) { +func (adm *AdminClient) SetConfigKV(ctx context.Context, kv string) (restart bool, err error) { econfigBytes, err := EncryptData(adm.getSecretKey(), []byte(kv)) if err != nil { - return err + return false, err } reqData := requestData{ @@ -67,14 +75,14 @@ func (adm *AdminClient) SetConfigKV(ctx context.Context, kv string) (err error) defer closeResponse(resp) if err != nil { - return err + return false, err } if resp.StatusCode != http.StatusOK { - return httpRespToErrorResponse(resp) + return false, httpRespToErrorResponse(resp) } - return nil + return resp.Header.Get(ConfigAppliedHeader) != ConfigAppliedTrue, nil } // GetConfigKV - returns the key, value of the requested key, incoming data is encrypted.