storage/rpc: Remove network error restriction. (#3591)

This restriction has lots of side affects, since
we do not have a mechanism to clear states like
this it is better not to keep them.

Network errors are common and can occur with
simple cable removal etc. Since we already have
a retry mechanism this error count and stateful
nature can bring problems on a long running
cluster.
master
Harshavardhana 8 years ago committed by GitHub
parent 62f8343879
commit dfc2ef3004
  1. 165
      cmd/storage-rpc-client.go

@ -23,7 +23,6 @@ import (
"net/rpc" "net/rpc"
"net/url" "net/url"
"path" "path"
"sync/atomic"
"github.com/minio/minio/pkg/disk" "github.com/minio/minio/pkg/disk"
) )
@ -136,15 +135,6 @@ func (n *networkStorage) String() string {
return n.rpcClient.ServerAddr() + ":" + n.rpcClient.ServiceEndpoint() return n.rpcClient.ServerAddr() + ":" + n.rpcClient.ServiceEndpoint()
} }
// Network IO error count is kept at 256 with some simple
// math. Before we reject the disk completely. The combination
// of retry logic and total error count roughly comes around
// 2.5secs ( 2 * 5 * time.Millisecond * 256) which is when we
// basically take the disk offline completely. This is considered
// sufficient time tradeoff to avoid large delays in-terms of
// incoming i/o.
const maxAllowedNetworkIOError = 256 // maximum allowed network IOError.
// Init - attempts a login to reconnect. // Init - attempts a login to reconnect.
func (n *networkStorage) Init() error { func (n *networkStorage) Init() error {
err := n.rpcClient.Login() err := n.rpcClient.Login()
@ -160,18 +150,6 @@ func (n *networkStorage) Close() (err error) {
// DiskInfo - fetch disk information for a remote disk. // DiskInfo - fetch disk information for a remote disk.
func (n *networkStorage) DiskInfo() (info disk.Info, err error) { func (n *networkStorage) DiskInfo() (info disk.Info, err error) {
defer func() {
if err == errDiskNotFound {
atomic.AddInt32(&n.networkIOErrCount, 1)
}
}()
// Take remote disk offline if the total network errors.
// are more than maximum allowable IO error limit.
if n.networkIOErrCount > maxAllowedNetworkIOError {
return disk.Info{}, errFaultyRemoteDisk
}
args := AuthRPCArgs{} args := AuthRPCArgs{}
if err = n.rpcClient.Call("Storage.DiskInfoHandler", &args, &info); err != nil { if err = n.rpcClient.Call("Storage.DiskInfoHandler", &args, &info); err != nil {
return disk.Info{}, toStorageErr(err) return disk.Info{}, toStorageErr(err)
@ -181,18 +159,6 @@ func (n *networkStorage) DiskInfo() (info disk.Info, err error) {
// MakeVol - create a volume on a remote disk. // MakeVol - create a volume on a remote disk.
func (n *networkStorage) MakeVol(volume string) (err error) { func (n *networkStorage) MakeVol(volume string) (err error) {
defer func() {
if err == errDiskNotFound {
atomic.AddInt32(&n.networkIOErrCount, 1)
}
}()
// Take remote disk offline if the total network errors.
// are more than maximum allowable IO error limit.
if n.networkIOErrCount > maxAllowedNetworkIOError {
return errFaultyRemoteDisk
}
reply := AuthRPCReply{} reply := AuthRPCReply{}
args := GenericVolArgs{Vol: volume} args := GenericVolArgs{Vol: volume}
if err := n.rpcClient.Call("Storage.MakeVolHandler", &args, &reply); err != nil { if err := n.rpcClient.Call("Storage.MakeVolHandler", &args, &reply); err != nil {
@ -203,18 +169,6 @@ func (n *networkStorage) MakeVol(volume string) (err error) {
// ListVols - List all volumes on a remote disk. // ListVols - List all volumes on a remote disk.
func (n *networkStorage) ListVols() (vols []VolInfo, err error) { func (n *networkStorage) ListVols() (vols []VolInfo, err error) {
defer func() {
if err == errDiskNotFound {
atomic.AddInt32(&n.networkIOErrCount, 1)
}
}()
// Take remote disk offline if the total network errors.
// are more than maximum allowable IO error limit.
if n.networkIOErrCount > maxAllowedNetworkIOError {
return nil, errFaultyRemoteDisk
}
ListVols := ListVolsReply{} ListVols := ListVolsReply{}
err = n.rpcClient.Call("Storage.ListVolsHandler", &AuthRPCArgs{}, &ListVols) err = n.rpcClient.Call("Storage.ListVolsHandler", &AuthRPCArgs{}, &ListVols)
if err != nil { if err != nil {
@ -225,18 +179,6 @@ func (n *networkStorage) ListVols() (vols []VolInfo, err error) {
// StatVol - get volume info over the network. // StatVol - get volume info over the network.
func (n *networkStorage) StatVol(volume string) (volInfo VolInfo, err error) { func (n *networkStorage) StatVol(volume string) (volInfo VolInfo, err error) {
defer func() {
if err == errDiskNotFound {
atomic.AddInt32(&n.networkIOErrCount, 1)
}
}()
// Take remote disk offline if the total network errors.
// are more than maximum allowable IO error limit.
if n.networkIOErrCount > maxAllowedNetworkIOError {
return VolInfo{}, errFaultyRemoteDisk
}
args := GenericVolArgs{Vol: volume} args := GenericVolArgs{Vol: volume}
if err = n.rpcClient.Call("Storage.StatVolHandler", &args, &volInfo); err != nil { if err = n.rpcClient.Call("Storage.StatVolHandler", &args, &volInfo); err != nil {
return VolInfo{}, toStorageErr(err) return VolInfo{}, toStorageErr(err)
@ -246,18 +188,6 @@ func (n *networkStorage) StatVol(volume string) (volInfo VolInfo, err error) {
// DeleteVol - Deletes a volume over the network. // DeleteVol - Deletes a volume over the network.
func (n *networkStorage) DeleteVol(volume string) (err error) { func (n *networkStorage) DeleteVol(volume string) (err error) {
defer func() {
if err == errDiskNotFound {
atomic.AddInt32(&n.networkIOErrCount, 1)
}
}()
// Take remote disk offline if the total network errors.
// are more than maximum allowable IO error limit.
if n.networkIOErrCount > maxAllowedNetworkIOError {
return errFaultyRemoteDisk
}
reply := AuthRPCReply{} reply := AuthRPCReply{}
args := GenericVolArgs{Vol: volume} args := GenericVolArgs{Vol: volume}
if err := n.rpcClient.Call("Storage.DeleteVolHandler", &args, &reply); err != nil { if err := n.rpcClient.Call("Storage.DeleteVolHandler", &args, &reply); err != nil {
@ -269,17 +199,6 @@ func (n *networkStorage) DeleteVol(volume string) (err error) {
// File operations. // File operations.
func (n *networkStorage) PrepareFile(volume, path string, length int64) (err error) { func (n *networkStorage) PrepareFile(volume, path string, length int64) (err error) {
defer func() {
if err == errDiskNotFound {
atomic.AddInt32(&n.networkIOErrCount, 1)
}
}()
// Take remote disk offline if the total network errors.
// are more than maximum allowable IO error limit.
if n.networkIOErrCount > maxAllowedNetworkIOError {
return errFaultyRemoteDisk
}
reply := AuthRPCReply{} reply := AuthRPCReply{}
if err = n.rpcClient.Call("Storage.PrepareFileHandler", &PrepareFileArgs{ if err = n.rpcClient.Call("Storage.PrepareFileHandler", &PrepareFileArgs{
Vol: volume, Vol: volume,
@ -293,18 +212,6 @@ func (n *networkStorage) PrepareFile(volume, path string, length int64) (err err
// AppendFile - append file writes buffer to a remote network path. // AppendFile - append file writes buffer to a remote network path.
func (n *networkStorage) AppendFile(volume, path string, buffer []byte) (err error) { func (n *networkStorage) AppendFile(volume, path string, buffer []byte) (err error) {
defer func() {
if err == errDiskNotFound {
atomic.AddInt32(&n.networkIOErrCount, 1)
}
}()
// Take remote disk offline if the total network errors.
// are more than maximum allowable IO error limit.
if n.networkIOErrCount > maxAllowedNetworkIOError {
return errFaultyRemoteDisk
}
reply := AuthRPCReply{} reply := AuthRPCReply{}
if err = n.rpcClient.Call("Storage.AppendFileHandler", &AppendFileArgs{ if err = n.rpcClient.Call("Storage.AppendFileHandler", &AppendFileArgs{
Vol: volume, Vol: volume,
@ -318,18 +225,6 @@ func (n *networkStorage) AppendFile(volume, path string, buffer []byte) (err err
// StatFile - get latest Stat information for a file at path. // StatFile - get latest Stat information for a file at path.
func (n *networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err error) { func (n *networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err error) {
defer func() {
if err == errDiskNotFound {
atomic.AddInt32(&n.networkIOErrCount, 1)
}
}()
// Take remote disk offline if the total network errors.
// are more than maximum allowable IO error limit.
if n.networkIOErrCount > maxAllowedNetworkIOError {
return FileInfo{}, errFaultyRemoteDisk
}
if err = n.rpcClient.Call("Storage.StatFileHandler", &StatFileArgs{ if err = n.rpcClient.Call("Storage.StatFileHandler", &StatFileArgs{
Vol: volume, Vol: volume,
Path: path, Path: path,
@ -344,18 +239,6 @@ func (n *networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err e
// This API is meant to be used on files which have small memory footprint, do // This API is meant to be used on files which have small memory footprint, do
// not use this on large files as it would cause server to crash. // not use this on large files as it would cause server to crash.
func (n *networkStorage) ReadAll(volume, path string) (buf []byte, err error) { func (n *networkStorage) ReadAll(volume, path string) (buf []byte, err error) {
defer func() {
if err == errDiskNotFound {
atomic.AddInt32(&n.networkIOErrCount, 1)
}
}()
// Take remote disk offline if the total network errors.
// are more than maximum allowable IO error limit.
if n.networkIOErrCount > maxAllowedNetworkIOError {
return nil, errFaultyRemoteDisk
}
if err = n.rpcClient.Call("Storage.ReadAllHandler", &ReadAllArgs{ if err = n.rpcClient.Call("Storage.ReadAllHandler", &ReadAllArgs{
Vol: volume, Vol: volume,
Path: path, Path: path,
@ -367,12 +250,6 @@ func (n *networkStorage) ReadAll(volume, path string) (buf []byte, err error) {
// ReadFile - reads a file at remote path and fills the buffer. // ReadFile - reads a file at remote path and fills the buffer.
func (n *networkStorage) ReadFile(volume string, path string, offset int64, buffer []byte) (m int64, err error) { func (n *networkStorage) ReadFile(volume string, path string, offset int64, buffer []byte) (m int64, err error) {
defer func() {
if err == errDiskNotFound {
atomic.AddInt32(&n.networkIOErrCount, 1)
}
}()
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
// Recover any panic from allocation, and return error. // Recover any panic from allocation, and return error.
@ -380,12 +257,6 @@ func (n *networkStorage) ReadFile(volume string, path string, offset int64, buff
} }
}() // Do not crash the server. }() // Do not crash the server.
// Take remote disk offline if the total network errors.
// are more than maximum allowable IO error limit.
if n.networkIOErrCount > maxAllowedNetworkIOError {
return 0, errFaultyRemoteDisk
}
var result []byte var result []byte
err = n.rpcClient.Call("Storage.ReadFileHandler", &ReadFileArgs{ err = n.rpcClient.Call("Storage.ReadFileHandler", &ReadFileArgs{
Vol: volume, Vol: volume,
@ -403,18 +274,6 @@ func (n *networkStorage) ReadFile(volume string, path string, offset int64, buff
// ListDir - list all entries at prefix. // ListDir - list all entries at prefix.
func (n *networkStorage) ListDir(volume, path string) (entries []string, err error) { func (n *networkStorage) ListDir(volume, path string) (entries []string, err error) {
defer func() {
if err == errDiskNotFound {
atomic.AddInt32(&n.networkIOErrCount, 1)
}
}()
// Take remote disk offline if the total network errors.
// are more than maximum allowable IO error limit.
if n.networkIOErrCount > maxAllowedNetworkIOError {
return nil, errFaultyRemoteDisk
}
if err = n.rpcClient.Call("Storage.ListDirHandler", &ListDirArgs{ if err = n.rpcClient.Call("Storage.ListDirHandler", &ListDirArgs{
Vol: volume, Vol: volume,
Path: path, Path: path,
@ -427,18 +286,6 @@ func (n *networkStorage) ListDir(volume, path string) (entries []string, err err
// DeleteFile - Delete a file at path. // DeleteFile - Delete a file at path.
func (n *networkStorage) DeleteFile(volume, path string) (err error) { func (n *networkStorage) DeleteFile(volume, path string) (err error) {
defer func() {
if err == errDiskNotFound {
atomic.AddInt32(&n.networkIOErrCount, 1)
}
}()
// Take remote disk offline if the total network errors.
// are more than maximum allowable IO error limit.
if n.networkIOErrCount > maxAllowedNetworkIOError {
return errFaultyRemoteDisk
}
reply := AuthRPCReply{} reply := AuthRPCReply{}
if err = n.rpcClient.Call("Storage.DeleteFileHandler", &DeleteFileArgs{ if err = n.rpcClient.Call("Storage.DeleteFileHandler", &DeleteFileArgs{
Vol: volume, Vol: volume,
@ -451,18 +298,6 @@ func (n *networkStorage) DeleteFile(volume, path string) (err error) {
// RenameFile - rename a remote file from source to destination. // RenameFile - rename a remote file from source to destination.
func (n *networkStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) { func (n *networkStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) {
defer func() {
if err == errDiskNotFound {
atomic.AddInt32(&n.networkIOErrCount, 1)
}
}()
// Take remote disk offline if the total network errors.
// are more than maximum allowable IO error limit.
if n.networkIOErrCount > maxAllowedNetworkIOError {
return errFaultyRemoteDisk
}
reply := AuthRPCReply{} reply := AuthRPCReply{}
if err = n.rpcClient.Call("Storage.RenameFileHandler", &RenameFileArgs{ if err = n.rpcClient.Call("Storage.RenameFileHandler", &RenameFileArgs{
SrcVol: srcVolume, SrcVol: srcVolume,

Loading…
Cancel
Save