From e40a5e05e1e77f6e5df3b978b2a4f3255d2eb315 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas <634494+krishnasrinivas@users.noreply.github.com> Date: Tue, 3 Jul 2018 11:09:36 -0700 Subject: [PATCH] Do notification in background to not block S3 client REST calls (#6005) --- cmd/bucket-handlers.go | 5 +- cmd/bucket-notification-handlers.go | 11 +--- cmd/bucket-policy-handlers.go | 10 +--- cmd/notification.go | 84 +++++++++-------------------- cmd/web-handlers.go | 10 +--- 5 files changed, 33 insertions(+), 87 deletions(-) diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 6e387aa70..48f50f14c 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -753,10 +753,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http. globalNotificationSys.RemoveNotification(bucket) globalPolicySys.Remove(bucket) - for nerr := range globalNotificationSys.DeleteBucket(bucket) { - logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) - logger.LogIf(ctx, nerr.Err) - } + globalNotificationSys.DeleteBucket(ctx, bucket) if globalDNSConfig != nil { if err := globalDNSConfig.Delete(bucket); err != nil { diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index 6aa03b68d..c67999286 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -146,10 +146,7 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter, rulesMap := config.ToRulesMap() globalNotificationSys.AddRulesMap(bucketName, rulesMap) - for nerr := range globalNotificationSys.PutBucketNotification(bucketName, rulesMap) { - logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) - logger.LogIf(ctx, nerr.Err) - } + globalNotificationSys.PutBucketNotification(ctx, bucketName, rulesMap) writeSuccessResponseHeadersOnly(w) } @@ -260,11 +257,7 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit return } - errCh := globalNotificationSys.ListenBucketNotification(bucketName, eventNames, pattern, target.ID(), *thisAddr) - for nerr := range errCh { - logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) - logger.LogIf(ctx, nerr.Err) - } + globalNotificationSys.ListenBucketNotification(ctx, bucketName, eventNames, pattern, target.ID(), *thisAddr) <-target.DoneCh diff --git a/cmd/bucket-policy-handlers.go b/cmd/bucket-policy-handlers.go index 23dfc6162..72a57a8b1 100644 --- a/cmd/bucket-policy-handlers.go +++ b/cmd/bucket-policy-handlers.go @@ -91,10 +91,7 @@ func (api objectAPIHandlers) PutBucketPolicyHandler(w http.ResponseWriter, r *ht } globalPolicySys.Set(bucket, *bucketPolicy) - for nerr := range globalNotificationSys.SetBucketPolicy(bucket, bucketPolicy) { - logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) - logger.LogIf(ctx, nerr.Err) - } + globalNotificationSys.SetBucketPolicy(ctx, bucket, bucketPolicy) // Success. writeSuccessNoContent(w) @@ -130,10 +127,7 @@ func (api objectAPIHandlers) DeleteBucketPolicyHandler(w http.ResponseWriter, r } globalPolicySys.Remove(bucket) - for nerr := range globalNotificationSys.RemoveBucketPolicy(bucket) { - logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) - logger.LogIf(ctx, nerr.Err) - } + globalNotificationSys.RemoveBucketPolicy(ctx, bucket) // Success. writeSuccessNoContent(w) diff --git a/cmd/notification.go b/cmd/notification.go index 9475b014a..d2423fb83 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -68,28 +68,21 @@ type NotificationPeerErr struct { } // DeleteBucket - calls DeleteBucket RPC call on all peers. -func (sys *NotificationSys) DeleteBucket(bucketName string) <-chan NotificationPeerErr { - errCh := make(chan NotificationPeerErr) +func (sys *NotificationSys) DeleteBucket(ctx context.Context, bucketName string) { go func() { - defer close(errCh) - var wg sync.WaitGroup for addr, client := range sys.peerRPCClientMap { wg.Add(1) go func(addr xnet.Host, client *PeerRPCClient) { defer wg.Done() if err := client.DeleteBucket(bucketName); err != nil { - errCh <- NotificationPeerErr{ - Host: addr, - Err: err, - } + logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) + logger.LogIf(ctx, err) } }(addr, client) } wg.Wait() }() - - return errCh } // SetCredentials - calls SetCredentials RPC call on all peers. @@ -120,104 +113,76 @@ func (sys *NotificationSys) SetCredentials(credentials auth.Credentials) map[xne } // SetBucketPolicy - calls SetBucketPolicy RPC call on all peers. -func (sys *NotificationSys) SetBucketPolicy(bucketName string, bucketPolicy *policy.Policy) <-chan NotificationPeerErr { - errCh := make(chan NotificationPeerErr) +func (sys *NotificationSys) SetBucketPolicy(ctx context.Context, bucketName string, bucketPolicy *policy.Policy) { go func() { - defer close(errCh) - var wg sync.WaitGroup for addr, client := range sys.peerRPCClientMap { wg.Add(1) go func(addr xnet.Host, client *PeerRPCClient) { defer wg.Done() if err := client.SetBucketPolicy(bucketName, bucketPolicy); err != nil { - errCh <- NotificationPeerErr{ - Host: addr, - Err: err, - } + logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) + logger.LogIf(ctx, err) } }(addr, client) } wg.Wait() }() - - return errCh } // RemoveBucketPolicy - calls RemoveBucketPolicy RPC call on all peers. -func (sys *NotificationSys) RemoveBucketPolicy(bucketName string) <-chan NotificationPeerErr { - errCh := make(chan NotificationPeerErr) +func (sys *NotificationSys) RemoveBucketPolicy(ctx context.Context, bucketName string) { go func() { - defer close(errCh) - var wg sync.WaitGroup for addr, client := range sys.peerRPCClientMap { wg.Add(1) go func(addr xnet.Host, client *PeerRPCClient) { defer wg.Done() if err := client.RemoveBucketPolicy(bucketName); err != nil { - errCh <- NotificationPeerErr{ - Host: addr, - Err: err, - } + logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) + logger.LogIf(ctx, err) } }(addr, client) } wg.Wait() }() - - return errCh } // PutBucketNotification - calls PutBucketNotification RPC call on all peers. -func (sys *NotificationSys) PutBucketNotification(bucketName string, rulesMap event.RulesMap) <-chan NotificationPeerErr { - errCh := make(chan NotificationPeerErr) +func (sys *NotificationSys) PutBucketNotification(ctx context.Context, bucketName string, rulesMap event.RulesMap) { go func() { - defer close(errCh) - var wg sync.WaitGroup for addr, client := range sys.peerRPCClientMap { wg.Add(1) go func(addr xnet.Host, client *PeerRPCClient, rulesMap event.RulesMap) { defer wg.Done() if err := client.PutBucketNotification(bucketName, rulesMap); err != nil { - errCh <- NotificationPeerErr{ - Host: addr, - Err: err, - } + logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) + logger.LogIf(ctx, err) } }(addr, client, rulesMap.Clone()) } wg.Wait() }() - - return errCh } // ListenBucketNotification - calls ListenBucketNotification RPC call on all peers. -func (sys *NotificationSys) ListenBucketNotification(bucketName string, eventNames []event.Name, pattern string, - targetID event.TargetID, localPeer xnet.Host) <-chan NotificationPeerErr { - errCh := make(chan NotificationPeerErr) +func (sys *NotificationSys) ListenBucketNotification(ctx context.Context, bucketName string, eventNames []event.Name, pattern string, + targetID event.TargetID, localPeer xnet.Host) { go func() { - defer close(errCh) - var wg sync.WaitGroup for addr, client := range sys.peerRPCClientMap { wg.Add(1) go func(addr xnet.Host, client *PeerRPCClient) { defer wg.Done() if err := client.ListenBucketNotification(bucketName, eventNames, pattern, targetID, localPeer); err != nil { - errCh <- NotificationPeerErr{ - Host: addr, - Err: err, - } + logger.GetReqInfo(ctx).AppendTags("remotePeer", addr.Name) + logger.LogIf(ctx, err) } }(addr, client) } wg.Wait() }() - - return errCh } // AddRemoteTarget - adds event rules map, HTTP/PeerRPC client target to bucket name. @@ -556,13 +521,16 @@ func sendEvent(args eventArgs) { return } - for _, err := range globalNotificationSys.Send(args) { - reqInfo := &logger.ReqInfo{BucketName: args.BucketName, ObjectName: args.Object.Name} - reqInfo.AppendTags("EventName", args.EventName.String()) - reqInfo.AppendTags("targetID", err.ID.Name) - ctx := logger.SetReqInfo(context.Background(), reqInfo) - logger.LogOnceIf(ctx, err.Err, err.ID) - } + notifyCh := globalNotificationSys.Send(args) + go func() { + for _, err := range notifyCh { + reqInfo := &logger.ReqInfo{BucketName: args.BucketName, ObjectName: args.Object.Name} + reqInfo.AppendTags("EventName", args.EventName.String()) + reqInfo.AppendTags("targetID", err.ID.Name) + ctx := logger.SetReqInfo(context.Background(), reqInfo) + logger.LogOnceIf(ctx, err.Err, err.ID) + } + }() } func saveConfig(objAPI ObjectLayer, configFile string, data []byte) error { diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index a3be7ab83..c75a7222d 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -193,10 +193,7 @@ func (web *webAPIHandlers) DeleteBucket(r *http.Request, args *RemoveBucketArgs, globalNotificationSys.RemoveNotification(args.BucketName) globalPolicySys.Remove(args.BucketName) - for nerr := range globalNotificationSys.DeleteBucket(args.BucketName) { - logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) - logger.LogIf(ctx, nerr.Err) - } + globalNotificationSys.DeleteBucket(ctx, args.BucketName) if globalDNSConfig != nil { if err := globalDNSConfig.Delete(args.BucketName); err != nil { @@ -979,10 +976,7 @@ func (web *webAPIHandlers) SetBucketPolicy(r *http.Request, args *SetBucketPolic } globalPolicySys.Set(args.BucketName, *bucketPolicy) - for nerr := range globalNotificationSys.SetBucketPolicy(args.BucketName, bucketPolicy) { - logger.GetReqInfo(ctx).AppendTags("remotePeer", nerr.Host.Name) - logger.LogIf(ctx, nerr.Err) - } + globalNotificationSys.SetBucketPolicy(ctx, args.BucketName, bucketPolicy) return nil }