rpc: Do not use read/write deadlines for rpc connections. (#4647)

Fixes #4626
master
Harshavardhana 8 years ago committed by Krishna Srinivas
parent c59b995f7b
commit f8bd9cfd83
  1. 3
      cmd/admin-rpc-server.go
  2. 4
      cmd/browser-rpc-router.go
  3. 3
      cmd/lock-rpc-server.go
  4. 58
      cmd/rpc-server.go
  5. 76
      cmd/rpc-server_test.go
  6. 4
      cmd/s3-peer-router.go
  7. 7
      cmd/storage-rpc-server.go
  8. 17
      pkg/http/bufconn.go
  9. 3
      pkg/http/bufconn_test.go

@ -21,7 +21,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/rpc"
"os" "os"
"path/filepath" "path/filepath"
"time" "time"
@ -236,7 +235,7 @@ func (s *adminCmd) CommitConfig(cArgs *CommitConfigArgs, cReply *CommitConfigRep
// stop and restart commands. // stop and restart commands.
func registerAdminRPCRouter(mux *router.Router) error { func registerAdminRPCRouter(mux *router.Router) error {
adminRPCHandler := &adminCmd{} adminRPCHandler := &adminCmd{}
adminRPCServer := rpc.NewServer() adminRPCServer := newRPCServer()
err := adminRPCServer.RegisterName("Admin", adminRPCHandler) err := adminRPCServer.RegisterName("Admin", adminRPCHandler)
if err != nil { if err != nil {
return traceError(err) return traceError(err)

@ -17,8 +17,6 @@
package cmd package cmd
import ( import (
"net/rpc"
router "github.com/gorilla/mux" router "github.com/gorilla/mux"
) )
@ -39,7 +37,7 @@ type browserPeerAPIHandlers struct {
func registerBrowserPeerRPCRouter(mux *router.Router) error { func registerBrowserPeerRPCRouter(mux *router.Router) error {
bpHandlers := &browserPeerAPIHandlers{} bpHandlers := &browserPeerAPIHandlers{}
bpRPCServer := rpc.NewServer() bpRPCServer := newRPCServer()
err := bpRPCServer.RegisterName("BrowserPeer", bpHandlers) err := bpRPCServer.RegisterName("BrowserPeer", bpHandlers)
if err != nil { if err != nil {
return traceError(err) return traceError(err)

@ -19,7 +19,6 @@ package cmd
import ( import (
"fmt" "fmt"
"math/rand" "math/rand"
"net/rpc"
"path" "path"
"sync" "sync"
"time" "time"
@ -99,7 +98,7 @@ func registerDistNSLockRouter(mux *router.Router, endpoints EndpointList) error
// registerStorageLockers - register locker rpc handlers for net/rpc library clients // registerStorageLockers - register locker rpc handlers for net/rpc library clients
func registerStorageLockers(mux *router.Router, lockServers []*lockServer) error { func registerStorageLockers(mux *router.Router, lockServers []*lockServer) error {
for _, lockServer := range lockServers { for _, lockServer := range lockServers {
lockRPCServer := rpc.NewServer() lockRPCServer := newRPCServer()
if err := lockRPCServer.RegisterName(lockServiceName, lockServer); err != nil { if err := lockRPCServer.RegisterName(lockServiceName, lockServer); err != nil {
return traceError(err) return traceError(err)
} }

@ -0,0 +1,58 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"io"
"net/http"
"net/rpc"
miniohttp "github.com/minio/minio/pkg/http"
)
// ServeHTTP implements an http.Handler that answers RPC requests,
// hijacks the underlying connection and clears all deadlines if any.
func (server *rpcServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodConnect {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
errorIf(err, "rpc hijacking failed for: %s", req.RemoteAddr)
return
}
// Overrides Read/Write deadlines if any.
bufConn, ok := conn.(*miniohttp.BufConn)
if ok {
bufConn.RemoveTimeout()
conn = bufConn
}
// Can connect to RPC service using HTTP CONNECT to rpcPath.
io.WriteString(conn, "HTTP/1.0 200 Connected to Go RPC\n\n")
server.ServeConn(conn)
}
type rpcServer struct{ *rpc.Server }
// Similar to rpc.NewServer() provides a custom ServeHTTP override.
func newRPCServer() *rpcServer {
return &rpcServer{rpc.NewServer()}
}

@ -0,0 +1,76 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"net/http"
"net/http/httptest"
"testing"
router "github.com/gorilla/mux"
)
type ArithArgs struct {
A, B int
}
type ArithReply struct {
C int
}
type Arith int
// Some of Arith's methods have value args, some have pointer args. That's deliberate.
func (t *Arith) Add(args ArithArgs, reply *ArithReply) error {
reply.C = args.A + args.B
return nil
}
func TestGoHTTPRPC(t *testing.T) {
newServer := newRPCServer()
newServer.Register(new(Arith))
mux := router.NewRouter().SkipClean(true)
mux.Path("/foo").Handler(newServer)
httpServer := httptest.NewServer(mux)
defer httpServer.Close()
client := newRPCClient(httpServer.Listener.Addr().String(), "/foo", false)
defer client.Close()
// Synchronous calls
args := &ArithArgs{7, 8}
reply := new(ArithReply)
if err := client.Call("Arith.Add", args, reply); err != nil {
t.Errorf("Add: expected no error but got string %v", err)
}
if reply.C != args.A+args.B {
t.Errorf("Add: expected %d got %d", reply.C, args.A+args.B)
}
resp, err := http.Get(httpServer.URL + "/foo")
if err != nil {
t.Fatal(err)
}
if resp.StatusCode != http.StatusMethodNotAllowed {
t.Errorf("Expected %d, got %d", http.StatusMethodNotAllowed, resp.StatusCode)
}
}

@ -17,8 +17,6 @@
package cmd package cmd
import ( import (
"net/rpc"
router "github.com/gorilla/mux" router "github.com/gorilla/mux"
) )
@ -39,7 +37,7 @@ func registerS3PeerRPCRouter(mux *router.Router) error {
}, },
} }
s3PeerRPCServer := rpc.NewServer() s3PeerRPCServer := newRPCServer()
err := s3PeerRPCServer.RegisterName("S3", s3PeerHandlers) err := s3PeerRPCServer.RegisterName("S3", s3PeerHandlers)
if err != nil { if err != nil {
return traceError(err) return traceError(err)

@ -18,7 +18,6 @@ package cmd
import ( import (
"io" "io"
"net/rpc"
"path" "path"
"time" "time"
@ -217,7 +216,7 @@ func (s *storageServer) RenameFileHandler(args *RenameFileArgs, reply *AuthRPCRe
} }
// Initialize new storage rpc. // Initialize new storage rpc.
func newRPCServer(endpoints EndpointList) (servers []*storageServer, err error) { func newStorageRPCServer(endpoints EndpointList) (servers []*storageServer, err error) {
for _, endpoint := range endpoints { for _, endpoint := range endpoints {
if endpoint.IsLocal { if endpoint.IsLocal {
storage, err := newPosix(endpoint.Path) storage, err := newPosix(endpoint.Path)
@ -238,14 +237,14 @@ func newRPCServer(endpoints EndpointList) (servers []*storageServer, err error)
// registerStorageRPCRouter - register storage rpc router. // registerStorageRPCRouter - register storage rpc router.
func registerStorageRPCRouters(mux *router.Router, endpoints EndpointList) error { func registerStorageRPCRouters(mux *router.Router, endpoints EndpointList) error {
// Initialize storage rpc servers for every disk that is hosted on this node. // Initialize storage rpc servers for every disk that is hosted on this node.
storageRPCs, err := newRPCServer(endpoints) storageRPCs, err := newStorageRPCServer(endpoints)
if err != nil { if err != nil {
return traceError(err) return traceError(err)
} }
// Create a unique route for each disk exported from this node. // Create a unique route for each disk exported from this node.
for _, stServer := range storageRPCs { for _, stServer := range storageRPCs {
storageRPCServer := rpc.NewServer() storageRPCServer := newRPCServer()
err = storageRPCServer.RegisterName("Storage", stServer) err = storageRPCServer.RegisterName("Storage", stServer)
if err != nil { if err != nil {
return traceError(err) return traceError(err)

@ -32,6 +32,7 @@ type BufConn struct {
updateBytesWrittenFunc func(int) // function to be called to update bytes written. updateBytesWrittenFunc func(int) // function to be called to update bytes written.
} }
// Sets read timeout
func (c *BufConn) setReadTimeout() { func (c *BufConn) setReadTimeout() {
if c.readTimeout != 0 { if c.readTimeout != 0 {
c.SetReadDeadline(time.Now().UTC().Add(c.readTimeout)) c.SetReadDeadline(time.Now().UTC().Add(c.readTimeout))
@ -44,6 +45,20 @@ func (c *BufConn) setWriteTimeout() {
} }
} }
// RemoveTimeout - removes all configured read and write
// timeouts. Used by callers which control net.Conn behavior
// themselves.
func (c *BufConn) RemoveTimeout() {
c.readTimeout = 0
c.writeTimeout = 0
// Unset read/write timeouts, since we use **bufio** it is not
// guaranteed that the underlying Peek/Read operation in-fact
// indeed performed a Read() operation on the network. With
// that in mind we need to unset any timeouts currently set to
// avoid any pre-mature timeouts.
c.SetDeadline(time.Time{})
}
// Peek - returns the next n bytes without advancing the reader. It just wraps bufio.Reader.Peek(). // Peek - returns the next n bytes without advancing the reader. It just wraps bufio.Reader.Peek().
func (c *BufConn) Peek(n int) ([]byte, error) { func (c *BufConn) Peek(n int) ([]byte, error) {
c.setReadTimeout() c.setReadTimeout()
@ -54,7 +69,6 @@ func (c *BufConn) Peek(n int) ([]byte, error) {
func (c *BufConn) Read(b []byte) (n int, err error) { func (c *BufConn) Read(b []byte) (n int, err error) {
c.setReadTimeout() c.setReadTimeout()
n, err = c.bufReader.Read(b) n, err = c.bufReader.Read(b)
if err == nil && c.updateBytesReadFunc != nil { if err == nil && c.updateBytesReadFunc != nil {
c.updateBytesReadFunc(n) c.updateBytesReadFunc(n)
} }
@ -66,7 +80,6 @@ func (c *BufConn) Read(b []byte) (n int, err error) {
func (c *BufConn) Write(b []byte) (n int, err error) { func (c *BufConn) Write(b []byte) (n int, err error) {
c.setWriteTimeout() c.setWriteTimeout()
n, err = c.Conn.Write(b) n, err = c.Conn.Write(b)
if err == nil && c.updateBytesWrittenFunc != nil { if err == nil && c.updateBytesWrittenFunc != nil {
c.updateBytesWrittenFunc(n) c.updateBytesWrittenFunc(n)
} }

@ -79,6 +79,9 @@ func TestBuffConnReadTimeout(t *testing.T) {
if terr != nil { if terr != nil {
t.Fatalf("failed to write to client. %v", terr) t.Fatalf("failed to write to client. %v", terr)
} }
// Removes all deadlines if any.
bufconn.RemoveTimeout()
}() }()
c, err := net.Dial("tcp", "localhost:"+port) c, err := net.Dial("tcp", "localhost:"+port)

Loading…
Cancel
Save