From 6303f263308b3f1f9f47a4e23a34643d3faa0878 Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Thu, 13 Oct 2016 01:33:50 -0700 Subject: [PATCH] Protect map from concurrent access (Fixes #2915) (#2916) Protects the Peers RPC clients map from concurrent access to fix a data race condition. --- cmd/s3-peer-client.go | 75 +++++++++++++++++++++++++++++++------------ 1 file changed, 55 insertions(+), 20 deletions(-) diff --git a/cmd/s3-peer-client.go b/cmd/s3-peer-client.go index 458ced6f1..ad1d2d06f 100644 --- a/cmd/s3-peer-client.go +++ b/cmd/s3-peer-client.go @@ -19,6 +19,7 @@ package cmd import ( "fmt" "path" + "sync" "time" "github.com/minio/minio-go/pkg/set" @@ -26,22 +27,30 @@ import ( type s3Peers struct { // A map of peer server address (in `host:port` format) to RPC - // client connections - rpcClient map[string]*AuthRPCClient + // client connections. + rpcClients map[string]*AuthRPCClient - // slice of all peer addresses (in `host:port` format) + mutex *sync.RWMutex + + // Slice of all peer addresses (in `host:port` format). peers []string } func initGlobalS3Peers(disks []string) { - // get list of de-duplicated peers + // Get list of de-duplicated peers. peers := getAllPeers(disks) - globalS3Peers = s3Peers{make(map[string]*AuthRPCClient), nil} + + // Initialize global state. + globalS3Peers = s3Peers{ + rpcClients: make(map[string]*AuthRPCClient), + mutex: &sync.RWMutex{}, + } + // Initialize each peer connection. for _, peer := range peers { globalS3Peers.InitS3PeerClient(peer) } - // Additionally setup a local peer if one does not exist + // Additionally setup a local peer if one does not exist. if globalS3Peers.GetPeerClient(globalMinioAddr) == nil { globalS3Peers.InitS3PeerClient(globalMinioAddr) peers = append(peers, globalMinioAddr) @@ -55,16 +64,23 @@ func (s3p *s3Peers) GetPeers() []string { } func (s3p *s3Peers) GetPeerClient(peer string) *AuthRPCClient { - return s3p.rpcClient[peer] + // Take a read lock + s3p.mutex.RLock() + defer s3p.mutex.RUnlock() + return s3p.rpcClients[peer] } // Initializes a new RPC connection (or closes and re-opens if it // already exists) to a peer. Note that peer address is in `host:port` // format. func (s3p *s3Peers) InitS3PeerClient(peer string) { - if s3p.rpcClient[peer] != nil { - s3p.rpcClient[peer].Close() - delete(s3p.rpcClient, peer) + // Take a write lock + s3p.mutex.Lock() + defer s3p.mutex.Unlock() + + if s3p.rpcClients[peer] != nil { + _ = s3p.rpcClients[peer].Close() + delete(s3p.rpcClients, peer) } authCfg := &authConfig{ accessKey: serverConfig.GetCredential().AccessKeyID, @@ -73,16 +89,20 @@ func (s3p *s3Peers) InitS3PeerClient(peer string) { path: path.Join(reservedBucket, s3Path), loginMethod: "S3.LoginHandler", } - s3p.rpcClient[peer] = newAuthClient(authCfg) + s3p.rpcClients[peer] = newAuthClient(authCfg) } func (s3p *s3Peers) Close() error { - for _, v := range s3p.rpcClient { + // Take a write lock + s3p.mutex.Lock() + defer s3p.mutex.Unlock() + + for _, v := range s3p.rpcClients { if err := v.Close(); err != nil { return err } } - s3p.rpcClient = nil + s3p.rpcClients = nil s3p.peers = nil return nil } @@ -121,33 +141,48 @@ func (s3p *s3Peers) SendRPC(peers []string, method string, args interface { SetToken(token string) SetTimestamp(tstamp time.Time) }) map[string]error { - // result type + // Take read lock for rpcClient map + s3p.mutex.RLock() + defer s3p.mutex.RUnlock() + + // Result type type callResult struct { target string err error } - // channel to collect results from goroutines + + // Channel to collect results from goroutines resChan := make(chan callResult) - // closure to make a single request. + + // Closure to make a single request. callTarget := func(target string) { reply := &GenericReply{} - err := s3p.rpcClient[target].Call(method, args, reply) + client, ok := s3p.rpcClients[target] + var err error + if !ok { + err = fmt.Errorf("Requested client was not initialized - %v", + target) + } else { + err = client.Call(method, args, reply) + } resChan <- callResult{target, err} } - // map of errors + + // Map of errors errsMap := make(map[string]error) // make network calls in parallel for _, target := range peers { go callTarget(target) } - // wait on channel and collect all results + // Wait on channel and collect all results for range peers { res := <-resChan if res.err != nil { errsMap[res.target] = res.err } } - // return errors map + + // Return errors map return errsMap }