From f1d77801678b1e2f8d9b237d313b069d1c5e04bc Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Wed, 19 Apr 2017 14:22:35 -0700
Subject: [PATCH] lock: Vendorize all the new changes made in minio/dsync
 (#4154)

Fixes #4139
---
 vendor/github.com/minio/dsync/README.md   |   6 +-
 vendor/github.com/minio/dsync/drwmutex.go | 156 +++++++++-------
 vendor/github.com/minio/dsync/retry.go    | 142 ++++++++++++++
 vendor/vendor.json                        |   6 +-
 4 files changed, 208 insertions(+), 102 deletions(-)
 create mode 100644 vendor/github.com/minio/dsync/retry.go

diff --git a/vendor/github.com/minio/dsync/README.md b/vendor/github.com/minio/dsync/README.md
index a95f5debe..6bc3a8fdd 100644
--- a/vendor/github.com/minio/dsync/README.md
+++ b/vendor/github.com/minio/dsync/README.md
@@ -1,4 +1,4 @@
-dsync
+dsync [![Slack](https://slack.minio.io/slack?type=svg)](https://slack.minio.io) [![Go Report Card](https://goreportcard.com/badge/minio/minio)](https://goreportcard.com/report/minio/minio) [![codecov](https://codecov.io/gh/minio/dsync/branch/master/graph/badge.svg)](https://codecov.io/gh/minio/dsync)
 =====
 
 A distributed locking and syncing package for Go.
@@ -16,7 +16,7 @@ This package was developed for the distributed server version of [Minio Object S
 For [minio](https://minio.io/) the distributed version is started as follows (for a 6-server system):
 
 ```
-$ minio server server1:/disk server2:/disk server3:/disk server4:/disk server5:/disk server6:/disk
+$ minio server http://server1/disk http://server2/disk http://server3/disk http://server4/disk http://server5/disk http://server6/disk
 ```
 
 _(note that the same identical command should be run on servers `server1` through to `server6`)_
@@ -57,7 +57,7 @@ This table shows test performance on the same (EC2) instance type but with a var
 | c3.2xlarge | 12 | (min=1239, max=1558) | 16782 | 25% |
 | c3.2xlarge | 16 | (min=996, max=1391)  | 19096 | 25% |
 
-The mix and max locks/server/sec gradually declines but due to the larger number of nodes the overall total number of locks rises steadily (at the same CPU usage level).
+The min and max locks/server/sec gradually decline, but due to the larger number of nodes the overall total number of locks rises steadily (at the same CPU usage level).
 
 ### Performance with different instance types
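
Before the `drwmutex.go` diff itself, a minimal sketch of how the `DRWMutex` type this patch reworks is consumed. It assumes dsync has already been initialized with one `NetLocker` client per participating node (that wiring, and error handling, are elided here); only `NewDRWMutex`, `Lock`/`Unlock`, and `RLock`/`RUnlock` come from the package:

```go
package main

import (
	"log"

	"github.com/minio/dsync"
)

func main() {
	// NOTE: assumes dsync was initialized elsewhere with the set of
	// lock-server clients (one NetLocker per node); omitted here.
	dm := dsync.NewDRWMutex("test-resource")

	dm.Lock() // blocks until a quorum of nodes grants the write lock
	log.Println("write lock granted")
	// ... exclusive access to the shared resource ...
	dm.Unlock()

	dm.RLock() // read locks may be held by many callers at once
	log.Println("read lock granted")
	dm.RUnlock()
}
```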
diff --git a/vendor/github.com/minio/dsync/drwmutex.go b/vendor/github.com/minio/dsync/drwmutex.go
index b15bd4fb8..fa336ed9f 100644
--- a/vendor/github.com/minio/dsync/drwmutex.go
+++ b/vendor/github.com/minio/dsync/drwmutex.go
@@ -20,9 +20,6 @@ import (
 	cryptorand "crypto/rand"
 	"fmt"
 	golog "log"
-	"math"
-	"math/rand"
-	"net"
 	"os"
 	"sync"
 	"time"
@@ -43,7 +40,7 @@ func log(msg ...interface{}) {
 }
 
 // DRWMutexAcquireTimeout - tolerance limit to wait for lock acquisition before.
-const DRWMutexAcquireTimeout = 25 * time.Millisecond // 25ms.
+const DRWMutexAcquireTimeout = 1 * time.Second // 1 second.
 
 // A DRWMutex is a distributed mutual exclusion lock.
 type DRWMutex struct {
@@ -53,19 +50,21 @@ type DRWMutex struct {
 	m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
 }
 
+// Granted - represents a structure of a granted lock.
 type Granted struct {
 	index   int
-	lockUid string // Locked if set with UID string, unlocked if empty
+	lockUID string // Locked if set with UID string, unlocked if empty
 }
 
 func (g *Granted) isLocked() bool {
-	return isLocked(g.lockUid)
+	return isLocked(g.lockUID)
 }
 
 func isLocked(uid string) bool {
 	return len(uid) > 0
 }
 
+// NewDRWMutex - initializes a new dsync RW mutex.
 func NewDRWMutex(name string) *DRWMutex {
 	return &DRWMutex{
 		Name: name,
@@ -98,22 +97,24 @@ func (dm *DRWMutex) RLock() {
 // The call will block until the lock is granted using a built-in
 // timing randomized back-off algorithm to try again until successful
 func (dm *DRWMutex) lockBlocking(isReadLock bool) {
+	doneCh := make(chan struct{})
+	defer close(doneCh)
 
-	runs, backOff := 1, 1
-
-	for {
-		// create temp array on stack
+	// We timed out on the previous lock, incrementally wait
+	// for a longer back-off time and try again afterwards.
+	for range newRetryTimerSimple(doneCh) {
+		// Create temp array on stack.
 		locks := make([]string, dnodeCount)
 
-		// try to acquire the lock
+		// Try to acquire the lock.
 		success := lock(clnts, &locks, dm.Name, isReadLock)
 		if success {
 			dm.m.Lock()
 			defer dm.m.Unlock()
 
-			// if success, copy array to object
+			// If success, copy array to object
 			if isReadLock {
-				// append new array of strings at the end
+				// Append new array of strings at the end
 				dm.readersLocks = append(dm.readersLocks, make([]string, dnodeCount))
 				// and copy stack array into last spot
 				copy(dm.readersLocks[len(dm.readersLocks)-1], locks[:])
@@ -123,38 +124,31 @@ func (dm *DRWMutex) lockBlocking(isReadLock bool) {
 
 			return
 		}
-
-		// We timed out on the previous lock, incrementally wait for a longer back-off time,
-		// and try again afterwards
-		time.Sleep(time.Duration(backOff) * time.Millisecond)
-
-		backOff += int(rand.Float64() * math.Pow(2, float64(runs)))
-		if backOff > 1024 {
-			backOff = backOff % 64
-
-			runs = 1 // reset runs
-		} else if runs < 10 {
-			runs++
-		}
+		// We timed out on the previous lock, incrementally wait
+		// for a longer back-off time and try again afterwards.
 	}
 }
 
-// lock tries to acquire the distributed lock, returning true or false
-//
+// lock tries to acquire the distributed lock, returning true or false.
 func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool) bool {
 
 	// Create buffered channel of size equal to total number of nodes.
 	ch := make(chan Granted, dnodeCount)
+	defer close(ch)
+
+	var wg sync.WaitGroup
 	for index, c := range clnts {
+		wg.Add(1)
 		// broadcast lock request to all nodes
 		go func(index int, isReadLock bool, c NetLocker) {
+			defer wg.Done()
+
 			// All client methods issuing RPCs are thread-safe and goroutine-safe,
 			// i.e. it is safe to call them from multiple concurrently running go routines.
-			bytesUid := [16]byte{}
-			cryptorand.Read(bytesUid[:])
-			uid := fmt.Sprintf("%X", bytesUid[:])
+			bytesUID := [16]byte{}
+			cryptorand.Read(bytesUID[:])
+			uid := fmt.Sprintf("%X", bytesUID[:])
 
 			args := LockArgs{
 				UID: uid,
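
The reworked `lockBlocking` above retries by ranging over a channel of attempt numbers instead of hand-rolling `time.Sleep` arithmetic in the loop; closing `doneCh` (via the `defer`) is what stops the retry generator once the lock is acquired. The pattern in isolation, using a simplified stand-in for `newRetryTimerSimple` (the real generator, with jitter, appears in `retry.go` later in this patch):

```go
package main

import (
	"fmt"
	"time"
)

// attempts is a simplified stand-in for dsync's newRetryTimerSimple:
// it yields attempt numbers, sleeping with capped exponential backoff
// between them, and stops once doneCh is closed.
func attempts(doneCh chan struct{}) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; ; i++ {
			select {
			case ch <- i:
			case <-doneCh:
				return
			}
			backoff := time.Duration(1<<uint(i)) * 10 * time.Millisecond
			if backoff > 100*time.Millisecond {
				backoff = 100 * time.Millisecond // cap the wait
			}
			select {
			case <-time.After(backoff):
			case <-doneCh: // caller is done; stop generating attempts
				return
			}
		}
	}()
	return ch
}

func main() {
	doneCh := make(chan struct{})
	defer close(doneCh) // stops the generator, mirroring lockBlocking

	for attempt := range attempts(doneCh) {
		fmt.Println("trying, attempt", attempt)
		if attempt == 3 { // pretend the lock was acquired here
			return
		}
	}
}
```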
@@ -177,8 +171,9 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool)
 
 			g := Granted{index: index}
 			if locked {
-				g.lockUid = args.UID
+				g.lockUID = args.UID
 			}
+
 			ch <- g
 
 		}(index, isReadLock, c)
@@ -186,11 +181,15 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool)
 
 	quorum := false
 
-	var wg sync.WaitGroup
 	wg.Add(1)
 	go func(isReadLock bool) {
 
-		// Wait until we have either a) received all lock responses, b) received too many 'non-'locks for quorum to be or c) time out
+		// Wait until we have either
+		//
+		// a) received all lock responses
+		// b) received too many 'non-'locks for quorum to be still possible
+		// c) time out
+		//
 		i, locksFailed := 0, 0
 		done := false
 		timeout := time.After(DRWMutexAcquireTimeout)
@@ -201,20 +200,19 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool)
 			case grant := <-ch:
 				if grant.isLocked() {
 					// Mark that this node has acquired the lock
-					(*locks)[grant.index] = grant.lockUid
+					(*locks)[grant.index] = grant.lockUID
 				} else {
 					locksFailed++
 					if !isReadLock && locksFailed > dnodeCount-dquorum ||
 						isReadLock && locksFailed > dnodeCount-dquorumReads {
-						// We know that we are not going to get the lock anymore, so exit out
-						// and release any locks that did get acquired
+						// We know that we are not going to get the lock anymore,
+						// so exit out and release any locks that did get acquired
 						done = true
 						// Increment the number of grants received from the buffered channel.
 						i++
 						releaseAll(clnts, locks, lockName, isReadLock)
 					}
 				}
-
 			case <-timeout:
 				done = true
 				// timeout happened, maybe one of the nodes is slow, count
@@ -229,7 +227,7 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool)
 			}
 		}
 
-		// Count locks in order to determine whterh we have quorum or not
+		// Count locks in order to determine whether we have quorum or not
 		quorum = quorumMet(locks, isReadLock)
 
 		// Signal that we have the quorum
@@ -242,7 +240,7 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool)
 			grantToBeReleased := <-ch
 			if grantToBeReleased.isLocked() {
 				// release lock
-				sendRelease(clnts[grantToBeReleased.index], lockName, grantToBeReleased.lockUid, isReadLock)
+				sendRelease(clnts[grantToBeReleased.index], lockName, grantToBeReleased.lockUID, isReadLock)
 			}
 		}
 	}(isReadLock)
@@ -269,11 +267,14 @@ func quorumMet(locks *[]string, isReadLock bool) bool {
 		}
 	}
 
+	var quorum bool
 	if isReadLock {
-		return count >= dquorumReads
+		quorum = count >= dquorumReads
 	} else {
-		return count >= dquorum
+		quorum = count >= dquorum
 	}
+
+	return quorum
 }
 
 // releaseAll releases all locks that are marked as locked
@@ -360,7 +361,6 @@ func unlock(locks []string, name string, isReadLock bool) {
 
 // ForceUnlock will forcefully clear a write or read lock.
 func (dm *DRWMutex) ForceUnlock() {
-
 	{
 		dm.m.Lock()
 		defer dm.m.Unlock()
@@ -379,61 +379,25 @@ func (dm *DRWMutex) ForceUnlock() {
 
 // sendRelease sends a release message to a node that previously granted a lock
 func sendRelease(c NetLocker, name, uid string, isReadLock bool) {
-
-	backOffArray := []time.Duration{
-		30 * time.Second, // 30secs.
-		1 * time.Minute,  // 1min.
-		3 * time.Minute,  // 3min.
-		10 * time.Minute, // 10min.
-		30 * time.Minute, // 30min.
-		1 * time.Hour,    // 1hr.
+	args := LockArgs{
+		UID:             uid,
+		Resource:        name,
+		ServerAddr:      clnts[ownNode].ServerAddr(),
+		ServiceEndpoint: clnts[ownNode].ServiceEndpoint(),
 	}
-
-	go func(c NetLocker, name string) {
-
-		for _, backOff := range backOffArray {
-
-			// All client methods issuing RPCs are thread-safe and goroutine-safe,
-			// i.e. it is safe to call them from multiple concurrently running goroutines.
-			args := LockArgs{
-				UID:             uid,
-				Resource:        name,
-				ServerAddr:      clnts[ownNode].ServerAddr(),
-				ServiceEndpoint: clnts[ownNode].ServiceEndpoint(),
-			}
-
-			var err error
-			if len(uid) == 0 {
-				if _, err = c.ForceUnlock(args); err != nil {
-					log("Unable to call ForceUnlock", err)
-				}
-			} else if isReadLock {
-				if _, err = c.RUnlock(args); err != nil {
-					log("Unable to call RUnlock", err)
-				}
-			} else {
-				if _, err = c.Unlock(args); err != nil {
-					log("Unable to call Unlock", err)
-				}
-			}
-
-			if err != nil {
-				// Ignore if err is net.Error and it is occurred due to timeout.
-				// The cause could have been server timestamp mismatch or server may have restarted.
-				// FIXME: This is minio specific behaviour and we would need a way to make it generically.
-				if nErr, ok := err.(net.Error); ok && nErr.Timeout() {
-					err = nil
-				}
-			}
-
-			if err == nil {
-				return
-			}
-
-			// Wait..
-			time.Sleep(backOff)
+	if len(uid) == 0 {
+		if _, err := c.ForceUnlock(args); err != nil {
+			log("Unable to call ForceUnlock", err)
+		}
+	} else if isReadLock {
+		if _, err := c.RUnlock(args); err != nil {
+			log("Unable to call RUnlock", err)
 		}
-	}(c, name)
+	} else {
+		if _, err := c.Unlock(args); err != nil {
+			log("Unable to call Unlock", err)
+		}
+	}
 }
 
 // DRLocker returns a sync.Locker interface that implements
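
Taken together, the `lock()` function above is a scatter/gather: one goroutine per node fires the RPC, a buffered channel collects the grants, and a counting pass decides quorum (a strict majority for writes). Stripped to plain booleans, the shape is roughly as below; note that the real code counts grants as they arrive and also races a timeout (`DRWMutexAcquireTimeout`) rather than waiting for every reply, and `tryNode` here is a hypothetical stand-in for the per-node `NetLocker` call:

```go
package main

import (
	"fmt"
	"sync"
)

// broadcastLock sketches the scatter/gather shape of dsync's lock().
// The real code also records per-node lock UIDs and releases partial
// locks when quorum can no longer be reached.
func broadcastLock(nodes int, tryNode func(i int) bool) bool {
	ch := make(chan bool, nodes) // buffered so senders never block
	var wg sync.WaitGroup
	for i := 0; i < nodes; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			ch <- tryNode(i)
		}(i)
	}
	wg.Wait()
	close(ch)

	granted := 0
	for ok := range ch {
		if ok {
			granted++
		}
	}
	return granted >= nodes/2+1 // write quorum: strict majority
}

func main() {
	// Node 0 refuses; 5 of 6 grants still meets the quorum of 4.
	ok := broadcastLock(6, func(i int) bool { return i != 0 })
	fmt.Println("lock acquired:", ok)
}
```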
diff --git a/vendor/github.com/minio/dsync/retry.go b/vendor/github.com/minio/dsync/retry.go
new file mode 100644
index 000000000..21a6002d0
--- /dev/null
+++ b/vendor/github.com/minio/dsync/retry.go
@@ -0,0 +1,142 @@
+/*
+ * Minio Cloud Storage, (C) 2017 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dsync
+
+import (
+	"math/rand"
+	"sync"
+	"time"
+)
+
+// lockedRandSource provides protected rand source, implements rand.Source interface.
+type lockedRandSource struct {
+	lk  sync.Mutex
+	src rand.Source
+}
+
+// Int63 returns a non-negative pseudo-random 63-bit integer as an
+// int64.
+func (r *lockedRandSource) Int63() (n int64) {
+	r.lk.Lock()
+	n = r.src.Int63()
+	r.lk.Unlock()
+	return
+}
+
+// Seed uses the provided seed value to initialize the generator to a
+// deterministic state.
+func (r *lockedRandSource) Seed(seed int64) {
+	r.lk.Lock()
+	r.src.Seed(seed)
+	r.lk.Unlock()
+}
+
+// MaxJitter will randomize over the full exponential backoff time
+const MaxJitter = 1.0
+
+// NoJitter disables the use of jitter for randomizing the
+// exponential backoff time
+const NoJitter = 0.0
+
+// Global random source for fetching random values.
+var globalRandomSource = rand.New(&lockedRandSource{
+	src: rand.NewSource(time.Now().UTC().UnixNano()),
+})
+
+// newRetryTimerWithJitter creates a timer with exponentially increasing delays
+// until the maximum retry attempts are reached. - this function is a fully
+// configurable version, meant for only advanced use cases. For the most part
+// one should use newRetryTimerSimple and newRetryTimer.
+func newRetryTimerWithJitter(unit time.Duration, cap time.Duration, jitter float64, doneCh chan struct{}) <-chan int {
+	attemptCh := make(chan int)
+
+	// normalize jitter to the range [0, 1.0]
+	if jitter < NoJitter {
+		jitter = NoJitter
+	}
+	if jitter > MaxJitter {
+		jitter = MaxJitter
+	}
+
+	// computes the exponential backoff duration according to
+	// https://www.awsarchitectureblog.com/2015/03/backoff.html
+	exponentialBackoffWait := func(attempt int) time.Duration {
+		// 1<<uint(attempt) below could overflow, so limit the value of attempt
+		maxAttempt := 30
+		if attempt > maxAttempt {
+			attempt = maxAttempt
+		}
+		//sleep = random_between(0, min(cap, base * 2 ** attempt))
+		sleep := unit * time.Duration(1<<uint(attempt))
+		if sleep > cap {
+			sleep = cap
+		}
+		if jitter != NoJitter {
+			sleep -= time.Duration(globalRandomSource.Float64() * float64(sleep) * jitter)
+		}
+		return sleep
+	}
+
+	go func() {
+		defer close(attemptCh)
+		nextBackoff := 0
+		// Channel used to signal after the expiry of backoff wait seconds.
+		var timer *time.Timer
+		for {
+			select { // Attempts start.
+			case attemptCh <- nextBackoff:
+				nextBackoff++
+			case <-doneCh:
+				// Stop the routine.
+				return
+			}
+			timer = time.NewTimer(exponentialBackoffWait(nextBackoff))
+			// wait till next backoff time or till doneCh gets a message.
+			select {
+			case <-timer.C:
+			case <-doneCh:
+				// stop the timer and return.
+				timer.Stop()
+				return
+			}
+
+		}
+	}()
+
+	// Start reading..
+	return attemptCh
+}
+
+// Default retry constants.
+const (
+	defaultRetryUnit = time.Second     // 1 second.
+	defaultRetryCap  = 1 * time.Second // 1 second.
+)
+
+// newRetryTimer creates a timer with exponentially increasing delays
+// until the maximum retry attempts are reached. - this function provides
+// resulting retry values to be of maximum jitter.
+func newRetryTimer(unit time.Duration, cap time.Duration, doneCh chan struct{}) <-chan int {
+	return newRetryTimerWithJitter(unit, cap, MaxJitter, doneCh)
+}
+
+// newRetryTimerSimple creates a timer with exponentially increasing delays
+// until the maximum retry attempts are reached. - this function is a
+// simpler version with all default values.
+func newRetryTimerSimple(doneCh chan struct{}) <-chan int {
+	return newRetryTimerWithJitter(defaultRetryUnit, defaultRetryCap, MaxJitter, doneCh)
+}
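
One consequence of the defaults above is worth noting: with `defaultRetryUnit` and `defaultRetryCap` both at one second, `sleep` hits the cap on the very first attempt, and full jitter then subtracts a random fraction of it, so `newRetryTimerSimple` effectively waits a uniformly random interval in (0s, 1s] between attempts. A small sketch that mirrors the formula (reimplemented here for illustration, not the package's exported API):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoff mirrors exponentialBackoffWait from retry.go above.
func backoff(attempt int, unit, cap time.Duration, jitter float64, rnd *rand.Rand) time.Duration {
	if attempt > 30 {
		attempt = 30 // keep 1<<attempt from overflowing
	}
	sleep := unit * time.Duration(1<<uint(attempt))
	if sleep > cap {
		sleep = cap
	}
	// full jitter: land anywhere between 0 and the exponential ceiling
	sleep -= time.Duration(rnd.Float64() * float64(sleep) * jitter)
	return sleep
}

func main() {
	rnd := rand.New(rand.NewSource(42))
	for attempt := 0; attempt < 5; attempt++ {
		// With unit = cap = 1s (the dsync defaults), every wait is a
		// uniformly random slice of one second.
		fmt.Printf("attempt %d: wait %v\n", attempt,
			backoff(attempt, time.Second, time.Second, 1.0, rnd))
	}
}
```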
diff --git a/vendor/vendor.json b/vendor/vendor.json
index 6e63a242b..6bb3a36a9 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -193,10 +193,10 @@
 			"revisionTime": "2017-02-27T07:32:28Z"
 		},
 		{
-			"checksumSHA1": "NBGyq2+iTtJvJ+ElG4FzHLe1WSY=",
+			"checksumSHA1": "vrIbl0L+RLwyPRCxMss5+eZtADE=",
 			"path": "github.com/minio/dsync",
-			"revision": "9cafd4d729eb71b31ef7851a8c8f6ceb855d0915",
-			"revisionTime": "2016-12-23T07:07:24Z"
+			"revision": "535db94aebce49cacce4de9c6f5f5821601281cd",
+			"revisionTime": "2017-04-19T20:41:15Z"
 		},
 		{
 			"path": "github.com/minio/go-homedir",