fix: Avoid concurrent map writes in go-routines (#5898)

Fixes #5897
master
Harshavardhana 7 years ago committed by Dee Koder
parent 2c0020e9ee
commit 98f81ced86
  1. 6
      cmd/bucket-handlers.go
  2. 14
      cmd/bucket-notification-handlers.go
  3. 12
      cmd/bucket-policy-handlers.go
  4. 92
      cmd/notification.go
  5. 12
      cmd/web-handlers.go

@ -655,9 +655,9 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
globalNotificationSys.RemoveNotification(bucket) globalNotificationSys.RemoveNotification(bucket)
globalPolicySys.Remove(bucket) globalPolicySys.Remove(bucket)
for addr, err := range globalNotificationSys.DeleteBucket(bucket) { for _, nerr := range globalNotificationSys.DeleteBucket(bucket) {
logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
logger.LogIf(ctx, err) logger.LogIf(ctx, nerr.Err)
} }
// Write success response. // Write success response.

@ -146,9 +146,9 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter,
rulesMap := config.ToRulesMap() rulesMap := config.ToRulesMap()
globalNotificationSys.AddRulesMap(bucketName, rulesMap) globalNotificationSys.AddRulesMap(bucketName, rulesMap)
for addr, err := range globalNotificationSys.PutBucketNotification(bucketName, rulesMap) { for _, nerr := range globalNotificationSys.PutBucketNotification(bucketName, rulesMap) {
logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
logger.LogIf(ctx, err) logger.LogIf(ctx, nerr.Err)
} }
writeSuccessResponseHeadersOnly(w) writeSuccessResponseHeadersOnly(w)
@ -251,10 +251,10 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
return return
} }
errors := globalNotificationSys.ListenBucketNotification(bucketName, eventNames, pattern, target.ID(), *thisAddr) errs := globalNotificationSys.ListenBucketNotification(bucketName, eventNames, pattern, target.ID(), *thisAddr)
for addr, err := range errors { for _, nerr := range errs {
logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
logger.LogIf(ctx, err) logger.LogIf(ctx, nerr.Err)
} }
<-target.DoneCh <-target.DoneCh

@ -91,9 +91,9 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht
} }
globalPolicySys.Set(bucket, *bucketPolicy) globalPolicySys.Set(bucket, *bucketPolicy)
for addr, err := range globalNotificationSys.SetBucketPolicy(bucket, bucketPolicy) { for _, nerr := range globalNotificationSys.SetBucketPolicy(bucket, bucketPolicy) {
logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
logger.LogIf(ctx, err) logger.LogIf(ctx, nerr.Err)
} }
// Success. // Success.
@ -130,9 +130,9 @@ func (api objectAPIHandlers) DeleteBucketPolicyHandler(w http.ResponseWriter, r
} }
globalPolicySys.Remove(bucket) globalPolicySys.Remove(bucket)
for addr, err := range globalNotificationSys.RemoveBucketPolicy(bucket) { for _, nerr := range globalNotificationSys.RemoveBucketPolicy(bucket) {
logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
logger.LogIf(ctx, err) logger.LogIf(ctx, nerr.Err)
} }
// Success. // Success.

@ -59,94 +59,126 @@ func (sys *NotificationSys) GetPeerRPCClient(addr xnet.Host) *PeerRPCClient {
return sys.peerRPCClientMap[addr] return sys.peerRPCClientMap[addr]
} }
// NotificationPeerErr returns error associated for a remote peer.
type NotificationPeerErr struct {
Host xnet.Host // Remote host on which the rpc call was initiated
Err error // Error returned by the remote peer for an rpc call
}
// DeleteBucket - calls DeleteBucket RPC call on all peers. // DeleteBucket - calls DeleteBucket RPC call on all peers.
func (sys *NotificationSys) DeleteBucket(bucketName string) map[xnet.Host]error { func (sys *NotificationSys) DeleteBucket(bucketName string) []NotificationPeerErr {
errors := make(map[xnet.Host]error) errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap))
var wg sync.WaitGroup var wg sync.WaitGroup
idx := 0
for addr, client := range sys.peerRPCClientMap { for addr, client := range sys.peerRPCClientMap {
wg.Add(1) wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) { go func(idx int, addr xnet.Host, client *PeerRPCClient) {
defer wg.Done() defer wg.Done()
if err := client.DeleteBucket(bucketName); err != nil { if err := client.DeleteBucket(bucketName); err != nil {
errors[addr] = err errs[idx] = NotificationPeerErr{
Host: addr,
Err: err,
} }
}(addr, client) }
}(idx, addr, client)
idx++
} }
wg.Wait() wg.Wait()
return errors return errs
} }
// SetBucketPolicy - calls SetBucketPolicy RPC call on all peers. // SetBucketPolicy - calls SetBucketPolicy RPC call on all peers.
func (sys *NotificationSys) SetBucketPolicy(bucketName string, bucketPolicy *policy.Policy) map[xnet.Host]error { func (sys *NotificationSys) SetBucketPolicy(bucketName string, bucketPolicy *policy.Policy) []NotificationPeerErr {
errors := make(map[xnet.Host]error) errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap))
var wg sync.WaitGroup var wg sync.WaitGroup
idx := 0
for addr, client := range sys.peerRPCClientMap { for addr, client := range sys.peerRPCClientMap {
wg.Add(1) wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) { go func(idx int, addr xnet.Host, client *PeerRPCClient) {
defer wg.Done() defer wg.Done()
if err := client.SetBucketPolicy(bucketName, bucketPolicy); err != nil { if err := client.SetBucketPolicy(bucketName, bucketPolicy); err != nil {
errors[addr] = err errs[idx] = NotificationPeerErr{
Host: addr,
Err: err,
}
} }
}(addr, client) }(idx, addr, client)
idx++
} }
wg.Wait() wg.Wait()
return errors return errs
} }
// RemoveBucketPolicy - calls RemoveBucketPolicy RPC call on all peers. // RemoveBucketPolicy - calls RemoveBucketPolicy RPC call on all peers.
func (sys *NotificationSys) RemoveBucketPolicy(bucketName string) map[xnet.Host]error { func (sys *NotificationSys) RemoveBucketPolicy(bucketName string) []NotificationPeerErr {
errors := make(map[xnet.Host]error) errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap))
var wg sync.WaitGroup var wg sync.WaitGroup
idx := 0
for addr, client := range sys.peerRPCClientMap { for addr, client := range sys.peerRPCClientMap {
wg.Add(1) wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) { go func(idx int, addr xnet.Host, client *PeerRPCClient) {
defer wg.Done() defer wg.Done()
if err := client.RemoveBucketPolicy(bucketName); err != nil { if err := client.RemoveBucketPolicy(bucketName); err != nil {
errors[addr] = err errs[idx] = NotificationPeerErr{
Host: addr,
Err: err,
} }
}(addr, client) }
}(idx, addr, client)
idx++
} }
wg.Wait() wg.Wait()
return errors return errs
} }
// PutBucketNotification - calls PutBucketNotification RPC call on all peers. // PutBucketNotification - calls PutBucketNotification RPC call on all peers.
func (sys *NotificationSys) PutBucketNotification(bucketName string, rulesMap event.RulesMap) map[xnet.Host]error { func (sys *NotificationSys) PutBucketNotification(bucketName string, rulesMap event.RulesMap) []NotificationPeerErr {
errors := make(map[xnet.Host]error) errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap))
var wg sync.WaitGroup var wg sync.WaitGroup
idx := 0
for addr, client := range sys.peerRPCClientMap { for addr, client := range sys.peerRPCClientMap {
wg.Add(1) wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient, rulesMap event.RulesMap) { go func(idx int, addr xnet.Host, client *PeerRPCClient, rulesMap event.RulesMap) {
defer wg.Done() defer wg.Done()
if err := client.PutBucketNotification(bucketName, rulesMap); err != nil { if err := client.PutBucketNotification(bucketName, rulesMap); err != nil {
errors[addr] = err errs[idx] = NotificationPeerErr{
Host: addr,
Err: err,
}
} }
}(addr, client, rulesMap.Clone()) }(idx, addr, client, rulesMap.Clone())
idx++
} }
wg.Wait() wg.Wait()
return errors return errs
} }
// ListenBucketNotification - calls ListenBucketNotification RPC call on all peers. // ListenBucketNotification - calls ListenBucketNotification RPC call on all peers.
func (sys *NotificationSys) ListenBucketNotification(bucketName string, eventNames []event.Name, pattern string, targetID event.TargetID, localPeer xnet.Host) map[xnet.Host]error { func (sys *NotificationSys) ListenBucketNotification(bucketName string, eventNames []event.Name, pattern string,
errors := make(map[xnet.Host]error) targetID event.TargetID, localPeer xnet.Host) []NotificationPeerErr {
errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap))
var wg sync.WaitGroup var wg sync.WaitGroup
idx := 0
for addr, client := range sys.peerRPCClientMap { for addr, client := range sys.peerRPCClientMap {
wg.Add(1) wg.Add(1)
go func(addr xnet.Host, client *PeerRPCClient) { go func(idx int, addr xnet.Host, client *PeerRPCClient) {
defer wg.Done() defer wg.Done()
if err := client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer); err != nil { if err := client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer); err != nil {
errors[addr] = err errs[idx] = NotificationPeerErr{
Host: addr,
Err: err,
}
} }
}(addr, client) }(idx, addr, client)
idx++
} }
wg.Wait() wg.Wait()
return errors return errs
} }
// AddRemoteTarget - adds event rules map, HTTP/PeerRPC client target to bucket name. // AddRemoteTarget - adds event rules map, HTTP/PeerRPC client target to bucket name.

@ -169,9 +169,9 @@ func (web *webAPIHandlers) DeleteBucket(r *http.Request, args *RemoveBucketArgs,
globalNotificationSys.RemoveNotification(args.BucketName) globalNotificationSys.RemoveNotification(args.BucketName)
globalPolicySys.Remove(args.BucketName) globalPolicySys.Remove(args.BucketName)
for addr, err := range globalNotificationSys.DeleteBucket(args.BucketName) { for _, nerr := range globalNotificationSys.DeleteBucket(args.BucketName) {
logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
logger.LogIf(ctx, err) logger.LogIf(ctx, nerr.Err)
} }
reply.UIVersion = browser.UIVersion reply.UIVersion = browser.UIVersion
@ -943,9 +943,9 @@ func (web *webAPIHandlers) SetBucketPolicy(r *http.Request, args *SetBucketPolic
} }
globalPolicySys.Set(args.BucketName, *bucketPolicy) globalPolicySys.Set(args.BucketName, *bucketPolicy)
for addr, err := range globalNotificationSys.SetBucketPolicy(args.BucketName, bucketPolicy) { for _, nerr := range globalNotificationSys.SetBucketPolicy(args.BucketName, bucketPolicy) {
logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name)
logger.LogIf(ctx, err) logger.LogIf(ctx, nerr.Err)
} }
return nil return nil

Loading…
Cancel
Save