From d02cb963d539752a6dea10a110f0d40daeb2e651 Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Sun, 16 Oct 2016 20:52:10 -0700 Subject: [PATCH] Fix listen-bucket (Fixes #2942) (#2949) Don't close socket while re-initializing notify-listeners, as the rpc client object is shared between notify-listeners and peer clients. Also, improves SendRPC() readability by using GetPeerClient(). --- cmd/bucket-notification-handlers.go | 6 +++++- cmd/event-notifier.go | 4 ---- cmd/notify-listener.go | 5 ----- cmd/s3-peer-client.go | 8 ++------ 4 files changed, 7 insertions(+), 16 deletions(-) diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index 402e5cd8c..b44efd44b 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -322,7 +322,11 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit nEventCh := make(chan []NotificationEvent) defer close(nEventCh) // Add channel for listener events - globalEventNotifier.AddListenerChan(accountARN, nEventCh) + if err = globalEventNotifier.AddListenerChan(accountARN, nEventCh); err != nil { + errorIf(err, "Error adding a listener!") + writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) + return + } // Remove listener channel after the writer has closed or the // client disconnected. defer globalEventNotifier.RemoveListenerChan(accountARN) diff --git a/cmd/event-notifier.go b/cmd/event-notifier.go index 30249dbf6..94855c343 100644 --- a/cmd/event-notifier.go +++ b/cmd/event-notifier.go @@ -210,10 +210,6 @@ func (en *eventNotifier) SetBucketListenerConfig(bucket string, lcfg []listenerC } else { en.internal.listenerConfigs[bucket] = lcfg } - // close all existing loggers and initialize again. - for _, v := range en.internal.targets { - v.lconn.Close() - } en.internal.targets = make(map[string]*listenerLogger) for _, lc := range lcfg { logger, err := newListenerLogger(lc.TopicConfig.TopicARN, diff --git a/cmd/notify-listener.go b/cmd/notify-listener.go index e94d16aa1..43474d6de 100644 --- a/cmd/notify-listener.go +++ b/cmd/notify-listener.go @@ -55,11 +55,6 @@ func newListenerLogger(listenerArn, targetAddr string) (*listenerLogger, error) return &listenerLogger{lcLog, lc}, nil } -func (lc listenerConn) Close() { - // ignore closing errors - _ = lc.Client.Close() -} - // send event to target server via rpc client calls. func (lc listenerConn) Fire(entry *logrus.Entry) error { notificationEvent, ok := entry.Data["Records"].([]NotificationEvent) diff --git a/cmd/s3-peer-client.go b/cmd/s3-peer-client.go index a9f0a1ece..48e6d8c66 100644 --- a/cmd/s3-peer-client.go +++ b/cmd/s3-peer-client.go @@ -142,10 +142,6 @@ func (s3p *s3Peers) SendRPC(peers []string, method string, args interface { SetToken(token string) SetTimestamp(tstamp time.Time) }) map[string]error { - // Take read lock for rpcClient map - s3p.mutex.RLock() - defer s3p.mutex.RUnlock() - // Result type type callResult struct { target string @@ -158,9 +154,9 @@ func (s3p *s3Peers) SendRPC(peers []string, method string, args interface { // Closure to make a single request. callTarget := func(target string) { reply := &GenericReply{} - client, ok := s3p.rpcClients[target] + client := s3p.GetPeerClient(target) var err error - if !ok { + if client == nil { err = fmt.Errorf("Requested client was not initialized - %v", target) } else {