Improve Peer RPC error handling (Fixes #2992) (#2995)

* Check for RPC connection shutdown and try again just once.

* Refactor SendRPC to use sync.WaitGroup
master
Aditya Manthramurthy 8 years ago committed by Harshavardhana
parent 2208992e6a
commit c3bbadacbf
  1. 32
      cmd/notify-listener.go
  2. 65
      cmd/s3-peer-client.go

@ -19,12 +19,13 @@ package cmd
import ( import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/rpc"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
) )
type listenerConn struct { type listenerConn struct {
Client *AuthRPCClient TargetAddr string
ListenerARN string ListenerARN string
} }
@ -34,13 +35,14 @@ type listenerLogger struct {
} }
func newListenerLogger(listenerArn, targetAddr string) (*listenerLogger, error) { func newListenerLogger(listenerArn, targetAddr string) (*listenerLogger, error) {
client := globalS3Peers.GetPeerClient(targetAddr) if globalS3Peers.GetPeerClient(targetAddr) == nil {
if client == nil { return nil, fmt.Errorf(
return nil, fmt.Errorf("Peer %s was not initialized - bug!", "Peer %s was not initialized - bug!",
targetAddr) targetAddr,
)
} }
lc := listenerConn{ lc := listenerConn{
Client: client, TargetAddr: targetAddr,
ListenerARN: listenerArn, ListenerARN: listenerArn,
} }
@ -64,9 +66,23 @@ func (lc listenerConn) Fire(entry *logrus.Entry) error {
return nil return nil
} }
evArgs := EventArgs{Event: notificationEvent, Arn: lc.ListenerARN} // Fetch peer client object
client := globalS3Peers.GetPeerClient(lc.TargetAddr)
if client == nil {
return fmt.Errorf("Target %s client RPC object not available!", lc.TargetAddr)
}
// Send Event RPC call and return error
arg := EventArgs{Event: notificationEvent, Arn: lc.ListenerARN}
reply := GenericReply{} reply := GenericReply{}
err := lc.Client.Call("S3.Event", &evArgs, &reply) err := client.Call("S3.Event", &arg, &reply)
// In case connection is shutdown, retry once.
if err != nil {
if err.Error() == rpc.ErrShutdown.Error() {
err = client.Call("S3.Event", &arg, &reply)
}
}
return err return err
} }

@ -19,6 +19,7 @@ package cmd
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/rpc"
"path" "path"
"sync" "sync"
"time" "time"
@ -143,44 +144,48 @@ func (s3p *s3Peers) SendRPC(peers []string, method string, args interface {
SetToken(token string) SetToken(token string)
SetTimestamp(tstamp time.Time) SetTimestamp(tstamp time.Time)
}) map[string]error { }) map[string]error {
// Result type
type callResult struct {
target string
err error
}
// Channel to collect results from goroutines // peer error responses array
resChan := make(chan callResult) errArr := make([]error, len(peers))
// Closure to make a single request. // Start a wait group and make RPC requests to peers.
callTarget := func(target string) { var wg sync.WaitGroup
reply := &GenericReply{} for i, target := range peers {
client := s3p.GetPeerClient(target) wg.Add(1)
var err error go func(ix int, target string) {
if client == nil { defer wg.Done()
err = fmt.Errorf("Requested client was not initialized - %v", reply := &GenericReply{}
target) // Get RPC client object safely.
} else { client := s3p.GetPeerClient(target)
err = client.Call(method, args, reply) var err error
} if client == nil {
resChan <- callResult{target, err} err = fmt.Errorf("Requested client was not initialized - %v",
target)
} else {
err = client.Call(method, args, reply)
// Check for network errors and try
// again just once.
if err != nil {
if err.Error() == rpc.ErrShutdown.Error() {
err = client.Call(method, args, reply)
}
}
}
errArr[ix] = err
}(i, target)
} }
// Wait for requests to complete.
wg.Wait()
// Map of errors // Map of errors
errsMap := make(map[string]error) errsMap := make(map[string]error)
// make network calls in parallel for i, errVal := range errArr {
for _, target := range peers { if errVal != nil {
go callTarget(target) errsMap[peers[i]] = errVal
}
// Wait on channel and collect all results
for range peers {
res := <-resChan
if res.err != nil {
errsMap[res.target] = res.err
} }
} }
// Return errors map
return errsMap return errsMap
} }

Loading…
Cancel
Save