Fix listen-bucket (Fixes #2942) (#2949)

Don't close socket while re-initializing notify-listeners, as the rpc
client object is shared between notify-listeners and peer clients.

Also, improves SendRPC() readability by using GetPeerClient().
master
Aditya Manthramurthy 8 years ago committed by Harshavardhana
parent 334cdb5d64
commit d02cb963d5
  1. 6
      cmd/bucket-notification-handlers.go
  2. 4
      cmd/event-notifier.go
  3. 5
      cmd/notify-listener.go
  4. 8
      cmd/s3-peer-client.go

@ -322,7 +322,11 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
nEventCh := make(chan []NotificationEvent)
defer close(nEventCh)
// Add channel for listener events
globalEventNotifier.AddListenerChan(accountARN, nEventCh)
if err = globalEventNotifier.AddListenerChan(accountARN, nEventCh); err != nil {
errorIf(err, "Error adding a listener!")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
return
}
// Remove listener channel after the writer has closed or the
// client disconnected.
defer globalEventNotifier.RemoveListenerChan(accountARN)

@ -210,10 +210,6 @@ func (en *eventNotifier) SetBucketListenerConfig(bucket string, lcfg []listenerC
} else {
en.internal.listenerConfigs[bucket] = lcfg
}
// close all existing loggers and initialize again.
for _, v := range en.internal.targets {
v.lconn.Close()
}
en.internal.targets = make(map[string]*listenerLogger)
for _, lc := range lcfg {
logger, err := newListenerLogger(lc.TopicConfig.TopicARN,

@ -55,11 +55,6 @@ func newListenerLogger(listenerArn, targetAddr string) (*listenerLogger, error)
return &listenerLogger{lcLog, lc}, nil
}
func (lc listenerConn) Close() {
// ignore closing errors
_ = lc.Client.Close()
}
// send event to target server via rpc client calls.
func (lc listenerConn) Fire(entry *logrus.Entry) error {
notificationEvent, ok := entry.Data["Records"].([]NotificationEvent)

@ -142,10 +142,6 @@ func (s3p *s3Peers) SendRPC(peers []string, method string, args interface {
SetToken(token string)
SetTimestamp(tstamp time.Time)
}) map[string]error {
// Take read lock for rpcClient map
s3p.mutex.RLock()
defer s3p.mutex.RUnlock()
// Result type
type callResult struct {
target string
@ -158,9 +154,9 @@ func (s3p *s3Peers) SendRPC(peers []string, method string, args interface {
// Closure to make a single request.
callTarget := func(target string) {
reply := &GenericReply{}
client, ok := s3p.rpcClients[target]
client := s3p.GetPeerClient(target)
var err error
if !ok {
if client == nil {
err = fmt.Errorf("Requested client was not initialized - %v",
target)
} else {

Loading…
Cancel
Save