Protect map from concurrent access (Fixes #2915) (#2916)

Protects the Peers RPC clients map from concurrent access to fix a data race condition.
master
Aditya Manthramurthy 8 years ago committed by Harshavardhana
parent b59bac670a
commit 6303f26330
  1. 75
      cmd/s3-peer-client.go

@ -19,6 +19,7 @@ package cmd
import ( import (
"fmt" "fmt"
"path" "path"
"sync"
"time" "time"
"github.com/minio/minio-go/pkg/set" "github.com/minio/minio-go/pkg/set"
@ -26,22 +27,30 @@ import (
type s3Peers struct { type s3Peers struct {
// A map of peer server address (in `host:port` format) to RPC // A map of peer server address (in `host:port` format) to RPC
// client connections // client connections.
rpcClient map[string]*AuthRPCClient rpcClients map[string]*AuthRPCClient
// slice of all peer addresses (in `host:port` format) mutex *sync.RWMutex
// Slice of all peer addresses (in `host:port` format).
peers []string peers []string
} }
func initGlobalS3Peers(disks []string) { func initGlobalS3Peers(disks []string) {
// get list of de-duplicated peers // Get list of de-duplicated peers.
peers := getAllPeers(disks) peers := getAllPeers(disks)
globalS3Peers = s3Peers{make(map[string]*AuthRPCClient), nil}
// Initialize global state.
globalS3Peers = s3Peers{
rpcClients: make(map[string]*AuthRPCClient),
mutex: &sync.RWMutex{},
}
// Initialize each peer connection.
for _, peer := range peers { for _, peer := range peers {
globalS3Peers.InitS3PeerClient(peer) globalS3Peers.InitS3PeerClient(peer)
} }
// Additionally setup a local peer if one does not exist // Additionally setup a local peer if one does not exist.
if globalS3Peers.GetPeerClient(globalMinioAddr) == nil { if globalS3Peers.GetPeerClient(globalMinioAddr) == nil {
globalS3Peers.InitS3PeerClient(globalMinioAddr) globalS3Peers.InitS3PeerClient(globalMinioAddr)
peers = append(peers, globalMinioAddr) peers = append(peers, globalMinioAddr)
@ -55,16 +64,23 @@ func (s3p *s3Peers) GetPeers() []string {
} }
func (s3p *s3Peers) GetPeerClient(peer string) *AuthRPCClient { func (s3p *s3Peers) GetPeerClient(peer string) *AuthRPCClient {
return s3p.rpcClient[peer] // Take a read lock
s3p.mutex.RLock()
defer s3p.mutex.RUnlock()
return s3p.rpcClients[peer]
} }
// Initializes a new RPC connection (or closes and re-opens if it // Initializes a new RPC connection (or closes and re-opens if it
// already exists) to a peer. Note that peer address is in `host:port` // already exists) to a peer. Note that peer address is in `host:port`
// format. // format.
func (s3p *s3Peers) InitS3PeerClient(peer string) { func (s3p *s3Peers) InitS3PeerClient(peer string) {
if s3p.rpcClient[peer] != nil { // Take a write lock
s3p.rpcClient[peer].Close() s3p.mutex.Lock()
delete(s3p.rpcClient, peer) defer s3p.mutex.Unlock()
if s3p.rpcClients[peer] != nil {
_ = s3p.rpcClients[peer].Close()
delete(s3p.rpcClients, peer)
} }
authCfg := &authConfig{ authCfg := &authConfig{
accessKey: serverConfig.GetCredential().AccessKeyID, accessKey: serverConfig.GetCredential().AccessKeyID,
@ -73,16 +89,20 @@ func (s3p *s3Peers) InitS3PeerClient(peer string) {
path: path.Join(reservedBucket, s3Path), path: path.Join(reservedBucket, s3Path),
loginMethod: "S3.LoginHandler", loginMethod: "S3.LoginHandler",
} }
s3p.rpcClient[peer] = newAuthClient(authCfg) s3p.rpcClients[peer] = newAuthClient(authCfg)
} }
func (s3p *s3Peers) Close() error { func (s3p *s3Peers) Close() error {
for _, v := range s3p.rpcClient { // Take a write lock
s3p.mutex.Lock()
defer s3p.mutex.Unlock()
for _, v := range s3p.rpcClients {
if err := v.Close(); err != nil { if err := v.Close(); err != nil {
return err return err
} }
} }
s3p.rpcClient = nil s3p.rpcClients = nil
s3p.peers = nil s3p.peers = nil
return nil return nil
} }
@ -121,33 +141,48 @@ func (s3p *s3Peers) SendRPC(peers []string, method string, args interface {
SetToken(token string) SetToken(token string)
SetTimestamp(tstamp time.Time) SetTimestamp(tstamp time.Time)
}) map[string]error { }) map[string]error {
// result type // Take read lock for rpcClient map
s3p.mutex.RLock()
defer s3p.mutex.RUnlock()
// Result type
type callResult struct { type callResult struct {
target string target string
err error err error
} }
// channel to collect results from goroutines
// Channel to collect results from goroutines
resChan := make(chan callResult) resChan := make(chan callResult)
// closure to make a single request.
// Closure to make a single request.
callTarget := func(target string) { callTarget := func(target string) {
reply := &GenericReply{} reply := &GenericReply{}
err := s3p.rpcClient[target].Call(method, args, reply) client, ok := s3p.rpcClients[target]
var err error
if !ok {
err = fmt.Errorf("Requested client was not initialized - %v",
target)
} else {
err = client.Call(method, args, reply)
}
resChan <- callResult{target, err} resChan <- callResult{target, err}
} }
// map of errors
// Map of errors
errsMap := make(map[string]error) errsMap := make(map[string]error)
// make network calls in parallel // make network calls in parallel
for _, target := range peers { for _, target := range peers {
go callTarget(target) go callTarget(target)
} }
// wait on channel and collect all results // Wait on channel and collect all results
for range peers { for range peers {
res := <-resChan res := <-resChan
if res.err != nil { if res.err != nil {
errsMap[res.target] = res.err errsMap[res.target] = res.err
} }
} }
// return errors map
// Return errors map
return errsMap return errsMap
} }

Loading…
Cancel
Save