Fix a bug in dsync initialization and communication (#5428)

In current implementation we used as many dsync clients
as per number of endpoints(along with path) which is not
the expected implementation. The implementation of Dsync
was expected to be just for the endpoint Host alone such
that if you have 4 servers and each with 4 disks we need
to only have 4 dsync clients and 4 dsync servers. But
we currently had 8 clients, servers which in-fact is
unexpected and should be avoided.

This PR brings the implementation back to its original
intention. This issue was found #5160
master
Harshavardhana 7 years ago committed by kannappanr
parent bb73c84b10
commit f3f09ed14e
  1. 19
      cmd/lock-rpc-server.go
  2. 21
      cmd/lock-rpc-server_test.go
  3. 38
      cmd/namespace-lock.go
  4. 4
      cmd/server-main.go
  5. 10
      vendor/github.com/minio/dsync/README.md
  6. 88
      vendor/github.com/minio/dsync/drwmutex.go
  7. 63
      vendor/github.com/minio/dsync/dsync.go
  8. 6
      vendor/vendor.json

@ -19,7 +19,6 @@ package cmd
import (
"fmt"
"math/rand"
"path"
"sync"
"time"
@ -64,8 +63,7 @@ type lockServer struct {
}
// Start lock maintenance from all lock servers.
func startLockMaintenance(lockServers []*lockServer) {
for _, locker := range lockServers {
func startLockMaintenance(lkSrv *lockServer) {
// Start loop for stale lock maintenance
go func(lk *lockServer) {
// Initialize a new ticker with a minute between each ticks.
@ -84,29 +82,26 @@ func startLockMaintenance(lockServers []*lockServer) {
lk.lockMaintenance(lockValidityCheckInterval)
}
}
}(locker)
}
}(lkSrv)
}
// Register distributed NS lock handlers.
func registerDistNSLockRouter(mux *router.Router, endpoints EndpointList) error {
// Start lock maintenance from all lock servers.
startLockMaintenance(globalLockServers)
startLockMaintenance(globalLockServer)
// Register initialized lock servers to their respective rpc endpoints.
return registerStorageLockers(mux, globalLockServers)
return registerStorageLockers(mux, globalLockServer)
}
// registerStorageLockers - register locker rpc handlers for net/rpc library clients
func registerStorageLockers(mux *router.Router, lockServers []*lockServer) error {
for _, lockServer := range lockServers {
func registerStorageLockers(mux *router.Router, lkSrv *lockServer) error {
lockRPCServer := newRPCServer()
if err := lockRPCServer.RegisterName(lockServiceName, lockServer); err != nil {
if err := lockRPCServer.RegisterName(lockServiceName, lkSrv); err != nil {
return errors.Trace(err)
}
lockRouter := mux.PathPrefix(minioReservedBucketPath).Subrouter()
lockRouter.Path(path.Join(lockServicePath, lockServer.ll.serviceEndpoint)).Handler(lockRPCServer)
}
lockRouter.Path(lockServicePath).Handler(lockRPCServer)
return nil
}

@ -425,8 +425,8 @@ func TestLockRpcServerExpired(t *testing.T) {
}
}
// Test initialization of lock servers.
func TestLockServers(t *testing.T) {
// Test initialization of lock server.
func TestLockServerInit(t *testing.T) {
if runtime.GOOS == globalWindowsOSName {
return
}
@ -438,8 +438,10 @@ func TestLockServers(t *testing.T) {
defer os.RemoveAll(rootPath)
currentIsDistXL := globalIsDistXL
currentLockServer := globalLockServer
defer func() {
globalIsDistXL = currentIsDistXL
globalLockServer = currentLockServer
}()
case1Endpoints := mustGetNewEndpointList(
@ -470,24 +472,25 @@ func TestLockServers(t *testing.T) {
testCases := []struct {
isDistXL bool
endpoints EndpointList
totalLockServers int
}{
// Test - 1 one lock server initialized.
{true, case1Endpoints, 1},
// Test - 2 two servers possible.
{true, case2Endpoints, 2},
{true, case1Endpoints},
// Test - similar endpoint hosts should
// converge to single lock server
// initialized.
{true, case2Endpoints},
}
// Validates lock server initialization.
for i, testCase := range testCases {
globalIsDistXL = testCase.isDistXL
globalLockServers = nil
globalLockServer = nil
_, _ = newDsyncNodes(testCase.endpoints)
if err != nil {
t.Fatalf("Got unexpected error initializing lock servers: %v", err)
}
if len(globalLockServers) != testCase.totalLockServers {
t.Fatalf("Test %d: Expected total %d, got %d", i+1, testCase.totalLockServers, len(globalLockServers))
if globalLockServer == nil && testCase.isDistXL {
t.Errorf("Test %d: Expected initialized lockServer, but got uninitialized", i+1)
}
}
}

@ -28,13 +28,17 @@ import (
"github.com/minio/dsync"
"github.com/minio/lsync"
"github.com/minio/minio-go/pkg/set"
)
// Global name space lock.
var globalNSMutex *nsLockMap
// Global lock servers
var globalLockServers []*lockServer
// Global lock server one per server.
var globalLockServer *lockServer
// Instance of dsync for distributed clients.
var globalDsync *dsync.Dsync
// RWLocker - locker interface to introduce GetRLock, RUnlock.
type RWLocker interface {
@ -56,39 +60,41 @@ type RWLockerSync interface {
// Returns lock clients and the node index for the current server.
func newDsyncNodes(endpoints EndpointList) (clnts []dsync.NetLocker, myNode int) {
cred := globalServerConfig.GetCredential()
clnts = make([]dsync.NetLocker, len(endpoints))
myNode = -1
for index, endpoint := range endpoints {
seenHosts := set.NewStringSet()
for _, endpoint := range endpoints {
if seenHosts.Contains(endpoint.Host) {
continue
}
seenHosts.Add(endpoint.Host)
if !endpoint.IsLocal {
// For a remote endpoints setup a lock RPC client.
clnts[index] = newLockRPCClient(authConfig{
clnts = append(clnts, newLockRPCClient(authConfig{
accessKey: cred.AccessKey,
secretKey: cred.SecretKey,
serverAddr: endpoint.Host,
secureConn: globalIsSSL,
serviceEndpoint: pathutil.Join(minioReservedBucketPath, lockServicePath, endpoint.Path),
serviceEndpoint: pathutil.Join(minioReservedBucketPath, lockServicePath),
serviceName: lockServiceName,
})
}))
continue
}
// Local endpoint
if myNode == -1 {
myNode = index
}
myNode = len(clnts)
// For a local endpoint, setup a local lock server to
// avoid network requests.
localLockServer := lockServer{
AuthRPCServer: AuthRPCServer{},
ll: localLocker{
mutex: sync.Mutex{},
serviceEndpoint: endpoint.Path,
serverAddr: endpoint.Host,
serviceEndpoint: pathutil.Join(minioReservedBucketPath, lockServicePath),
lockMap: make(map[string][]lockRequesterInfo),
},
}
globalLockServers = append(globalLockServers, &localLockServer)
clnts[index] = &(localLockServer.ll)
globalLockServer = &localLockServer
clnts = append(clnts, &(localLockServer.ll))
}
return clnts, myNode
@ -149,7 +155,7 @@ func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock
nsLk = &nsLock{
RWLockerSync: func() RWLockerSync {
if n.isDistXL {
return dsync.NewDRWMutex(pathJoin(volume, path))
return dsync.NewDRWMutex(pathJoin(volume, path), globalDsync)
}
return &lsync.LRWMutex{}
}(),
@ -303,7 +309,7 @@ func (n *nsLockMap) ForceUnlock(volume, path string) {
// are blocking can now proceed as normal and any new locks will also
// participate normally.
if n.isDistXL { // For distributed mode, broadcast ForceUnlock message.
dsync.NewDRWMutex(pathJoin(volume, path)).ForceUnlock()
dsync.NewDRWMutex(pathJoin(volume, path), globalDsync).ForceUnlock()
}
param := nsParam{volume, path}

@ -186,8 +186,8 @@ func serverMain(ctx *cli.Context) {
// Set nodes for dsync for distributed setup.
if globalIsDistXL {
clnts, myNode := newDsyncNodes(globalEndpoints)
fatalIf(dsync.Init(clnts, myNode), "Unable to initialize distributed locking clients")
globalDsync, err = dsync.New(newDsyncNodes(globalEndpoints))
fatalIf(err, "Unable to initialize distributed locking clients")
}
// Initialize name space lock.

@ -87,11 +87,15 @@ The system can be pushed to 75K locks/sec at 50% CPU load.
Usage
-----
> NOTE: Previously if you were using `dsync.Init([]NetLocker, nodeIndex)` to initialize dsync has
been changed to `dsync.New([]NetLocker, nodeIndex)` which returns a `*Dsync` object to be used in
every instance of `NewDRWMutex("test", *Dsync)`
### Exclusive lock
Here is a simple example showing how to protect a single resource (drop-in replacement for `sync.Mutex`):
```
```go
import (
"github.com/minio/dsync"
)
@ -99,7 +103,7 @@ import (
func lockSameResource() {
// Create distributed mutex to protect resource 'test'
dm := dsync.NewDRWMutex("test")
dm := dsync.NewDRWMutex("test", ds)
dm.Lock()
log.Println("first lock granted")
@ -137,7 +141,7 @@ DRWMutex also supports multiple simultaneous read locks as shown below (analogou
```
func twoReadLocksAndSingleWriteLock() {
drwm := dsync.NewDRWMutex("resource")
drwm := dsync.NewDRWMutex("resource", ds)
drwm.RLock()
log.Println("1st read lock acquired, waiting...")

@ -49,6 +49,7 @@ type DRWMutex struct {
writeLocks []string // Array of nodes that granted a write lock
readersLocks [][]string // Array of array of nodes that granted reader locks
m sync.Mutex // Mutex to prevent multiple simultaneous locks from this node
clnt *Dsync
}
// Granted - represents a structure of a granted lock.
@ -66,10 +67,11 @@ func isLocked(uid string) bool {
}
// NewDRWMutex - initializes a new dsync RW mutex.
func NewDRWMutex(name string) *DRWMutex {
func NewDRWMutex(name string, clnt *Dsync) *DRWMutex {
return &DRWMutex{
Name: name,
writeLocks: make([]string, dnodeCount),
writeLocks: make([]string, clnt.dNodeCount),
clnt: clnt,
}
}
@ -128,10 +130,10 @@ func (dm *DRWMutex) lockBlocking(timeout time.Duration, isReadLock bool) (locked
// Use incremental back-off algorithm for repeated attempts to acquire the lock
for range newRetryTimerSimple(doneCh) {
// Create temp array on stack.
locks := make([]string, dnodeCount)
locks := make([]string, dm.clnt.dNodeCount)
// Try to acquire the lock.
success := lock(clnts, &locks, dm.Name, isReadLock)
success := lock(dm.clnt, &locks, dm.Name, isReadLock)
if success {
dm.m.Lock()
defer dm.m.Unlock()
@ -139,7 +141,7 @@ func (dm *DRWMutex) lockBlocking(timeout time.Duration, isReadLock bool) (locked
// If success, copy array to object
if isReadLock {
// Append new array of strings at the end
dm.readersLocks = append(dm.readersLocks, make([]string, dnodeCount))
dm.readersLocks = append(dm.readersLocks, make([]string, dm.clnt.dNodeCount))
// and copy stack array into last spot
copy(dm.readersLocks[len(dm.readersLocks)-1], locks[:])
} else {
@ -158,14 +160,14 @@ func (dm *DRWMutex) lockBlocking(timeout time.Duration, isReadLock bool) (locked
}
// lock tries to acquire the distributed lock, returning true or false.
func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool) bool {
func lock(ds *Dsync, locks *[]string, lockName string, isReadLock bool) bool {
// Create buffered channel of size equal to total number of nodes.
ch := make(chan Granted, dnodeCount)
ch := make(chan Granted, ds.dNodeCount)
defer close(ch)
var wg sync.WaitGroup
for index, c := range clnts {
for index, c := range ds.rpcClnts {
wg.Add(1)
// broadcast lock request to all nodes
@ -181,8 +183,8 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool)
args := LockArgs{
UID: uid,
Resource: lockName,
ServerAddr: clnts[ownNode].ServerAddr(),
ServiceEndpoint: clnts[ownNode].ServiceEndpoint(),
ServerAddr: ds.rpcClnts[ds.ownNode].ServerAddr(),
ServiceEndpoint: ds.rpcClnts[ds.ownNode].ServiceEndpoint(),
}
var locked bool
@ -222,7 +224,7 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool)
done := false
timeout := time.After(DRWMutexAcquireTimeout)
for ; i < dnodeCount; i++ { // Loop until we acquired all locks
for ; i < ds.dNodeCount; i++ { // Loop until we acquired all locks
select {
case grant := <-ch:
@ -231,22 +233,22 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool)
(*locks)[grant.index] = grant.lockUID
} else {
locksFailed++
if !isReadLock && locksFailed > dnodeCount-dquorum ||
isReadLock && locksFailed > dnodeCount-dquorumReads {
if !isReadLock && locksFailed > ds.dNodeCount-ds.dquorum ||
isReadLock && locksFailed > ds.dNodeCount-ds.dquorumReads {
// We know that we are not going to get the lock anymore,
// so exit out and release any locks that did get acquired
done = true
// Increment the number of grants received from the buffered channel.
i++
releaseAll(clnts, locks, lockName, isReadLock)
releaseAll(ds, locks, lockName, isReadLock)
}
}
case <-timeout:
done = true
// timeout happened, maybe one of the nodes is slow, count
// number of locks to check whether we have quorum or not
if !quorumMet(locks, isReadLock) {
releaseAll(clnts, locks, lockName, isReadLock)
if !quorumMet(locks, isReadLock, ds.dquorum, ds.dquorumReads) {
releaseAll(ds, locks, lockName, isReadLock)
}
}
@ -256,7 +258,7 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool)
}
// Count locks in order to determine whether we have quorum or not
quorum = quorumMet(locks, isReadLock)
quorum = quorumMet(locks, isReadLock, ds.dquorum, ds.dquorumReads)
// Signal that we have the quorum
wg.Done()
@ -264,11 +266,11 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool)
// Wait for the other responses and immediately release the locks
// (do not add them to the locks array because the DRWMutex could
// already has been unlocked again by the original calling thread)
for ; i < dnodeCount; i++ {
for ; i < ds.dNodeCount; i++ {
grantToBeReleased := <-ch
if grantToBeReleased.isLocked() {
// release lock
sendRelease(clnts[grantToBeReleased.index], lockName, grantToBeReleased.lockUID, isReadLock)
sendRelease(ds, ds.rpcClnts[grantToBeReleased.index], lockName, grantToBeReleased.lockUID, isReadLock)
}
}
}(isReadLock)
@ -276,9 +278,9 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool)
wg.Wait()
// Verify that localhost server is actively participating in the lock (the lock maintenance relies on this fact)
if quorum && !isLocked((*locks)[ownNode]) {
if quorum && !isLocked((*locks)[ds.ownNode]) {
// If not, release lock (and try again later)
releaseAll(clnts, locks, lockName, isReadLock)
releaseAll(ds, locks, lockName, isReadLock)
quorum = false
}
@ -286,7 +288,7 @@ func lock(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool)
}
// quorumMet determines whether we have acquired the required quorum of underlying locks or not
func quorumMet(locks *[]string, isReadLock bool) bool {
func quorumMet(locks *[]string, isReadLock bool, quorum, quorumReads int) bool {
count := 0
for _, uid := range *locks {
@ -295,21 +297,21 @@ func quorumMet(locks *[]string, isReadLock bool) bool {
}
}
var quorum bool
var metQuorum bool
if isReadLock {
quorum = count >= dquorumReads
metQuorum = count >= quorumReads
} else {
quorum = count >= dquorum
metQuorum = count >= quorum
}
return quorum
return metQuorum
}
// releaseAll releases all locks that are marked as locked
func releaseAll(clnts []NetLocker, locks *[]string, lockName string, isReadLock bool) {
for lock := 0; lock < dnodeCount; lock++ {
func releaseAll(ds *Dsync, locks *[]string, lockName string, isReadLock bool) {
for lock := 0; lock < ds.dNodeCount; lock++ {
if isLocked((*locks)[lock]) {
sendRelease(clnts[lock], lockName, (*locks)[lock], isReadLock)
sendRelease(ds, ds.rpcClnts[lock], lockName, (*locks)[lock], isReadLock)
(*locks)[lock] = ""
}
}
@ -321,7 +323,7 @@ func releaseAll(clnts []NetLocker, locks *[]string, lockName string, isReadLock
func (dm *DRWMutex) Unlock() {
// create temp array on stack
locks := make([]string, dnodeCount)
locks := make([]string, dm.clnt.dNodeCount)
{
dm.m.Lock()
@ -342,11 +344,11 @@ func (dm *DRWMutex) Unlock() {
// Copy write locks to stack array
copy(locks, dm.writeLocks[:])
// Clear write locks array
dm.writeLocks = make([]string, dnodeCount)
dm.writeLocks = make([]string, dm.clnt.dNodeCount)
}
isReadLock := false
unlock(locks, dm.Name, isReadLock)
unlock(dm.clnt, locks, dm.Name, isReadLock)
}
// RUnlock releases a read lock held on dm.
@ -355,7 +357,7 @@ func (dm *DRWMutex) Unlock() {
func (dm *DRWMutex) RUnlock() {
// create temp array on stack
locks := make([]string, dnodeCount)
locks := make([]string, dm.clnt.dNodeCount)
{
dm.m.Lock()
@ -370,19 +372,19 @@ func (dm *DRWMutex) RUnlock() {
}
isReadLock := true
unlock(locks, dm.Name, isReadLock)
unlock(dm.clnt, locks, dm.Name, isReadLock)
}
func unlock(locks []string, name string, isReadLock bool) {
func unlock(ds *Dsync, locks []string, name string, isReadLock bool) {
// We don't need to synchronously wait until we have released all the locks (or the quorum)
// (a subsequent lock will retry automatically in case it would fail to get quorum)
for index, c := range clnts {
for index, c := range ds.rpcClnts {
if isLocked(locks[index]) {
// broadcast lock release to all nodes that granted the lock
sendRelease(c, name, locks[index], isReadLock)
sendRelease(ds, c, name, locks[index], isReadLock)
}
}
}
@ -394,24 +396,24 @@ func (dm *DRWMutex) ForceUnlock() {
defer dm.m.Unlock()
// Clear write locks array
dm.writeLocks = make([]string, dnodeCount)
dm.writeLocks = make([]string, dm.clnt.dNodeCount)
// Clear read locks array
dm.readersLocks = nil
}
for _, c := range clnts {
for _, c := range dm.clnt.rpcClnts {
// broadcast lock release to all nodes that granted the lock
sendRelease(c, dm.Name, "", false)
sendRelease(dm.clnt, c, dm.Name, "", false)
}
}
// sendRelease sends a release message to a node that previously granted a lock
func sendRelease(c NetLocker, name, uid string, isReadLock bool) {
func sendRelease(ds *Dsync, c NetLocker, name, uid string, isReadLock bool) {
args := LockArgs{
UID: uid,
Resource: name,
ServerAddr: clnts[ownNode].ServerAddr(),
ServiceEndpoint: clnts[ownNode].ServiceEndpoint(),
ServerAddr: ds.rpcClnts[ds.ownNode].ServerAddr(),
ServiceEndpoint: ds.rpcClnts[ds.ownNode].ServiceEndpoint(),
}
if len(uid) == 0 {
if _, err := c.ForceUnlock(args); err != nil {

@ -16,51 +16,52 @@
package dsync
import "errors"
import (
"errors"
)
// Number of nodes participating in the distributed locking.
var dnodeCount int
// Dsync represents dsync client object which is initialized with
// authenticated clients, used to initiate lock RPC calls.
type Dsync struct {
// Number of nodes participating in the distributed locking.
dNodeCount int
// List of rpc client objects, one per lock server.
var clnts []NetLocker
// List of rpc client objects, one per lock server.
rpcClnts []NetLocker
// Index into rpc client array for server running on localhost
var ownNode int
// Index into rpc client array for server running on localhost
ownNode int
// Simple majority based quorum, set to dNodeCount/2+1
var dquorum int
// Simple majority based quorum, set to dNodeCount/2+1
dquorum int
// Simple quorum for read operations, set to dNodeCount/2
var dquorumReads int
// Init - initializes package-level global state variables such as clnts.
// N B - This function should be called only once inside any program
// that uses dsync.
func Init(rpcClnts []NetLocker, rpcOwnNode int) (err error) {
// Simple quorum for read operations, set to dNodeCount/2
dquorumReads int
}
// Validate if number of nodes is within allowable range.
if dnodeCount != 0 {
return errors.New("Cannot reinitialize dsync package")
}
// New - initializes a new dsync object with input rpcClnts.
func New(rpcClnts []NetLocker, rpcOwnNode int) (*Dsync, error) {
if len(rpcClnts) < 4 {
return errors.New("Dsync is not designed for less than 4 nodes")
return nil, errors.New("Dsync is not designed for less than 4 nodes")
} else if len(rpcClnts) > 16 {
return errors.New("Dsync is not designed for more than 16 nodes")
return nil, errors.New("Dsync is not designed for more than 16 nodes")
} else if len(rpcClnts)%2 != 0 {
return errors.New("Dsync is not designed for an uneven number of nodes")
return nil, errors.New("Dsync is not designed for an uneven number of nodes")
}
if rpcOwnNode > len(rpcClnts) {
return errors.New("Index for own node is too large")
return nil, errors.New("Index for own node is too large")
}
dnodeCount = len(rpcClnts)
dquorum = dnodeCount/2 + 1
dquorumReads = dnodeCount / 2
ds := &Dsync{}
ds.dNodeCount = len(rpcClnts)
ds.dquorum = ds.dNodeCount/2 + 1
ds.dquorumReads = ds.dNodeCount / 2
ds.ownNode = rpcOwnNode
// Initialize node name and rpc path for each NetLocker object.
clnts = make([]NetLocker, dnodeCount)
copy(clnts, rpcClnts)
ds.rpcClnts = make([]NetLocker, ds.dNodeCount)
copy(ds.rpcClnts, rpcClnts)
ownNode = rpcOwnNode
return nil
return ds, nil
}

@ -380,10 +380,10 @@
"revisionTime": "2017-02-27T07:32:28Z"
},
{
"checksumSHA1": "hQ8i4UPTbFW68oPJP3uFxYTLfxk=",
"checksumSHA1": "qhWQM7xmqaxFqADNTj8YPjE/8Ws=",
"path": "github.com/minio/dsync",
"revision": "a26b9de6c8006208d10a9517720d3212b42c374e",
"revisionTime": "2017-05-25T17:53:53Z"
"revision": "ed0989bc6c7b199f749fa6be0b7ee98d689b88c7",
"revisionTime": "2017-11-22T09:16:00Z"
},
{
"path": "github.com/minio/go-homedir",

Loading…
Cancel
Save