From c3bbadacbf73a0f3cefd8cca8fd5e172d31677b2 Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Tue, 18 Oct 2016 21:26:58 -0700 Subject: [PATCH] Improve Peer RPC error handling (Fixes #2992) (#2995) * Check for RPC connection shutdown and try again just once. * Refactor SendRPC to use sync.WaitGroup --- cmd/notify-listener.go | 32 +++++++++++++++------ cmd/s3-peer-client.go | 65 +++++++++++++++++++++++------------------- 2 files changed, 59 insertions(+), 38 deletions(-) diff --git a/cmd/notify-listener.go b/cmd/notify-listener.go index 43474d6de..d0183fbe1 100644 --- a/cmd/notify-listener.go +++ b/cmd/notify-listener.go @@ -19,12 +19,13 @@ package cmd import ( "fmt" "io/ioutil" + "net/rpc" "github.com/Sirupsen/logrus" ) type listenerConn struct { - Client *AuthRPCClient + TargetAddr string ListenerARN string } @@ -34,13 +35,14 @@ type listenerLogger struct { } func newListenerLogger(listenerArn, targetAddr string) (*listenerLogger, error) { - client := globalS3Peers.GetPeerClient(targetAddr) - if client == nil { - return nil, fmt.Errorf("Peer %s was not initialized - bug!", - targetAddr) + if globalS3Peers.GetPeerClient(targetAddr) == nil { + return nil, fmt.Errorf( + "Peer %s was not initialized - bug!", + targetAddr, + ) } lc := listenerConn{ - Client: client, + TargetAddr: targetAddr, ListenerARN: listenerArn, } @@ -64,9 +66,23 @@ func (lc listenerConn) Fire(entry *logrus.Entry) error { return nil } - evArgs := EventArgs{Event: notificationEvent, Arn: lc.ListenerARN} + // Fetch peer client object + client := globalS3Peers.GetPeerClient(lc.TargetAddr) + if client == nil { + return fmt.Errorf("Target %s client RPC object not available!", lc.TargetAddr) + } + + // Send Event RPC call and return error + arg := EventArgs{Event: notificationEvent, Arn: lc.ListenerARN} reply := GenericReply{} - err := lc.Client.Call("S3.Event", &evArgs, &reply) + err := client.Call("S3.Event", &arg, &reply) + + // In case connection is shutdown, retry once. + if err != nil { + if err.Error() == rpc.ErrShutdown.Error() { + err = client.Call("S3.Event", &arg, &reply) + } + } return err } diff --git a/cmd/s3-peer-client.go b/cmd/s3-peer-client.go index 3ac412de6..51238c747 100644 --- a/cmd/s3-peer-client.go +++ b/cmd/s3-peer-client.go @@ -19,6 +19,7 @@ package cmd import ( "encoding/json" "fmt" + "net/rpc" "path" "sync" "time" @@ -143,44 +144,48 @@ func (s3p *s3Peers) SendRPC(peers []string, method string, args interface { SetToken(token string) SetTimestamp(tstamp time.Time) }) map[string]error { - // Result type - type callResult struct { - target string - err error - } - // Channel to collect results from goroutines - resChan := make(chan callResult) - - // Closure to make a single request. - callTarget := func(target string) { - reply := &GenericReply{} - client := s3p.GetPeerClient(target) - var err error - if client == nil { - err = fmt.Errorf("Requested client was not initialized - %v", - target) - } else { - err = client.Call(method, args, reply) - } - resChan <- callResult{target, err} + // peer error responses array + errArr := make([]error, len(peers)) + + // Start a wait group and make RPC requests to peers. + var wg sync.WaitGroup + for i, target := range peers { + wg.Add(1) + go func(ix int, target string) { + defer wg.Done() + reply := &GenericReply{} + // Get RPC client object safely. + client := s3p.GetPeerClient(target) + var err error + if client == nil { + err = fmt.Errorf("Requested client was not initialized - %v", + target) + } else { + err = client.Call(method, args, reply) + // Check for network errors and try + // again just once. + if err != nil { + if err.Error() == rpc.ErrShutdown.Error() { + err = client.Call(method, args, reply) + } + } + } + errArr[ix] = err + }(i, target) } + // Wait for requests to complete. + wg.Wait() + // Map of errors errsMap := make(map[string]error) - // make network calls in parallel - for _, target := range peers { - go callTarget(target) - } - // Wait on channel and collect all results - for range peers { - res := <-resChan - if res.err != nil { - errsMap[res.target] = res.err + for i, errVal := range errArr { + if errVal != nil { + errsMap[peers[i]] = errVal } } - // Return errors map return errsMap }