fix: IAM store fallback to list users and policies from disk (#10787)

Bonus fixes, remove package retry it is harder to get it
right, also manage context remove it such that we don't have
to rely on it anymore instead use a simple Jitter retry.
master
Harshavardhana 4 years ago committed by GitHub
parent 4ea31da889
commit 68de5a6f6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      cmd/erasure-healing-common.go
  2. 10
      cmd/erasure-server-sets.go
  3. 17
      cmd/format-fs.go
  4. 1
      cmd/iam-object-store.go
  5. 110
      cmd/iam.go
  6. 61
      cmd/server-main.go
  7. 133
      pkg/retry/retry.go
  8. 85
      pkg/retry/retry_test.go
  9. 18
      ruleguard.rules.go

@ -26,22 +26,25 @@ import (
// commonTime returns a maximally occurring time from a list of time. // commonTime returns a maximally occurring time from a list of time.
func commonTime(modTimes []time.Time) (modTime time.Time, count int) { func commonTime(modTimes []time.Time) (modTime time.Time, count int) {
var maxima int // Counter for remembering max occurrence of elements. var maxima int // Counter for remembering max occurrence of elements.
timeOccurenceMap := make(map[time.Time]int) timeOccurenceMap := make(map[int64]int)
// Ignore the uuid sentinel and count the rest. // Ignore the uuid sentinel and count the rest.
for _, time := range modTimes { for _, time := range modTimes {
if time.Equal(timeSentinel) { if time.Equal(timeSentinel) {
continue continue
} }
timeOccurenceMap[time]++ timeOccurenceMap[time.UnixNano()]++
} }
// Find the common cardinality from previously collected // Find the common cardinality from previously collected
// occurrences of elements. // occurrences of elements.
for time, count := range timeOccurenceMap { for nano, count := range timeOccurenceMap {
if count > maxima || (count == maxima && time.After(modTime)) { t := time.Unix(0, nano)
if count > maxima || (count == maxima && t.After(modTime)) {
maxima = count maxima = count
modTime = time modTime = t
} }
} }
// Return the collected common uuid. // Return the collected common uuid.
return modTime, maxima return modTime, maxima
} }

@ -1202,15 +1202,7 @@ func (z *erasureServerSets) deleteAll(ctx context.Context, bucket, prefix string
wg.Add(1) wg.Add(1)
go func(disk StorageAPI) { go func(disk StorageAPI) {
defer wg.Done() defer wg.Done()
if err := disk.Delete(ctx, bucket, prefix, true); err != nil { disk.Delete(ctx, bucket, prefix, true)
if !IsErrIgnored(err, []error{
errDiskNotFound,
errVolumeNotFound,
errFileNotFound,
}...) {
logger.LogOnceIf(ctx, err, disk.String())
}
}
}(disk) }(disk)
} }
} }

@ -20,6 +20,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"math/rand"
"os" "os"
"path" "path"
"time" "time"
@ -27,7 +28,6 @@ import (
"github.com/minio/minio/cmd/config" "github.com/minio/minio/cmd/config"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/lock" "github.com/minio/minio/pkg/lock"
"github.com/minio/minio/pkg/retry"
) )
// FS format version strings. // FS format version strings.
@ -340,31 +340,28 @@ func formatFSFixDeploymentID(ctx context.Context, fsFormatPath string) error {
return time.Now().Round(time.Second).Sub(formatStartTime).String() return time.Now().Round(time.Second).Sub(formatStartTime).String()
} }
retryCtx, cancel := context.WithCancel(ctx) r := rand.New(rand.NewSource(time.Now().UnixNano()))
// Indicate to our routine to exit cleanly upon return.
defer cancel()
var wlk *lock.LockedFile var wlk *lock.LockedFile
retryCh := retry.NewTimerWithJitter(retryCtx, time.Second, 30*time.Second, retry.MaxJitter)
var stop bool var stop bool
for !stop { for !stop {
select { select {
case <-retryCh: case <-ctx.Done():
return fmt.Errorf("Initializing FS format stopped gracefully")
default:
wlk, err = lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR, 0) wlk, err = lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR, 0)
if err == lock.ErrAlreadyLocked { if err == lock.ErrAlreadyLocked {
// Lock already present, sleep and attempt again // Lock already present, sleep and attempt again
logger.Info("Another minio process(es) might be holding a lock to the file %s. Please kill that minio process(es) (elapsed %s)\n", fsFormatPath, getElapsedTime()) logger.Info("Another minio process(es) might be holding a lock to the file %s. Please kill that minio process(es) (elapsed %s)\n", fsFormatPath, getElapsedTime())
time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
continue continue
} }
if err != nil { if err != nil {
return err return err
} }
stop = true
case <-ctx.Done():
return fmt.Errorf("Initializing FS format stopped gracefully")
} }
stop = true
} }
defer wlk.Close() defer wlk.Close()
if err = jsonLoad(wlk, format); err != nil { if err = jsonLoad(wlk, format); err != nil {

@ -421,7 +421,6 @@ func (iamOS *IAMObjectStore) loadAll(ctx context.Context, sys *IAMSys) error {
} }
// Sets default canned policies, if none are set. // Sets default canned policies, if none are set.
setDefaultCannedPolicies(sys.iamPolicyDocsMap) setDefaultCannedPolicies(sys.iamPolicyDocsMap)
iamOS.unlock() iamOS.unlock()
if isMinIOUsersSys { if isMinIOUsersSys {

@ -23,6 +23,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"math/rand"
"strings" "strings"
"time" "time"
@ -33,7 +34,6 @@ import (
"github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/auth"
iampolicy "github.com/minio/minio/pkg/iam/policy" iampolicy "github.com/minio/minio/pkg/iam/policy"
"github.com/minio/minio/pkg/madmin" "github.com/minio/minio/pkg/madmin"
"github.com/minio/minio/pkg/retry"
) )
// UsersSysType - defines the type of users and groups system that is // UsersSysType - defines the type of users and groups system that is
@ -443,11 +443,15 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
// allocate dynamic timeout once before the loop // allocate dynamic timeout once before the loop
iamLockTimeout := newDynamicTimeout(5*time.Second, 3*time.Second) iamLockTimeout := newDynamicTimeout(5*time.Second, 3*time.Second)
for range retry.NewTimerWithJitter(retryCtx, time.Second, 5*time.Second, retry.MaxJitter) { r := rand.New(rand.NewSource(time.Now().UnixNano()))
var err error
for {
// let one of the server acquire the lock, if not let them timeout. // let one of the server acquire the lock, if not let them timeout.
// which shall be retried again by this loop. // which shall be retried again by this loop.
if err := txnLk.GetLock(iamLockTimeout); err != nil { if err = txnLk.GetLock(iamLockTimeout); err != nil {
logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. trying to acquire lock") logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. trying to acquire lock")
time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
continue continue
} }
@ -455,7 +459,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
// **** WARNING **** // **** WARNING ****
// Migrating to encrypted backend on etcd should happen before initialization of // Migrating to encrypted backend on etcd should happen before initialization of
// IAM sub-system, make sure that we do not move the above codeblock elsewhere. // IAM sub-system, make sure that we do not move the above codeblock elsewhere.
if err := migrateIAMConfigsEtcdToEncrypted(ctx, globalEtcdClient); err != nil { if err = migrateIAMConfigsEtcdToEncrypted(ctx, globalEtcdClient); err != nil {
txnLk.Unlock() txnLk.Unlock()
logger.LogIf(ctx, fmt.Errorf("Unable to decrypt an encrypted ETCD backend for IAM users and policies: %w", err)) logger.LogIf(ctx, fmt.Errorf("Unable to decrypt an encrypted ETCD backend for IAM users and policies: %w", err))
logger.LogIf(ctx, errors.New("IAM sub-system is partially initialized, some users may not be available")) logger.LogIf(ctx, errors.New("IAM sub-system is partially initialized, some users may not be available"))
@ -469,11 +473,10 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
} }
// Migrate IAM configuration, if necessary. // Migrate IAM configuration, if necessary.
if err := sys.doIAMConfigMigration(ctx); err != nil { if err = sys.doIAMConfigMigration(ctx); err != nil {
txnLk.Unlock() txnLk.Unlock()
if errors.Is(err, errDiskNotFound) || if errors.Is(err, errDiskNotFound) ||
errors.Is(err, errConfigNotFound) || errors.Is(err, errConfigNotFound) ||
errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.DeadlineExceeded) ||
errors.As(err, &rquorum) || errors.As(err, &rquorum) ||
errors.As(err, &wquorum) || errors.As(err, &wquorum) ||
@ -491,11 +494,10 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
break break
} }
err := sys.store.loadAll(ctx, sys) err = sys.store.loadAll(ctx, sys)
// Invalidate the old cred always, even upon error to avoid any leakage. // Invalidate the old cred always, even upon error to avoid any leakage.
globalOldCred = auth.Credentials{} globalOldCred = auth.Credentials{}
if err != nil { if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to initialize IAM sub-system, some users may not be available %w", err)) logger.LogIf(ctx, fmt.Errorf("Unable to initialize IAM sub-system, some users may not be available %w", err))
} }
@ -579,12 +581,18 @@ func (sys *IAMSys) ListPolicies() (map[string]iampolicy.Policy, error) {
} }
sys.store.rlock() sys.store.rlock()
defer sys.store.runlock() fallback := sys.storeFallback
sys.store.runlock()
if sys.storeFallback { if fallback {
return nil, errIAMNotInitialized if err := sys.store.loadAll(context.Background(), sys); err != nil {
return nil, err
}
} }
sys.store.rlock()
defer sys.store.runlock()
policyDocsMap := make(map[string]iampolicy.Policy, len(sys.iamPolicyDocsMap)) policyDocsMap := make(map[string]iampolicy.Policy, len(sys.iamPolicyDocsMap))
for k, v := range sys.iamPolicyDocsMap { for k, v := range sys.iamPolicyDocsMap {
policyDocsMap[k] = v policyDocsMap[k] = v
@ -741,19 +749,25 @@ func (sys *IAMSys) ListUsers() (map[string]madmin.UserInfo, error) {
return nil, errServerNotInitialized return nil, errServerNotInitialized
} }
var users = make(map[string]madmin.UserInfo)
sys.store.rlock()
defer sys.store.runlock()
if sys.usersSysType != MinIOUsersSysType { if sys.usersSysType != MinIOUsersSysType {
return nil, errIAMActionNotAllowed return nil, errIAMActionNotAllowed
} }
if sys.storeFallback { sys.store.rlock()
return nil, errIAMNotInitialized fallback := sys.storeFallback
sys.store.runlock()
if fallback {
if err := sys.store.loadAll(context.Background(), sys); err != nil {
return nil, err
}
} }
sys.store.rlock()
defer sys.store.runlock()
var users = make(map[string]madmin.UserInfo)
for k, v := range sys.iamUsersMap { for k, v := range sys.iamUsersMap {
if !v.IsTemp() && !v.IsServiceAccount() { if !v.IsTemp() && !v.IsServiceAccount() {
users[k] = madmin.UserInfo{ users[k] = madmin.UserInfo{
@ -975,14 +989,19 @@ func (sys *IAMSys) ListServiceAccounts(ctx context.Context, accessKey string) ([
} }
sys.store.rlock() sys.store.rlock()
defer sys.store.runlock() fallback := sys.storeFallback
sys.store.runlock()
if sys.storeFallback { if fallback {
return nil, errIAMNotInitialized if err := sys.store.loadAll(context.Background(), sys); err != nil {
return nil, err
}
} }
var serviceAccounts []string sys.store.rlock()
defer sys.store.runlock()
var serviceAccounts []string
for k, v := range sys.iamUsersMap { for k, v := range sys.iamUsersMap {
if v.IsServiceAccount() && v.ParentUser == accessKey { if v.IsServiceAccount() && v.ParentUser == accessKey {
serviceAccounts = append(serviceAccounts, k) serviceAccounts = append(serviceAccounts, k)
@ -1083,13 +1102,13 @@ func (sys *IAMSys) SetUserSecretKey(accessKey string, secretKey string) error {
return errServerNotInitialized return errServerNotInitialized
} }
sys.store.lock()
defer sys.store.unlock()
if sys.usersSysType != MinIOUsersSysType { if sys.usersSysType != MinIOUsersSysType {
return errIAMActionNotAllowed return errIAMActionNotAllowed
} }
sys.store.lock()
defer sys.store.unlock()
cred, ok := sys.iamUsersMap[accessKey] cred, ok := sys.iamUsersMap[accessKey]
if !ok { if !ok {
return errNoSuchUser return errNoSuchUser
@ -1181,13 +1200,13 @@ func (sys *IAMSys) AddUsersToGroup(group string, members []string) error {
return errInvalidArgument return errInvalidArgument
} }
sys.store.lock()
defer sys.store.unlock()
if sys.usersSysType != MinIOUsersSysType { if sys.usersSysType != MinIOUsersSysType {
return errIAMActionNotAllowed return errIAMActionNotAllowed
} }
sys.store.lock()
defer sys.store.unlock()
// Validate that all members exist. // Validate that all members exist.
for _, member := range members { for _, member := range members {
cr, ok := sys.iamUsersMap[member] cr, ok := sys.iamUsersMap[member]
@ -1241,13 +1260,13 @@ func (sys *IAMSys) RemoveUsersFromGroup(group string, members []string) error {
return errInvalidArgument return errInvalidArgument
} }
sys.store.lock()
defer sys.store.unlock()
if sys.usersSysType != MinIOUsersSysType { if sys.usersSysType != MinIOUsersSysType {
return errIAMActionNotAllowed return errIAMActionNotAllowed
} }
sys.store.lock()
defer sys.store.unlock()
// Validate that all members exist. // Validate that all members exist.
for _, member := range members { for _, member := range members {
cr, ok := sys.iamUsersMap[member] cr, ok := sys.iamUsersMap[member]
@ -1317,13 +1336,13 @@ func (sys *IAMSys) SetGroupStatus(group string, enabled bool) error {
return errServerNotInitialized return errServerNotInitialized
} }
sys.store.lock()
defer sys.store.unlock()
if sys.usersSysType != MinIOUsersSysType { if sys.usersSysType != MinIOUsersSysType {
return errIAMActionNotAllowed return errIAMActionNotAllowed
} }
sys.store.lock()
defer sys.store.unlock()
if group == "" { if group == "" {
return errInvalidArgument return errInvalidArgument
} }
@ -1392,20 +1411,28 @@ func (sys *IAMSys) ListGroups() (r []string, err error) {
return r, errServerNotInitialized return r, errServerNotInitialized
} }
if sys.usersSysType != MinIOUsersSysType {
return nil, errIAMActionNotAllowed
}
sys.store.rlock() sys.store.rlock()
defer sys.store.runlock() fallback := sys.storeFallback
sys.store.runlock()
if sys.storeFallback { if fallback {
return nil, errIAMNotInitialized if err := sys.store.loadAll(context.Background(), sys); err != nil {
return nil, err
} }
if sys.usersSysType != MinIOUsersSysType {
return nil, errIAMActionNotAllowed
} }
sys.store.rlock()
defer sys.store.runlock()
r = make([]string, 0, len(sys.iamGroupsMap))
for k := range sys.iamGroupsMap { for k := range sys.iamGroupsMap {
r = append(r, k) r = append(r, k)
} }
return r, nil return r, nil
} }
@ -1909,9 +1936,6 @@ func (sys *IAMSys) removeGroupFromMembershipsMap(group string) {
// EnableLDAPSys - enable ldap system users type. // EnableLDAPSys - enable ldap system users type.
func (sys *IAMSys) EnableLDAPSys() { func (sys *IAMSys) EnableLDAPSys() {
sys.store.lock()
defer sys.store.unlock()
sys.usersSysType = LDAPUsersSysType sys.usersSysType = LDAPUsersSysType
} }

@ -21,6 +21,7 @@ import (
"crypto/tls" "crypto/tls"
"errors" "errors"
"fmt" "fmt"
"math/rand"
"net" "net"
"os" "os"
"os/signal" "os/signal"
@ -38,7 +39,6 @@ import (
"github.com/minio/minio/pkg/certs" "github.com/minio/minio/pkg/certs"
"github.com/minio/minio/pkg/color" "github.com/minio/minio/pkg/color"
"github.com/minio/minio/pkg/env" "github.com/minio/minio/pkg/env"
"github.com/minio/minio/pkg/retry"
) )
// ServerFlags - server command specific flags // ServerFlags - server command specific flags
@ -209,21 +209,12 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
// Initialize IAM store // Initialize IAM store
globalIAMSys.InitStore(newObject) globalIAMSys.InitStore(newObject)
// Create cancel context to control 'newRetryTimer' go routine.
retryCtx, cancel := context.WithCancel(ctx)
// Indicate to our routine to exit cleanly upon return.
defer cancel()
// Make sure to hold lock for entire migration to avoid // Make sure to hold lock for entire migration to avoid
// such that only one server should migrate the entire config // such that only one server should migrate the entire config
// at a given time, this big transaction lock ensures this // at a given time, this big transaction lock ensures this
// appropriately. This is also true for rotation of encrypted // appropriately. This is also true for rotation of encrypted
// content. // content.
txnLk := newObject.NewNSLock(retryCtx, minioMetaBucket, minioConfigPrefix+"/transaction.lock") txnLk := newObject.NewNSLock(ctx, minioMetaBucket, minioConfigPrefix+"/transaction.lock")
// allocate dynamic timeout once before the loop
configLockTimeout := newDynamicTimeout(5*time.Second, 3*time.Second)
// **** WARNING **** // **** WARNING ****
// Migrating to encrypted backend should happen before initialization of any // Migrating to encrypted backend should happen before initialization of any
@ -238,12 +229,25 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
rquorum := InsufficientReadQuorum{} rquorum := InsufficientReadQuorum{}
wquorum := InsufficientWriteQuorum{} wquorum := InsufficientWriteQuorum{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
lockTimeout := newDynamicTimeout(5*time.Second, 3*time.Second)
var err error var err error
for range retry.NewTimerWithJitter(retryCtx, 500*time.Millisecond, time.Second, retry.MaxJitter) { for {
select {
case <-ctx.Done():
// Retry was canceled successfully.
return fmt.Errorf("Initializing sub-systems stopped gracefully %w", ctx.Err())
default:
}
// let one of the server acquire the lock, if not let them timeout. // let one of the server acquire the lock, if not let them timeout.
// which shall be retried again by this loop. // which shall be retried again by this loop.
if err = txnLk.GetLock(configLockTimeout); err != nil { if err = txnLk.GetLock(lockTimeout); err != nil {
logger.Info("Waiting for all MinIO sub-systems to be initialized.. trying to acquire lock") logger.Info("Waiting for all MinIO sub-systems to be initialized.. trying to acquire lock")
time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
continue continue
} }
@ -279,20 +283,13 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
errors.As(err, &wquorum) || errors.As(err, &wquorum) ||
isErrBucketNotFound(err) { isErrBucketNotFound(err) {
logger.Info("Waiting for all MinIO sub-systems to be initialized.. possible cause (%v)", err) logger.Info("Waiting for all MinIO sub-systems to be initialized.. possible cause (%v)", err)
time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
continue continue
} }
// Any other unhandled return right here. // Any other unhandled return right here.
return fmt.Errorf("Unable to initialize sub-systems: %w", err) return fmt.Errorf("Unable to initialize sub-systems: %w", err)
} }
// Return an error when retry is canceled or deadlined
if err = retryCtx.Err(); err != nil {
return err
}
// Retry was canceled successfully.
return errors.New("Initializing sub-systems stopped gracefully")
} }
func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) { func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
@ -351,17 +348,6 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
go initFederatorBackend(buckets, newObject) go initFederatorBackend(buckets, newObject)
} }
if globalCacheConfig.Enabled {
// initialize the new disk cache objects.
var cacheAPI CacheObjectLayer
cacheAPI, err = newServerCacheObjects(ctx, globalCacheConfig)
logger.FatalIf(err, "Unable to initialize disk caching")
globalObjLayerMutex.Lock()
globalCacheObjectAPI = cacheAPI
globalObjLayerMutex.Unlock()
}
// Initialize bucket metadata sub-system. // Initialize bucket metadata sub-system.
globalBucketMetadataSys.Init(ctx, buckets, newObject) globalBucketMetadataSys.Init(ctx, buckets, newObject)
@ -509,6 +495,17 @@ func serverMain(ctx *cli.Context) {
} }
} }
if globalCacheConfig.Enabled {
// initialize the new disk cache objects.
var cacheAPI CacheObjectLayer
cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
logger.FatalIf(err, "Unable to initialize disk caching")
globalObjLayerMutex.Lock()
globalCacheObjectAPI = cacheAPI
globalObjLayerMutex.Unlock()
}
// Initialize users credentials and policies in background right after config has initialized. // Initialize users credentials and policies in background right after config has initialized.
go globalIAMSys.Init(GlobalContext, newObject) go globalIAMSys.Init(GlobalContext, newObject)

@ -1,133 +0,0 @@
/*
* Minio Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retry
import (
"context"
"math"
"math/rand"
"time"
)
// MaxJitter will randomize over the full exponential backoff time
const MaxJitter = 1.0
// NoJitter disables the use of jitter for randomizing the
// exponential backoff time
const NoJitter = 0.0
// defaultTimer implements Timer interface using time.Timer
type defaultTimer struct {
timer *time.Timer
}
// C returns the timers channel which receives the current time when the timer fires.
func (t *defaultTimer) C() <-chan time.Time {
return t.timer.C
}
// Start starts the timer to fire after the given duration
// don't use this code concurrently.
func (t *defaultTimer) Start(duration time.Duration) {
if t.timer == nil {
t.timer = time.NewTimer(duration)
} else {
t.timer.Reset(duration)
}
}
// Stop is called when the timer is not used anymore and resources may be freed.
func (t *defaultTimer) Stop() {
if t.timer != nil {
t.timer.Stop()
}
}
// NewTimerWithJitter creates a timer with exponentially increasing delays
// until the maximum retry attempts are reached. - this function is a fully
// configurable version, meant for only advanced use cases. For the most part
// one should use newRetryTimerSimple and newRetryTimer.
func NewTimerWithJitter(ctx context.Context, unit time.Duration, cap time.Duration, jitter float64) <-chan int {
attemptCh := make(chan int)
// normalize jitter to the range [0, 1.0]
jitter = math.Max(NoJitter, math.Min(MaxJitter, jitter))
// computes the exponential backoff duration according to
// https://www.awsarchitectureblog.com/2015/03/backoff.html
exponentialBackoffWait := func(attempt int) time.Duration {
// 1<<uint(attempt) below could overflow, so limit the value of attempt
const maxAttempt = 30
if attempt > maxAttempt {
attempt = maxAttempt
}
//sleep = random_between(0, min(cap, base * 2 ** attempt))
sleep := unit * 1 << uint(attempt)
if sleep > cap {
sleep = cap
}
if jitter > NoJitter {
sleep -= time.Duration(rand.Float64() * float64(sleep) * jitter)
}
return sleep
}
go func() {
nextBackoff := 0
t := &defaultTimer{}
defer func() {
t.Stop()
}()
defer close(attemptCh)
// Channel used to signal after the expiry of backoff wait seconds.
for {
select {
case <-ctx.Done():
return
case attemptCh <- nextBackoff:
nextBackoff++
}
t.Start(exponentialBackoffWait(nextBackoff))
select {
case <-ctx.Done():
return
case <-t.C():
}
}
}()
// Start reading..
return attemptCh
}
// Default retry constants.
const (
defaultRetryUnit = 50 * time.Millisecond // 50 millisecond.
defaultRetryCap = 500 * time.Millisecond // 500 millisecond.
)
// NewTimer creates a timer with exponentially increasing delays
// until the maximum retry attempts are reached. - this function is a
// simpler version with all default values.
func NewTimer(ctx context.Context) <-chan int {
return NewTimerWithJitter(ctx, defaultRetryUnit, defaultRetryCap, MaxJitter)
}

@ -1,85 +0,0 @@
/*
* Minio Cloud Storage, (C) 2020 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retry
import (
"context"
"testing"
"time"
)
// Tests for retry timer.
func TestRetryTimerSimple(t *testing.T) {
retryCtx, cancel := context.WithCancel(context.Background())
attemptCh := NewTimer(retryCtx)
i := <-attemptCh
if i != 0 {
cancel()
t.Fatalf("Invalid attempt counter returned should be 0, found %d instead", i)
}
i = <-attemptCh
if i <= 0 {
cancel()
t.Fatalf("Invalid attempt counter returned should be greater than 0, found %d instead", i)
}
cancel()
_, ok := <-attemptCh
if ok {
t.Fatal("Attempt counter should be closed")
}
}
// Test retry time with no jitter.
func TestRetryTimerWithNoJitter(t *testing.T) {
retryCtx, cancel := context.WithCancel(context.Background())
defer cancel()
// No jitter
attemptCh := NewTimerWithJitter(retryCtx, time.Millisecond, 5*time.Millisecond, NoJitter)
i := <-attemptCh
if i != 0 {
cancel()
t.Fatalf("Invalid attempt counter returned should be 0, found %d instead", i)
}
// Loop through the maximum possible attempt.
for i = range attemptCh {
if i == 30 {
cancel()
}
}
_, ok := <-attemptCh
if ok {
t.Fatal("Attempt counter should be closed")
}
}
// Test retry time with Jitter greater than MaxJitter.
func TestRetryTimerWithJitter(t *testing.T) {
retryCtx, cancel := context.WithCancel(context.Background())
// Jitter will be set back to 1.0
attemptCh := NewTimerWithJitter(retryCtx, time.Second, 30*time.Second, 2.0)
i := <-attemptCh
if i != 0 {
cancel()
t.Fatalf("Invalid attempt counter returned should be 0, found %d instead", i)
}
cancel()
_, ok := <-attemptCh
if ok {
t.Fatal("Attempt counter should be closed")
}
}

@ -35,6 +35,7 @@ func unconvert(m fluent.Matcher) {
func timeeq(m fluent.Matcher) { func timeeq(m fluent.Matcher) {
m.Match("$t0 == $t1").Where(m["t0"].Type.Is("time.Time")).Report("using == with time.Time") m.Match("$t0 == $t1").Where(m["t0"].Type.Is("time.Time")).Report("using == with time.Time")
m.Match("$t0 != $t1").Where(m["t0"].Type.Is("time.Time")).Report("using != with time.Time") m.Match("$t0 != $t1").Where(m["t0"].Type.Is("time.Time")).Report("using != with time.Time")
m.Match(`map[$k]$v`).Where(m["k"].Type.Is("time.Time")).Report("map with time.Time keys are easy to misuse")
} }
// Wrong err in error check // Wrong err in error check
@ -446,3 +447,20 @@ func hmacnew(m fluent.Matcher) {
).Where(m["x"].Pure). ).Where(m["x"].Pure).
Report("invalid hash passed to hmac.New()") Report("invalid hash passed to hmac.New()")
} }
func writestring(m fluent.Matcher) {
m.Match(`io.WriteString($w, string($b))`).
Where(m["b"].Type.Is("[]byte")).
Suggest("$w.Write($b)")
}
func badlock(m fluent.Matcher) {
// Shouldn't give many false positives without type filter
// as Lock+Unlock pairs in combination with defer gives us pretty
// a good chance to guess correctly. If we constrain the type to sync.Mutex
// then it'll be harder to match embedded locks and custom methods
// that may forward the call to the sync.Mutex (or other synchronization primitive).
m.Match(`$mu.Lock(); defer $mu.RUnlock()`).Report(`maybe $mu.RLock() was intended?`)
m.Match(`$mu.RLock(); defer $mu.Unlock()`).Report(`maybe $mu.Lock() was intended?`)
}

Loading…
Cancel
Save