diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 63d527015..d797ded9a 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -655,9 +655,9 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http. globalNotificationSys.RemoveNotification(bucket) globalPolicySys.Remove(bucket) - for addr, err := range globalNotificationSys.DeleteBucket(bucket) { - logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) - logger.LogIf(ctx, err) + for _, nerr := range globalNotificationSys.DeleteBucket(bucket) { + logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) + logger.LogIf(ctx, nerr.Err) } // Write success response. diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index 6d13d6801..c897fedbf 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -146,9 +146,9 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter, rulesMap := config.ToRulesMap() globalNotificationSys.AddRulesMap(bucketName, rulesMap) - for addr, err := range globalNotificationSys.PutBucketNotification(bucketName, rulesMap) { - logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) - logger.LogIf(ctx, err) + for _, nerr := range globalNotificationSys.PutBucketNotification(bucketName, rulesMap) { + logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) + logger.LogIf(ctx, nerr.Err) } writeSuccessResponseHeadersOnly(w) @@ -251,10 +251,10 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit return } - errors := globalNotificationSys.ListenBucketNotification(bucketName, eventNames, pattern, target.ID(), *thisAddr) - for addr, err := range errors { - logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) - logger.LogIf(ctx, err) + errs := globalNotificationSys.ListenBucketNotification(bucketName, eventNames, pattern, target.ID(), *thisAddr) + for _, nerr := range errs { + logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) + logger.LogIf(ctx, nerr.Err) } <-target.DoneCh diff --git a/cmd/bucket-policy-handlers.go b/cmd/bucket-policy-handlers.go index 41e3863ea..ce4566bdd 100644 --- a/cmd/bucket-policy-handlers.go +++ b/cmd/bucket-policy-handlers.go @@ -91,9 +91,9 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht } globalPolicySys.Set(bucket, *bucketPolicy) - for addr, err := range globalNotificationSys.SetBucketPolicy(bucket, bucketPolicy) { - logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) - logger.LogIf(ctx, err) + for _, nerr := range globalNotificationSys.SetBucketPolicy(bucket, bucketPolicy) { + logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) + logger.LogIf(ctx, nerr.Err) } // Success. @@ -130,9 +130,9 @@ func (api objectAPIHandlers) DeleteBucketPolicyHandler(w http.ResponseWriter, r } globalPolicySys.Remove(bucket) - for addr, err := range globalNotificationSys.RemoveBucketPolicy(bucket) { - logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) - logger.LogIf(ctx, err) + for _, nerr := range globalNotificationSys.RemoveBucketPolicy(bucket) { + logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) + logger.LogIf(ctx, nerr.Err) } // Success. diff --git a/cmd/notification.go b/cmd/notification.go index d5dbaa465..9d1662608 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -59,94 +59,126 @@ func (sys *NotificationSys) GetPeerRPCClient(addr xnet.Host) *PeerRPCClient { return sys.peerRPCClientMap[addr] } +// NotificationPeerErr returns error associated for a remote peer. +type NotificationPeerErr struct { + Host xnet.Host // Remote host on which the rpc call was initiated + Err error // Error returned by the remote peer for an rpc call +} + // DeleteBucket - calls DeleteBucket RPC call on all peers. -func (sys *NotificationSys) DeleteBucket(bucketName string) map[xnet.Host]error { - errors := make(map[xnet.Host]error) +func (sys *NotificationSys) DeleteBucket(bucketName string) []NotificationPeerErr { + errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap)) var wg sync.WaitGroup + idx := 0 for addr, client := range sys.peerRPCClientMap { wg.Add(1) - go func(addr xnet.Host, client *PeerRPCClient) { + go func(idx int, addr xnet.Host, client *PeerRPCClient) { defer wg.Done() if err := client.DeleteBucket(bucketName); err != nil { - errors[addr] = err + errs[idx] = NotificationPeerErr{ + Host: addr, + Err: err, + } } - }(addr, client) + }(idx, addr, client) + idx++ } wg.Wait() - return errors + return errs } // SetBucketPolicy - calls SetBucketPolicy RPC call on all peers. -func (sys *NotificationSys) SetBucketPolicy(bucketName string, bucketPolicy *policy.Policy) map[xnet.Host]error { - errors := make(map[xnet.Host]error) +func (sys *NotificationSys) SetBucketPolicy(bucketName string, bucketPolicy *policy.Policy) []NotificationPeerErr { + errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap)) var wg sync.WaitGroup + idx := 0 for addr, client := range sys.peerRPCClientMap { wg.Add(1) - go func(addr xnet.Host, client *PeerRPCClient) { + go func(idx int, addr xnet.Host, client *PeerRPCClient) { defer wg.Done() if err := client.SetBucketPolicy(bucketName, bucketPolicy); err != nil { - errors[addr] = err + errs[idx] = NotificationPeerErr{ + Host: addr, + Err: err, + } } - }(addr, client) + }(idx, addr, client) + idx++ } wg.Wait() - return errors + return errs } // RemoveBucketPolicy - calls RemoveBucketPolicy RPC call on all peers. -func (sys *NotificationSys) RemoveBucketPolicy(bucketName string) map[xnet.Host]error { - errors := make(map[xnet.Host]error) +func (sys *NotificationSys) RemoveBucketPolicy(bucketName string) []NotificationPeerErr { + errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap)) var wg sync.WaitGroup + idx := 0 for addr, client := range sys.peerRPCClientMap { wg.Add(1) - go func(addr xnet.Host, client *PeerRPCClient) { + go func(idx int, addr xnet.Host, client *PeerRPCClient) { defer wg.Done() if err := client.RemoveBucketPolicy(bucketName); err != nil { - errors[addr] = err + errs[idx] = NotificationPeerErr{ + Host: addr, + Err: err, + } } - }(addr, client) + }(idx, addr, client) + idx++ } wg.Wait() - return errors + return errs } // PutBucketNotification - calls PutBucketNotification RPC call on all peers. -func (sys *NotificationSys) PutBucketNotification(bucketName string, rulesMap event.RulesMap) map[xnet.Host]error { - errors := make(map[xnet.Host]error) +func (sys *NotificationSys) PutBucketNotification(bucketName string, rulesMap event.RulesMap) []NotificationPeerErr { + errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap)) var wg sync.WaitGroup + idx := 0 for addr, client := range sys.peerRPCClientMap { wg.Add(1) - go func(addr xnet.Host, client *PeerRPCClient, rulesMap event.RulesMap) { + go func(idx int, addr xnet.Host, client *PeerRPCClient, rulesMap event.RulesMap) { defer wg.Done() if err := client.PutBucketNotification(bucketName, rulesMap); err != nil { - errors[addr] = err + errs[idx] = NotificationPeerErr{ + Host: addr, + Err: err, + } } - }(addr, client, rulesMap.Clone()) + }(idx, addr, client, rulesMap.Clone()) + idx++ } wg.Wait() - return errors + return errs } // ListenBucketNotification - calls ListenBucketNotification RPC call on all peers. -func (sys *NotificationSys) ListenBucketNotification(bucketName string, eventNames []event.Name, pattern string, targetID event.TargetID, localPeer xnet.Host) map[xnet.Host]error { - errors := make(map[xnet.Host]error) +func (sys *NotificationSys) ListenBucketNotification(bucketName string, eventNames []event.Name, pattern string, + targetID event.TargetID, localPeer xnet.Host) []NotificationPeerErr { + errs := make([]NotificationPeerErr, len(sys.peerRPCClientMap)) var wg sync.WaitGroup + idx := 0 for addr, client := range sys.peerRPCClientMap { wg.Add(1) - go func(addr xnet.Host, client *PeerRPCClient) { + go func(idx int, addr xnet.Host, client *PeerRPCClient) { defer wg.Done() if err := client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer); err != nil { - errors[addr] = err + errs[idx] = NotificationPeerErr{ + Host: addr, + Err: err, + } } - }(addr, client) + }(idx, addr, client) + idx++ } wg.Wait() - return errors + return errs } // AddRemoteTarget - adds event rules map, HTTP/PeerRPC client target to bucket name. diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index b6827a885..622bafb02 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -169,9 +169,9 @@ func (web *webAPIHandlers) DeleteBucket(r *http.Request, args *RemoveBucketArgs, globalNotificationSys.RemoveNotification(args.BucketName) globalPolicySys.Remove(args.BucketName) - for addr, err := range globalNotificationSys.DeleteBucket(args.BucketName) { - logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) - logger.LogIf(ctx, err) + for _, nerr := range globalNotificationSys.DeleteBucket(args.BucketName) { + logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) + logger.LogIf(ctx, nerr.Err) } reply.UIVersion = browser.UIVersion @@ -943,9 +943,9 @@ func (web *webAPIHandlers) SetBucketPolicy(r *http.Request, args *SetBucketPolic } globalPolicySys.Set(args.BucketName, *bucketPolicy) - for addr, err := range globalNotificationSys.SetBucketPolicy(args.BucketName, bucketPolicy) { - logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) - logger.LogIf(ctx, err) + for _, nerr := range globalNotificationSys.SetBucketPolicy(args.BucketName, bucketPolicy) { + logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) + logger.LogIf(ctx, nerr.Err) } return nil