diff --git a/cmd/routers.go b/cmd/routers.go index d2a9f00e3..17da5bb77 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -91,9 +91,6 @@ func newObjectLayerFactory(disks, ignoredDisks []string) func() ObjectLayer { // configureServer handler returns final handler for the http server. func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler { - // Initialize Namespace locking. - initNSLock() - // Initialize storage rpc servers for every disk that is hosted on this node. storageRPCs, err := newRPCServer(srvCmdConfig) fatalIf(err, "Unable to initialize storage RPC server.") diff --git a/cmd/storage-rpc-client.go b/cmd/storage-rpc-client.go index 149069f7d..8da757281 100644 --- a/cmd/storage-rpc-client.go +++ b/cmd/storage-rpc-client.go @@ -19,7 +19,6 @@ package main import ( "errors" "io" - "net/rpc" "path" "strconv" "strings" @@ -29,7 +28,7 @@ type networkStorage struct { netScheme string netAddr string netPath string - rpcClient *rpc.Client + rpcClient *RPCClient rpcToken string } @@ -88,7 +87,7 @@ func toStorageErr(err error) error { // Login rpc client makes an authentication request to the rpc server. // Receives a session token which will be used for subsequent requests. // FIXME: Currently these tokens expire in 100yrs. -func loginRPCClient(rpcClient *rpc.Client) (tokenStr string, err error) { +func loginRPCClient(rpcClient *RPCClient) (tokenStr string, err error) { cred := serverConfig.GetCredential() reply := RPCLoginReply{} if err = rpcClient.Call("Storage.LoginHandler", RPCLoginArgs{ @@ -118,13 +117,14 @@ func newRPCClient(networkPath string) (StorageAPI, error) { rpcPath := path.Join(storageRPCPath, netPath) port := getPort(srvConfig.serverAddr) rpcAddr := netAddr + ":" + strconv.Itoa(port) - rpcClient, err := rpc.DialHTTPPath("tcp", rpcAddr, rpcPath) - if err != nil { - return nil, err - } + // Initialize rpc client with network address and rpc path. + rpcClient := newClient(rpcAddr, rpcPath) token, err := loginRPCClient(rpcClient) if err != nil { + // Close the corresponding network connection w/ server to + // avoid leaking socket file descriptor. + rpcClient.Close() return nil, err } @@ -213,6 +213,9 @@ func (n networkStorage) AppendFile(volume, path string, buffer []byte) (err erro // StatFile - get latest Stat information for a file at path. func (n networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err error) { + if n.rpcClient == nil { + return FileInfo{}, errVolumeBusy + } if err = n.rpcClient.Call("Storage.StatFileHandler", StatFileArgs{ Token: n.rpcToken, Vol: volume, @@ -228,6 +231,9 @@ func (n networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err er // This API is meant to be used on files which have small memory footprint, do // not use this on large files as it would cause server to crash. func (n networkStorage) ReadAll(volume, path string) (buf []byte, err error) { + if n.rpcClient == nil { + return nil, errVolumeBusy + } if err = n.rpcClient.Call("Storage.ReadAllHandler", ReadAllArgs{ Token: n.rpcToken, Vol: volume, diff --git a/rpc-client.go b/rpc-client.go new file mode 100644 index 000000000..6a2e9cea5 --- /dev/null +++ b/rpc-client.go @@ -0,0 +1,89 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "net/rpc" + "sync" +) + +// RPCClient is a wrapper type for rpc.Client which provides reconnect on first failure. +type RPCClient struct { + sync.Mutex + rpc *rpc.Client + node string + rpcPath string +} + +// newClient constructs a RPCClient object with node and rpcPath initialized. +// It _doesn't_ connect to the remote endpoint. See Call method to see when the +// connect happens. +func newClient(node, rpcPath string) *RPCClient { + return &RPCClient{ + node: node, + rpcPath: rpcPath, + } +} + +// Close closes the underlying socket file descriptor. +func (rpcClient *RPCClient) Close() error { + rpcClient.Lock() + defer rpcClient.Unlock() + // Reset rpcClient.rpc to allow for subsequent calls to use a new + // (socket) connection. + clnt := rpcClient.rpc + rpcClient.rpc = nil + return clnt.Close() +} + +// Call makes a RPC call to the remote endpoint using the default codec, namely encoding/gob. +func (rpcClient *RPCClient) Call(serviceMethod string, args interface{}, reply interface{}) error { + rpcClient.Lock() + defer rpcClient.Unlock() + // If the rpc.Client is nil, we attempt to (re)connect with the remote endpoint. + if rpcClient.rpc == nil { + clnt, err := rpc.DialHTTPPath("tcp", rpcClient.node, rpcClient.rpcPath) + if err != nil { + return err + } + rpcClient.rpc = clnt + } + + // If the RPC fails due to a network-related error, then we reset + // rpc.Client for a subsequent reconnect. + err := rpcClient.rpc.Call(serviceMethod, args, reply) + if IsRPCError(err) { + rpcClient.rpc = nil + } + return err + +} + +// IsRPCError returns true if the error value is due to a network related +// failure, false otherwise. +func IsRPCError(err error) bool { + if err == nil { + return false + } + // The following are net/rpc specific errors that indicate that + // the connection may have been reset. Reset rpcClient.rpc to nil + // to trigger a reconnect in future. + if err == rpc.ErrShutdown { + return true + } + return false +}