From d056f19d07848c329bf036fda4b736a95d7a0c8f Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 29 Nov 2016 22:39:32 -0800 Subject: [PATCH] api: Allow reconnection of policy/notification rpc clients. (#3368) Since we moved out reconnection logic from net-rpc-client.go we should do it from the top-layer properly and bring back the code to reconnect properly in-case the connection is lost. --- cmd/bucket-metadata.go | 49 +++++++++++++++++++++++++++++++++++++----- cmd/s3-peer-client.go | 4 ++-- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/cmd/bucket-metadata.go b/cmd/bucket-metadata.go index 25dba4090..ab14d7e6a 100644 --- a/cmd/bucket-metadata.go +++ b/cmd/bucket-metadata.go @@ -16,7 +16,10 @@ package cmd -import "encoding/json" +import ( + "encoding/json" + "net/rpc" +) // BucketMetaState - Interface to update bucket metadata in-memory // state. @@ -104,26 +107,62 @@ type remoteBucketMetaState struct { // change to remote peer via RPC call. func (rc *remoteBucketMetaState) UpdateBucketNotification(args *SetBucketNotificationPeerArgs) error { reply := GenericReply{} - return rc.Call("S3.SetBucketNotificationPeer", args, &reply) + err := rc.Call("S3.SetBucketNotificationPeer", args, &reply) + // Check for network error and retry once. + if err != nil && err == rpc.ErrShutdown { + // Close the underlying connection to attempt once more. + rc.Close() + + // Attempt again and proceed. + err = rc.Call("S3.SetBucketNotificationPeer", args, &reply) + } + return err } // remoteBucketMetaState.UpdateBucketListener - sends bucket listener change to // remote peer via RPC call. func (rc *remoteBucketMetaState) UpdateBucketListener(args *SetBucketListenerPeerArgs) error { reply := GenericReply{} - return rc.Call("S3.SetBucketListenerPeer", args, &reply) + err := rc.Call("S3.SetBucketListenerPeer", args, &reply) + // Check for network error and retry once. + if err != nil && err == rpc.ErrShutdown { + // Close the underlying connection to attempt once more. + rc.Close() + + // Attempt again and proceed. + err = rc.Call("S3.SetBucketListenerPeer", args, &reply) + } + return err } // remoteBucketMetaState.UpdateBucketPolicy - sends bucket policy change to remote // peer via RPC call. func (rc *remoteBucketMetaState) UpdateBucketPolicy(args *SetBucketPolicyPeerArgs) error { reply := GenericReply{} - return rc.Call("S3.SetBucketPolicyPeer", args, &reply) + err := rc.Call("S3.SetBucketPolicyPeer", args, &reply) + // Check for network error and retry once. + if err != nil && err == rpc.ErrShutdown { + // Close the underlying connection to attempt once more. + rc.Close() + + // Attempt again and proceed. + err = rc.Call("S3.SetBucketPolicyPeer", args, &reply) + } + return err } // remoteBucketMetaState.SendEvent - sends event for bucket listener to remote // peer via RPC call. func (rc *remoteBucketMetaState) SendEvent(args *EventArgs) error { reply := GenericReply{} - return rc.Call("S3.Event", args, &reply) + err := rc.Call("S3.Event", args, &reply) + // Check for network error and retry once. + if err != nil && err == rpc.ErrShutdown { + // Close the underlying connection to attempt once more. + rc.Close() + + // Attempt again and proceed. + err = rc.Call("S3.Event", args, &reply) + } + return err } diff --git a/cmd/s3-peer-client.go b/cmd/s3-peer-client.go index 0a218ec31..e93ce39d4 100644 --- a/cmd/s3-peer-client.go +++ b/cmd/s3-peer-client.go @@ -71,8 +71,8 @@ func makeS3Peers(eps []*url.URL) s3Peers { } ret = append(ret, s3Peer{ - ep.Host, - &remoteBucketMetaState{newAuthClient(&cfg)}, + addr: ep.Host, + bmsClient: &remoteBucketMetaState{newAuthClient(&cfg)}, }) seenAddr[ep.Host] = true }