@ -17,16 +17,17 @@
package cmd
import (
"errors"
"net/rpc"
"sync"
)
// RPCClient is a wrapper type for rpc.Client which provides reconnect on first failure.
type RPCClient struct {
sync . Mutex
rpc * rpc . Client
node string
rpcPath string
mu sync . Mutex
rpcPrivate * rpc . Client
node string
rpcPath string
}
// newClient constructs a RPCClient object with node and rpcPath initialized.
@ -39,42 +40,80 @@ func newClient(node, rpcPath string) *RPCClient {
}
}
// Close closes the underlying socket file descriptor.
func ( rpcClient * RPCClient ) Close ( ) error {
rpcClient . Lock ( )
defer rpcClient . Unlock ( )
// If rpc client has not connected yet there is nothing to close.
if rpcClient . rpc == nil {
return nil
// clearRPCClient clears the pointer to the rpc.Client object in a safe manner
func ( rpcClient * RPCClient ) clearRPCClient ( ) {
rpcClient . mu . Lock ( )
rpcClient . rpcPrivate = nil
rpcClient . mu . Unlock ( )
}
// getRPCClient gets the pointer to the rpc.Client object in a safe manner
func ( rpcClient * RPCClient ) getRPCClient ( ) * rpc . Client {
rpcClient . mu . Lock ( )
rpcLocalStack := rpcClient . rpcPrivate
rpcClient . mu . Unlock ( )
return rpcLocalStack
}
// dialRPCClient tries to establish a connection to the server in a safe manner
func ( rpcClient * RPCClient ) dialRPCClient ( ) ( * rpc . Client , error ) {
rpcClient . mu . Lock ( )
defer rpcClient . mu . Unlock ( )
// After acquiring lock, check whether another thread may not have already dialed and established connection
if rpcClient . rpcPrivate != nil {
return rpcClient . rpcPrivate , nil
}
// Reset rpcClient.rpc to allow for subsequent calls to use a new
// (socket) connection.
clnt := rpcClient . rpc
rpcClient . rpc = nil
return clnt . Close ( )
rpc , err := rpc . DialHTTPPath ( "tcp" , rpcClient . node , rpcClient . rpcPath )
if err != nil {
return nil , err
} else if rpc == nil {
return nil , errors . New ( "No valid RPC Client created after dial" )
}
rpcClient . rpcPrivate = rpc
return rpcClient . rpcPrivate , nil
}
// Call makes a RPC call to the remote endpoint using the default codec, namely encoding/gob.
func ( rpcClient * RPCClient ) Call ( serviceMethod string , args interface { } , reply interface { } ) error {
rpcClient . Lock ( )
defer rpcClient . Unlock ( )
// Make a copy below so that we can safely (continue to) work with the rpc.Client.
// Even in the case the two threads would simultaneously find that the connection is not initialised,
// they would both attempt to dial and only one of them would succeed in doing so.
rpcLocalStack := rpcClient . getRPCClient ( )
// If the rpc.Client is nil, we attempt to (re)connect with the remote endpoint.
if rpcClient . rpc == nil {
clnt , err := rpc . DialHTTPPath ( "tcp" , rpcClient . node , rpcClient . rpcPath )
if rpcLocalStack == nil {
var err error
rpcLocalStack , err = rpcClient . dialRPCClient ( )
if err != nil {
return err
}
rpcClient . rpc = clnt
}
// If the RPC fails due to a network-related error, then we reset
// rpc.Client for a subsequent reconnect.
err := rpcClient . rpc . Call ( serviceMethod , args , reply )
err := rpcLocalStack . Call ( serviceMethod , args , reply )
if IsRPCError ( err ) {
rpcClient . rpc = nil
rpcClient . clearRPCClient ( )
}
return err
}
// Close closes the underlying socket file descriptor.
func ( rpcClient * RPCClient ) Close ( ) error {
// See comment above for making a copy on local stack
rpcLocalStack := rpcClient . getRPCClient ( )
// If rpc client has not connected yet there is nothing to close.
if rpcLocalStack == nil {
return nil
}
// Reset rpcClient.rpc to allow for subsequent calls to use a new
// (socket) connection.
rpcClient . clearRPCClient ( )
return rpcLocalStack . Close ( )
}
// IsRPCError returns true if the error value is due to a network related