Add crawler delay config + dynamic config values (#11018)

master
Klaus Post 4 years ago committed by GitHub
parent e083471ec4
commit a896125490
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 19
      cmd/admin-handlers-config-kv.go
  2. 5
      cmd/admin-handlers.go
  3. 2
      cmd/admin-heal-ops.go
  4. 2
      cmd/common-main.go
  5. 95
      cmd/config-current.go
  6. 52
      cmd/config/config.go
  7. 86
      cmd/config/crawler/crawler.go
  8. 185
      cmd/data-crawler.go
  9. 1
      cmd/data-usage.go
  10. 4
      cmd/globals.go
  11. 19
      cmd/handler-api.go
  12. 5
      cmd/object-api-utils.go
  13. 11
      cmd/peer-rest-server.go
  14. 2
      cmd/server-main.go
  15. 5
      cmd/service.go
  16. 7
      cmd/xl-storage.go
  17. 2
      cmd/xl-storage_test.go
  18. 9
      pkg/env/env.go
  19. 18
      pkg/madmin/config-kv-commands.go

@ -130,8 +130,8 @@ func (a adminAPIHandlers) SetConfigKVHandler(w http.ResponseWriter, r *http.Requ
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
if _, err = cfg.ReadFrom(bytes.NewReader(kvBytes)); err != nil {
dynamic, err := cfg.ReadConfig(bytes.NewReader(kvBytes))
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
@ -158,6 +158,17 @@ func (a adminAPIHandlers) SetConfigKVHandler(w http.ResponseWriter, r *http.Requ
saveConfig(GlobalContext, objectAPI, backendEncryptedFile, backendEncryptedMigrationComplete)
}
// Apply dynamic values.
if err := applyDynamicConfig(GlobalContext, cfg); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
globalNotificationSys.SignalService(serviceReloadDynamic)
// If all values were dynamic, tell the client.
if dynamic {
w.Header().Set(madmin.ConfigAppliedHeader, madmin.ConfigAppliedTrue)
}
writeSuccessResponseHeadersOnly(w)
}
@ -266,7 +277,7 @@ func (a adminAPIHandlers) RestoreConfigHistoryKVHandler(w http.ResponseWriter, r
return
}
if _, err = cfg.ReadFrom(bytes.NewReader(kvBytes)); err != nil {
if _, err = cfg.ReadConfig(bytes.NewReader(kvBytes)); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
@ -378,7 +389,7 @@ func (a adminAPIHandlers) SetConfigHandler(w http.ResponseWriter, r *http.Reques
}
cfg := newServerConfig()
if _, err = cfg.ReadFrom(bytes.NewReader(kvBytes)); err != nil {
if _, err = cfg.ReadConfig(bytes.NewReader(kvBytes)); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}

@ -205,9 +205,10 @@ func (a adminAPIHandlers) ServiceHandler(w http.ResponseWriter, r *http.Request)
}
var objectAPI ObjectLayer
if serviceSig == serviceRestart {
switch serviceSig {
case serviceRestart:
objectAPI, _ = validateAdminReq(ctx, w, r, iampolicy.ServiceRestartAdminAction)
} else {
case serviceStop:
objectAPI, _ = validateAdminReq(ctx, w, r, iampolicy.ServiceStopAdminAction)
}
if objectAPI == nil {

@ -658,7 +658,9 @@ func (h *healSequence) healSequenceStart(objAPI ObjectLayer) {
}
func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItemType) error {
globalHealConfigMu.Lock()
opts := globalHealConfig
globalHealConfigMu.Unlock()
// Send heal request
task := healTask{

@ -64,10 +64,12 @@ func verifyObjectLayerFeatures(name string, objAPI ObjectLayer) {
}
}
globalCompressConfigMu.Lock()
if globalCompressConfig.Enabled && !objAPI.IsCompressionSupported() {
logger.Fatal(errInvalidArgument,
"Compression support is requested but '%s' does not support compression", name)
}
globalCompressConfigMu.Unlock()
}
// Check for updates and print a notification message

@ -17,6 +17,7 @@
package cmd
import (
"context"
"fmt"
"strings"
"sync"
@ -25,6 +26,7 @@ import (
"github.com/minio/minio/cmd/config/api"
"github.com/minio/minio/cmd/config/cache"
"github.com/minio/minio/cmd/config/compress"
"github.com/minio/minio/cmd/config/crawler"
"github.com/minio/minio/cmd/config/dns"
"github.com/minio/minio/cmd/config/etcd"
"github.com/minio/minio/cmd/config/heal"
@ -57,6 +59,7 @@ func initHelp() {
config.LoggerWebhookSubSys: logger.DefaultKVS,
config.AuditWebhookSubSys: logger.DefaultAuditKVS,
config.HealSubSys: heal.DefaultKVS,
config.CrawlerSubSys: crawler.DefaultKVS,
}
for k, v := range notify.DefaultNotificationKVS {
kvs[k] = v
@ -112,6 +115,10 @@ func initHelp() {
Key: config.HealSubSys,
Description: "manage object healing frequency and bitrot verification checks",
},
config.HelpKV{
Key: config.CrawlerSubSys,
Description: "manage crawling for usage calculation, lifecycle, healing and more",
},
config.HelpKV{
Key: config.LoggerWebhookSubSys,
Description: "send server logs to webhook endpoints",
@ -192,6 +199,7 @@ func initHelp() {
config.CacheSubSys: cache.Help,
config.CompressionSubSys: compress.Help,
config.HealSubSys: heal.Help,
config.CrawlerSubSys: crawler.Help,
config.IdentityOpenIDSubSys: openid.Help,
config.IdentityLDAPSubSys: xldap.Help,
config.PolicyOPASubSys: opa.Help,
@ -221,6 +229,9 @@ var (
)
func validateConfig(s config.Config, setDriveCount int) error {
// We must have a global lock for this so nobody else modifies env while we do.
defer env.LockSetEnv()()
// Disable merging env values with config for validation.
env.SetEnvOff()
@ -249,14 +260,25 @@ func validateConfig(s config.Config, setDriveCount int) error {
return err
}
if _, err := compress.LookupConfig(s[config.CompressionSubSys][config.Default]); err != nil {
compCfg, err := compress.LookupConfig(s[config.CompressionSubSys][config.Default])
if err != nil {
return err
}
objAPI := newObjectLayerFn()
if objAPI != nil {
if compCfg.Enabled && !objAPI.IsCompressionSupported() {
return fmt.Errorf("Backend does not support compression")
}
}
if _, err := heal.LookupConfig(s[config.HealSubSys][config.Default]); err != nil {
return err
}
if _, err := crawler.LookupConfig(s[config.CrawlerSubSys][config.Default]); err != nil {
return err
}
{
etcdCfg, err := etcd.LookupConfig(s[config.EtcdSubSys][config.Default], globalRootCAs)
if err != nil {
@ -438,10 +460,6 @@ func lookupConfigs(s config.Config, setDriveCount int) {
}
}
}
globalHealConfig, err = heal.LookupConfig(s[config.HealSubSys][config.Default])
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to read heal config: %w", err))
}
kmsCfg, err := crypto.LookupConfig(s, globalCertsCADir.Get(), NewGatewayHTTPTransport())
if err != nil {
@ -459,11 +477,6 @@ func lookupConfigs(s config.Config, setDriveCount int) {
logger.LogIf(ctx, fmt.Errorf("%s env is deprecated please migrate to using `mc encrypt` at bucket level", crypto.EnvKMSAutoEncryption))
}
globalCompressConfig, err = compress.LookupConfig(s[config.CompressionSubSys][config.Default])
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to setup Compression: %w", err))
}
globalOpenIDConfig, err = openid.LookupConfig(s[config.IdentityOpenIDSubSys][config.Default],
NewGatewayHTTPTransport(), xhttp.DrainBody)
if err != nil {
@ -538,6 +551,68 @@ func lookupConfigs(s config.Config, setDriveCount int) {
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to initialize notification target(s): %w", err))
}
// Apply dynamic config values
logger.LogIf(ctx, applyDynamicConfig(ctx, s))
}
// applyDynamicConfig will apply dynamic config values.
// Dynamic systems should be in config.SubSystemsDynamic as well.
func applyDynamicConfig(ctx context.Context, s config.Config) error {
// Read all dynamic configs.
// API
apiConfig, err := api.LookupConfig(s[config.APISubSys][config.Default])
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Invalid api configuration: %w", err))
}
// Compression
cmpCfg, err := compress.LookupConfig(s[config.CompressionSubSys][config.Default])
if err != nil {
return fmt.Errorf("Unable to setup Compression: %w", err)
}
objAPI := newObjectLayerFn()
if objAPI != nil {
if cmpCfg.Enabled && !objAPI.IsCompressionSupported() {
return fmt.Errorf("Backend does not support compression")
}
}
// Heal
healCfg, err := heal.LookupConfig(s[config.HealSubSys][config.Default])
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to apply heal config: %w", err))
}
// Crawler
crawlerCfg, err := crawler.LookupConfig(s[config.CrawlerSubSys][config.Default])
if err != nil {
return fmt.Errorf("Unable to apply crawler config: %w", err)
}
// Apply configurations.
// We should not fail after this.
globalAPIConfig.init(apiConfig, globalAPIConfig.setDriveCount)
globalCompressConfigMu.Lock()
globalCompressConfig = cmpCfg
globalCompressConfigMu.Unlock()
globalHealConfigMu.Lock()
globalHealConfig = healCfg
globalHealConfigMu.Unlock()
logger.LogIf(ctx, crawlerSleeper.Update(crawlerCfg.Delay, crawlerCfg.MaxWait))
// Update all dynamic config values in memory.
globalServerConfigMu.Lock()
defer globalServerConfigMu.Unlock()
if globalServerConfig != nil {
for k := range config.SubSystemsDynamic {
globalServerConfig[k] = s[k]
}
}
return nil
}
// Help - return sub-system level help

@ -77,6 +77,7 @@ const (
LoggerWebhookSubSys = "logger_webhook"
AuditWebhookSubSys = "audit_webhook"
HealSubSys = "heal"
CrawlerSubSys = "crawler"
// Add new constants here if you add new fields to config.
)
@ -98,7 +99,7 @@ const (
)
// SubSystems - all supported sub-systems
var SubSystems = set.CreateStringSet([]string{
var SubSystems = set.CreateStringSet(
CredentialsSubSys,
RegionSubSys,
EtcdSubSys,
@ -113,6 +114,7 @@ var SubSystems = set.CreateStringSet([]string{
PolicyOPASubSys,
IdentityLDAPSubSys,
IdentityOpenIDSubSys,
CrawlerSubSys,
HealSubSys,
NotifyAMQPSubSys,
NotifyESSubSys,
@ -124,7 +126,15 @@ var SubSystems = set.CreateStringSet([]string{
NotifyPostgresSubSys,
NotifyRedisSubSys,
NotifyWebhookSubSys,
}...)
)
// SubSystemsDynamic - all sub-systems that have dynamic config.
var SubSystemsDynamic = set.CreateStringSet(
APISubSys,
CompressionSubSys,
CrawlerSubSys,
HealSubSys,
)
// SubSystemsSingleTargets - subsystems which only support single target.
var SubSystemsSingleTargets = set.CreateStringSet([]string{
@ -141,6 +151,7 @@ var SubSystemsSingleTargets = set.CreateStringSet([]string{
IdentityLDAPSubSys,
IdentityOpenIDSubSys,
HealSubSys,
CrawlerSubSys,
}...)
// Constant separators
@ -309,25 +320,29 @@ func (c Config) DelFrom(r io.Reader) error {
return nil
}
// ReadFrom - implements io.ReaderFrom interface
func (c Config) ReadFrom(r io.Reader) (int64, error) {
// ReadConfig - read content from input and write into c.
// Returns whether all parameters were dynamic.
func (c Config) ReadConfig(r io.Reader) (dynOnly bool, err error) {
var n int
scanner := bufio.NewScanner(r)
dynOnly = true
for scanner.Scan() {
// Skip any empty lines, or comment like characters
text := scanner.Text()
if text == "" || strings.HasPrefix(text, KvComment) {
continue
}
if err := c.SetKVS(text, DefaultKVS); err != nil {
return 0, err
dynamic, err := c.SetKVS(text, DefaultKVS)
if err != nil {
return false, err
}
dynOnly = dynOnly && dynamic
n += len(text)
}
if err := scanner.Err(); err != nil {
return 0, err
return false, err
}
return int64(n), nil
return dynOnly, nil
}
type configWriteTo struct {
@ -618,26 +633,27 @@ func (c Config) Clone() Config {
}
// SetKVS - set specific key values per sub-system.
func (c Config) SetKVS(s string, defaultKVS map[string]KVS) error {
func (c Config) SetKVS(s string, defaultKVS map[string]KVS) (dynamic bool, err error) {
if len(s) == 0 {
return Errorf("input arguments cannot be empty")
return false, Errorf("input arguments cannot be empty")
}
inputs := strings.SplitN(s, KvSpaceSeparator, 2)
if len(inputs) <= 1 {
return Errorf("invalid number of arguments '%s'", s)
return false, Errorf("invalid number of arguments '%s'", s)
}
subSystemValue := strings.SplitN(inputs[0], SubSystemSeparator, 2)
if len(subSystemValue) == 0 {
return Errorf("invalid number of arguments %s", s)
return false, Errorf("invalid number of arguments %s", s)
}
if !SubSystems.Contains(subSystemValue[0]) {
return Errorf("unknown sub-system %s", s)
return false, Errorf("unknown sub-system %s", s)
}
if SubSystemsSingleTargets.Contains(subSystemValue[0]) && len(subSystemValue) == 2 {
return Errorf("sub-system '%s' only supports single target", subSystemValue[0])
return false, Errorf("sub-system '%s' only supports single target", subSystemValue[0])
}
dynamic = SubSystemsDynamic.Contains(subSystemValue[0])
tgt := Default
subSys := subSystemValue[0]
@ -647,7 +663,7 @@ func (c Config) SetKVS(s string, defaultKVS map[string]KVS) error {
fields := madmin.KvFields(inputs[1], defaultKVS[subSys].Keys())
if len(fields) == 0 {
return Errorf("sub-system '%s' cannot have empty keys", subSys)
return false, Errorf("sub-system '%s' cannot have empty keys", subSys)
}
var kvs = KVS{}
@ -670,7 +686,7 @@ func (c Config) SetKVS(s string, defaultKVS map[string]KVS) error {
kvs.Set(prevK, madmin.SanitizeValue(kv[1]))
continue
}
return Errorf("key '%s', cannot have empty value", kv[0])
return false, Errorf("key '%s', cannot have empty value", kv[0])
}
_, ok := kvs.Lookup(Enable)
@ -720,11 +736,11 @@ func (c Config) SetKVS(s string, defaultKVS map[string]KVS) error {
// Return error only if the
// key is enabled, for state=off
// let it be empty.
return Errorf(
return false, Errorf(
"'%s' is not optional for '%s' sub-system, please check '%s' documentation",
hkv.Key, subSys, subSys)
}
}
c[subSys][tgt] = currKVS
return nil
return dynamic, nil
}

@ -0,0 +1,86 @@
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package crawler
import (
"strconv"
"time"
"github.com/minio/minio/cmd/config"
)
// Compression environment variables
const (
Delay = "delay"
MaxWait = "max_wait"
)
// Config represents the heal settings.
type Config struct {
// Delay is the sleep multiplier.
Delay float64 `json:"delay"`
// MaxWait is maximum wait time between operations
MaxWait time.Duration
}
var (
// DefaultKVS - default KV config for heal settings
DefaultKVS = config.KVS{
config.KV{
Key: Delay,
Value: "10",
},
config.KV{
Key: MaxWait,
Value: "15s",
},
}
// Help provides help for config values
Help = config.HelpKVS{
config.HelpKV{
Key: Delay,
Description: `crawler delay multiplier, default 10`,
Optional: true,
Type: "float",
},
config.HelpKV{
Key: MaxWait,
Description: `maximum wait time between operations, default 15s`,
Optional: true,
Type: "duration",
},
}
)
// LookupConfig - lookup config and override with valid environment settings if any.
func LookupConfig(kvs config.KVS) (cfg Config, err error) {
if err = config.CheckValidKeys(config.CrawlerSubSys, kvs, DefaultKVS); err != nil {
return cfg, err
}
delay := kvs.Get(Delay)
cfg.Delay, err = strconv.ParseFloat(delay, 64)
if err != nil {
return cfg, err
}
wait := kvs.Get(MaxWait)
cfg.MaxWait, err = time.ParseDuration(wait)
if err != nil {
return cfg, err
}
return cfg, nil
}

@ -21,11 +21,12 @@ import (
"context"
"encoding/binary"
"errors"
"math"
"math/rand"
"os"
"path"
"strconv"
"strings"
"sync"
"time"
"github.com/minio/minio/cmd/config"
@ -43,7 +44,6 @@ import (
const (
dataCrawlSleepPerFolder = time.Millisecond // Time to wait between folders.
dataCrawlSleepDefMult = 10.0 // Default multiplier for waits between operations.
dataCrawlStartDelay = 5 * time.Minute // Time to wait on startup and between cycles.
dataUsageUpdateDirCycles = 16 // Visit all folders every n cycles.
@ -53,8 +53,12 @@ const (
)
var (
globalHealConfig heal.Config
globalHealConfig heal.Config
globalHealConfigMu sync.Mutex
dataCrawlerLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second)
// Sleeper values are updated when config is loaded.
crawlerSleeper = newDynamicSleeper(10, 10*time.Second)
)
// initDataCrawler will start the crawler unless disabled.
@ -141,7 +145,6 @@ type folderScanner struct {
newCache dataUsageCache
withFilter *bloomFilter
dataUsageCrawlMult float64
dataUsageCrawlDebug bool
healFolderInclude uint32 // Include a clean folder one in n cycles.
healObjectSelect uint32 // Do a heal check on an object once every n cycles. Must divide into healFolderInclude
@ -172,11 +175,6 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
return cache, errors.New("internal error: root scan attempted")
}
delayMult, err := strconv.ParseFloat(env.Get(envDataUsageCrawlDelay, "10.0"), 64)
if err != nil {
logger.LogIf(ctx, err)
delayMult = dataCrawlSleepDefMult
}
s := folderScanner{
root: basePath,
getSize: getSize,
@ -184,7 +182,6 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
newCache: dataUsageCache{Info: cache.Info},
newFolders: nil,
existingFolders: nil,
dataUsageCrawlMult: delayMult,
dataUsageCrawlDebug: intDataUpdateTracker.debug,
healFolderInclude: 0,
healObjectSelect: 0,
@ -386,7 +383,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
}
}
}
sleepDuration(dataCrawlSleepPerFolder, f.dataUsageCrawlMult)
crawlerSleeper.Sleep(ctx, dataCrawlSleepPerFolder)
cache := dataUsageEntry{}
@ -435,7 +432,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
}
// Dynamic time delay.
t := UTCNow()
wait := crawlerSleeper.Timer(ctx)
// Get file size, ignore errors.
item := crawlItem{
@ -450,7 +447,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
}
size, err := f.getSize(item)
sleepDuration(time.Since(t), f.dataUsageCrawlMult)
wait()
if err == errSkipFile {
return nil
}
@ -506,8 +503,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
}
// Dynamic time delay.
t := UTCNow()
wait := crawlerSleeper.Timer(ctx)
resolver.bucket = bucket
foundObjs := false
@ -535,8 +531,8 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
logger.Info(color.Green("healObjects:")+" got partial, %d agreed, errs: %v", nAgreed, errs)
}
// Sleep and reset.
sleepDuration(time.Since(t), f.dataUsageCrawlMult)
t = UTCNow()
wait()
wait = crawlerSleeper.Timer(ctx)
entry, ok := entries.resolve(&resolver)
if !ok {
for _, err := range errs {
@ -572,8 +568,8 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
}
for _, ver := range fiv.Versions {
// Sleep and reset.
sleepDuration(time.Since(t), f.dataUsageCrawlMult)
t = UTCNow()
wait()
wait = crawlerSleeper.Timer(ctx)
err := bgSeq.queueHealTask(healSource{
bucket: bucket,
object: fiv.Name,
@ -605,6 +601,9 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
// If we have quorum, found directories, but no objects, issue heal to delete the dangling.
objAPI.HealObjects(ctx, bucket, prefix, madmin.HealOpts{Recursive: true, Remove: true},
func(bucket, object, versionID string) error {
// Wait for each heal as per crawler frequency.
wait()
wait = crawlerSleeper.Timer(ctx)
return bgSeq.queueHealTask(healSource{
bucket: bucket,
object: object,
@ -613,7 +612,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
})
}
sleepDuration(time.Since(t), f.dataUsageCrawlMult)
wait()
// Add unless healing returned an error.
if foundObjs {
@ -651,12 +650,12 @@ func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder)
dirStack = append(dirStack, entName)
err := readDirFn(path.Join(dirStack...), addDir)
dirStack = dirStack[:len(dirStack)-1]
sleepDuration(dataCrawlSleepPerFolder, f.dataUsageCrawlMult)
crawlerSleeper.Sleep(ctx, dataCrawlSleepPerFolder)
return err
}
// Dynamic time delay.
t := UTCNow()
wait := crawlerSleeper.Timer(ctx)
// Get file size, ignore errors.
dirStack = append(dirStack, entName)
@ -686,8 +685,8 @@ func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder)
heal: hashPath(path.Join(prefix, entName)).mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv),
})
// Don't sleep for really small amount of time
sleepDuration(time.Since(t), f.dataUsageCrawlMult)
// Wait to throttle IO
wait()
if err == errSkipFile {
return nil
@ -910,21 +909,6 @@ func (i *crawlItem) objectPath() string {
return path.Join(i.prefix, i.objectName)
}
// sleepDuration multiplies the duration d by x
// and sleeps if is more than 100 micro seconds.
// Sleep is limited to max 15 seconds.
func sleepDuration(d time.Duration, x float64) {
const maxWait = 15 * time.Second
const minWait = 100 * time.Microsecond
// Don't sleep for really small amount of time
if d := time.Duration(float64(d) * x); d > minWait {
if d > maxWait {
d = maxWait
}
time.Sleep(d)
}
}
// healReplication will heal a scanned item that has failed replication.
func (i *crawlItem) healReplication(ctx context.Context, o ObjectLayer, meta actionMeta) {
if meta.oi.DeleteMarker || !meta.oi.VersionPurgeStatus.Empty() {
@ -967,3 +951,126 @@ func (i *crawlItem) healReplicationDeletes(ctx context.Context, o ObjectLayer, m
})
}
}
type dynamicSleeper struct {
mu sync.RWMutex
// Sleep factor
factor float64
// maximum sleep cap,
// set to <= 0 to disable.
maxSleep time.Duration
// Don't sleep at all, if time taken is below this value.
// This is to avoid too small costly sleeps.
minSleep time.Duration
// cycle will be closed
cycle chan struct{}
}
// newDynamicSleeper
func newDynamicSleeper(factor float64, maxWait time.Duration) *dynamicSleeper {
return &dynamicSleeper{
factor: factor,
cycle: make(chan struct{}),
maxSleep: maxWait,
minSleep: 100 * time.Microsecond,
}
}
// Timer returns a timer that has started.
// When the returned function is called it will wait.
func (d *dynamicSleeper) Timer(ctx context.Context) func() {
t := time.Now()
return func() {
doneAt := time.Now()
for {
// Grab current values
d.mu.RLock()
minWait, maxWait := d.minSleep, d.maxSleep
factor := d.factor
cycle := d.cycle
d.mu.RUnlock()
elapsed := doneAt.Sub(t)
// Don't sleep for really small amount of time
wantSleep := time.Duration(float64(elapsed) * factor)
if wantSleep <= minWait {
return
}
if maxWait > 0 && wantSleep > maxWait {
wantSleep = maxWait
}
timer := time.NewTimer(wantSleep)
select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
case <-timer.C:
return
case <-cycle:
if !timer.Stop() {
// We expired.
<-timer.C
return
}
}
}
}
}
// Sleep sleeps the specified time multiplied by the sleep factor.
// If the factor is updated the sleep will be done again with the new factor.
func (d *dynamicSleeper) Sleep(ctx context.Context, base time.Duration) {
for {
// Grab current values
d.mu.RLock()
minWait, maxWait := d.minSleep, d.maxSleep
factor := d.factor
cycle := d.cycle
d.mu.RUnlock()
// Don't sleep for really small amount of time
wantSleep := time.Duration(float64(base) * factor)
if wantSleep <= minWait {
return
}
if maxWait > 0 && wantSleep > maxWait {
wantSleep = maxWait
}
timer := time.NewTimer(wantSleep)
select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
case <-timer.C:
return
case <-cycle:
if !timer.Stop() {
// We expired.
<-timer.C
return
}
}
}
}
// Update the current settings and cycle all waiting.
// Parameters are the same as in the contructor.
func (d *dynamicSleeper) Update(factor float64, maxWait time.Duration) error {
d.mu.Lock()
defer d.mu.Unlock()
if math.Abs(d.factor-factor) < 1e-10 && d.maxSleep == maxWait {
return nil
}
// Update values and cycle waiting.
close(d.cycle)
d.factor = factor
d.maxSleep = maxWait
d.cycle = make(chan struct{})
return nil
}

@ -28,7 +28,6 @@ import (
const (
envDataUsageCrawlConf = "MINIO_DISK_USAGE_CRAWL_ENABLE"
envDataUsageCrawlDelay = "MINIO_DISK_USAGE_CRAWL_DELAY"
envDataUsageCrawlDebug = "MINIO_DISK_USAGE_CRAWL_DEBUG"
dataUsageRoot = SlashSeparator

@ -20,6 +20,7 @@ import (
"crypto/x509"
"net/http"
"os"
"sync"
"time"
"github.com/minio/minio-go/v7/pkg/set"
@ -245,7 +246,8 @@ var (
globalAutoEncryption bool
// Is compression enabled?
globalCompressConfig compress.Config
globalCompressConfigMu sync.Mutex
globalCompressConfig compress.Config
// Some standard object extensions which we strictly dis-allow for compression.
standardExcludeCompressExtensions = []string{".gz", ".bz2", ".rar", ".zip", ".7z", ".xz", ".mp4", ".mkv", ".mov"}

@ -35,6 +35,7 @@ type apiConfig struct {
listQuorum int
extendListLife time.Duration
corsAllowOrigins []string
setDriveCount int
}
func (t *apiConfig) init(cfg api.Config, setDriveCount int) {
@ -43,6 +44,7 @@ func (t *apiConfig) init(cfg api.Config, setDriveCount int) {
t.clusterDeadline = cfg.ClusterDeadline
t.corsAllowOrigins = cfg.CorsAllowOrigin
t.setDriveCount = setDriveCount
var apiRequestsMaxPerNode int
if cfg.RequestsMax <= 0 {
@ -62,8 +64,14 @@ func (t *apiConfig) init(cfg api.Config, setDriveCount int) {
apiRequestsMaxPerNode /= len(globalEndpoints.Hostnames())
}
}
t.requestsPool = make(chan struct{}, apiRequestsMaxPerNode)
if cap(t.requestsPool) < apiRequestsMaxPerNode {
// Only replace if needed.
// Existing requests will use the previous limit,
// but new requests will use the new limit.
// There will be a short overlap window,
// but this shouldn't last long.
t.requestsPool = make(chan struct{}, apiRequestsMaxPerNode)
}
t.requestsDeadline = cfg.RequestsDeadline
t.listQuorum = cfg.GetListQuorum()
t.extendListLife = cfg.ExtendListLife
@ -76,6 +84,13 @@ func (t *apiConfig) getListQuorum() int {
return t.listQuorum
}
func (t *apiConfig) getSetDriveCount() int {
t.mu.RLock()
defer t.mu.RUnlock()
return t.setDriveCount
}
func (t *apiConfig) getExtendListLife() time.Duration {
t.mu.RLock()
defer t.mu.RUnlock()

@ -436,7 +436,10 @@ func (o ObjectInfo) GetActualSize() (int64, error) {
// Using compression and encryption together enables room for side channel attacks.
// Eliminate non-compressible objects by extensions/content-types.
func isCompressible(header http.Header, object string) bool {
if crypto.IsRequested(header) || excludeForCompression(header, object, globalCompressConfig) {
globalCompressConfigMu.Lock()
cfg := globalCompressConfig
globalCompressConfigMu.Unlock()
if !cfg.Enabled || crypto.IsRequested(header) || excludeForCompression(header, object, cfg) {
return false
}
return true

@ -786,6 +786,17 @@ func (s *peerRESTServer) SignalServiceHandler(w http.ResponseWriter, r *http.Req
globalServiceSignalCh <- signal
case serviceStop:
globalServiceSignalCh <- signal
case serviceReloadDynamic:
srvCfg, err := getValidConfig(newObjectLayerFn())
if err != nil {
s.writeErrorResponse(w, err)
return
}
err = applyDynamicConfig(r.Context(), srvCfg)
if err != nil {
s.writeErrorResponse(w, err)
}
return
default:
s.writeErrorResponse(w, errUnsupportedSignal)
return

@ -475,7 +475,7 @@ func serverMain(ctx *cli.Context) {
logger.SetDeploymentID(globalDeploymentID)
go initDataCrawler(GlobalContext, newObject)
initDataCrawler(GlobalContext, newObject)
// Enable background operations for erasure coding
if globalIsErasure {

@ -27,8 +27,9 @@ import (
type serviceSignal int
const (
serviceRestart serviceSignal = iota // Restarts the server.
serviceStop // Stops the server.
serviceRestart serviceSignal = iota // Restarts the server.
serviceStop // Stops the server.
serviceReloadDynamic // Reload dynamic config values.
// Add new service requests here.
)

@ -336,7 +336,10 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac
if objAPI == nil {
return cache, errServerNotInitialized
}
opts := globalHealConfig
globalHealConfigMu.Lock()
healOpts := globalHealConfig
globalHealConfigMu.Unlock()
dataUsageInfo, err := crawlDataFolder(ctx, s.diskPath, cache, func(item crawlItem) (int64, error) {
// Look for `xl.meta/xl.json' at the leaf.
@ -375,7 +378,7 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac
})
if !version.Deleted {
// Bitrot check local data
if size > 0 && item.heal && opts.Bitrot {
if size > 0 && item.heal && healOpts.Bitrot {
// HealObject verifies bitrot requirement internally
res, err := objAPI.HealObject(ctx, item.bucket, item.objectPath(), oi.VersionID, madmin.HealOpts{
Remove: healDeleteDangling,

@ -1693,7 +1693,7 @@ func TestXLStorageVerifyFile(t *testing.T) {
reader := bytes.NewReader(data)
for {
// Using io.CopyBuffer instead of this loop will not work for us as io.CopyBuffer
// will use bytes.Buffer.ReadFrom() which will not do shardSize'ed writes causing error.
// will use bytes.Buffer.ReadConfig() which will not do shardSize'ed writes causing error.
n, err := reader.Read(shard)
w.Write(shard[:n])
if err == nil {

9
pkg/env/env.go vendored

@ -24,10 +24,19 @@ import (
var (
privateMutex sync.RWMutex
lockEnvMutex sync.Mutex
envOff bool
)
// LockSetEnv locks modifications to environment.
// Call returned function to unlock.
func LockSetEnv() func() {
lockEnvMutex.Lock()
return lockEnvMutex.Unlock
}
// SetEnvOff - turns off env lookup
// A global lock above this MUST ensure that
func SetEnvOff() {
privateMutex.Lock()
defer privateMutex.Unlock()

@ -50,11 +50,19 @@ func (adm *AdminClient) DelConfigKV(ctx context.Context, k string) (err error) {
return nil
}
const (
// ConfigAppliedHeader is the header indicating whether the config was applied without requiring a restart.
ConfigAppliedHeader = "x-minio-config-applied"
// ConfigAppliedTrue is the value set in header if the config was applied.
ConfigAppliedTrue = "true"
)
// SetConfigKV - set key value config to server.
func (adm *AdminClient) SetConfigKV(ctx context.Context, kv string) (err error) {
func (adm *AdminClient) SetConfigKV(ctx context.Context, kv string) (restart bool, err error) {
econfigBytes, err := EncryptData(adm.getSecretKey(), []byte(kv))
if err != nil {
return err
return false, err
}
reqData := requestData{
@ -67,14 +75,14 @@ func (adm *AdminClient) SetConfigKV(ctx context.Context, kv string) (err error)
defer closeResponse(resp)
if err != nil {
return err
return false, err
}
if resp.StatusCode != http.StatusOK {
return httpRespToErrorResponse(resp)
return false, httpRespToErrorResponse(resp)
}
return nil
return resp.Header.Get(ConfigAppliedHeader) != ConfigAppliedTrue, nil
}
// GetConfigKV - returns the key, value of the requested key, incoming data is encrypted.

Loading…
Cancel
Save