diff --git a/cmd/globals.go b/cmd/globals.go index 8e039597f..613d359a3 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -79,13 +79,20 @@ var ( globalMinioPort = "9000" // Holds the host that was passed using --address globalMinioHost = "" + // Peer communication struct globalS3Peers = s3Peers{} // CA root certificates, a nil value means system certs pool will be used globalRootCAs *x509.CertPool + // List of admin peers. globalAdminPeers = adminPeers{} + + // Attempt to retry only this many number of times before + // giving up on the remote disk entirely. + globalMaxStorageRetryThreshold = 3 + // Add new variable global values here. ) diff --git a/cmd/retry-storage.go b/cmd/retry-storage.go index 7712bb31b..9bc8ab1a1 100644 --- a/cmd/retry-storage.go +++ b/cmd/retry-storage.go @@ -17,7 +17,7 @@ package cmd import ( - "net/rpc" + "time" "github.com/minio/minio/pkg/disk" ) @@ -48,7 +48,7 @@ func (f retryStorage) Close() (err error) { // DiskInfo - a retryable implementation of disk info. func (f retryStorage) DiskInfo() (info disk.Info, err error) { info, err = f.remoteStorage.DiskInfo() - if err == rpc.ErrShutdown { + if err == errDiskNotFound { err = f.reInit() if err == nil { return f.remoteStorage.DiskInfo() @@ -60,7 +60,7 @@ func (f retryStorage) DiskInfo() (info disk.Info, err error) { // MakeVol - a retryable implementation of creating a volume. func (f retryStorage) MakeVol(volume string) (err error) { err = f.remoteStorage.MakeVol(volume) - if err == rpc.ErrShutdown { + if err == errDiskNotFound { err = f.reInit() if err == nil { return f.remoteStorage.MakeVol(volume) @@ -72,7 +72,7 @@ func (f retryStorage) MakeVol(volume string) (err error) { // ListVols - a retryable implementation of listing all the volumes. func (f retryStorage) ListVols() (vols []VolInfo, err error) { vols, err = f.remoteStorage.ListVols() - if err == rpc.ErrShutdown { + if err == errDiskNotFound { err = f.reInit() if err == nil { return f.remoteStorage.ListVols() @@ -84,7 +84,7 @@ func (f retryStorage) ListVols() (vols []VolInfo, err error) { // StatVol - a retryable implementation of stating a volume. func (f retryStorage) StatVol(volume string) (vol VolInfo, err error) { vol, err = f.remoteStorage.StatVol(volume) - if err == rpc.ErrShutdown { + if err == errDiskNotFound { err = f.reInit() if err == nil { return f.remoteStorage.StatVol(volume) @@ -96,7 +96,7 @@ func (f retryStorage) StatVol(volume string) (vol VolInfo, err error) { // DeleteVol - a retryable implementation of deleting a volume. func (f retryStorage) DeleteVol(volume string) (err error) { err = f.remoteStorage.DeleteVol(volume) - if err == rpc.ErrShutdown { + if err == errDiskNotFound { err = f.reInit() if err == nil { return f.remoteStorage.DeleteVol(volume) @@ -108,7 +108,7 @@ func (f retryStorage) DeleteVol(volume string) (err error) { // PrepareFile - a retryable implementation of preparing a file. func (f retryStorage) PrepareFile(volume, path string, length int64) (err error) { err = f.remoteStorage.PrepareFile(volume, path, length) - if err == rpc.ErrShutdown { + if err == errDiskNotFound { err = f.reInit() if err == nil { return f.remoteStorage.PrepareFile(volume, path, length) @@ -120,7 +120,7 @@ func (f retryStorage) PrepareFile(volume, path string, length int64) (err error) // AppendFile - a retryable implementation of append to a file. func (f retryStorage) AppendFile(volume, path string, buffer []byte) (err error) { err = f.remoteStorage.AppendFile(volume, path, buffer) - if err == rpc.ErrShutdown { + if err == errDiskNotFound { err = f.reInit() if err == nil { return f.remoteStorage.AppendFile(volume, path, buffer) @@ -132,7 +132,7 @@ func (f retryStorage) AppendFile(volume, path string, buffer []byte) (err error) // StatFile - a retryable implementation of stating a file. func (f retryStorage) StatFile(volume, path string) (fileInfo FileInfo, err error) { fileInfo, err = f.remoteStorage.StatFile(volume, path) - if err == rpc.ErrShutdown { + if err == errDiskNotFound { err = f.reInit() if err == nil { return f.remoteStorage.StatFile(volume, path) @@ -144,7 +144,7 @@ func (f retryStorage) StatFile(volume, path string) (fileInfo FileInfo, err erro // ReadAll - a retryable implementation of reading all the content from a file. func (f retryStorage) ReadAll(volume, path string) (buf []byte, err error) { buf, err = f.remoteStorage.ReadAll(volume, path) - if err == rpc.ErrShutdown { + if err == errDiskNotFound { err = f.reInit() if err == nil { return f.remoteStorage.ReadAll(volume, path) @@ -156,7 +156,7 @@ func (f retryStorage) ReadAll(volume, path string) (buf []byte, err error) { // ReadFile - a retryable implementation of reading at offset from a file. func (f retryStorage) ReadFile(volume, path string, offset int64, buffer []byte) (m int64, err error) { m, err = f.remoteStorage.ReadFile(volume, path, offset, buffer) - if err == rpc.ErrShutdown { + if err == errDiskNotFound { err = f.reInit() if err == nil { return f.remoteStorage.ReadFile(volume, path, offset, buffer) @@ -168,7 +168,7 @@ func (f retryStorage) ReadFile(volume, path string, offset int64, buffer []byte) // ListDir - a retryable implementation of listing directory entries. func (f retryStorage) ListDir(volume, path string) (entries []string, err error) { entries, err = f.remoteStorage.ListDir(volume, path) - if err == rpc.ErrShutdown { + if err == errDiskNotFound { err = f.reInit() if err == nil { return f.remoteStorage.ListDir(volume, path) @@ -180,7 +180,7 @@ func (f retryStorage) ListDir(volume, path string) (entries []string, err error) // DeleteFile - a retryable implementation of deleting a file. func (f retryStorage) DeleteFile(volume, path string) (err error) { err = f.remoteStorage.DeleteFile(volume, path) - if err == rpc.ErrShutdown { + if err == errDiskNotFound { err = f.reInit() if err == nil { return f.remoteStorage.DeleteFile(volume, path) @@ -189,32 +189,10 @@ func (f retryStorage) DeleteFile(volume, path string) (err error) { return err } -// Connect and attempt to load the format from a disconnected node. -func (f retryStorage) reInit() (err error) { - err = f.remoteStorage.Close() - if err != nil { - return err - } - err = f.remoteStorage.Init() - if err == nil { - _, err = loadFormat(f.remoteStorage) - // For load format returning network shutdown - // we now treat it like disk not available. - if err == rpc.ErrShutdown { - err = errDiskNotFound - } - return err - } - if err == rpc.ErrShutdown { - err = errDiskNotFound - } - return err -} - // RenameFile - a retryable implementation of renaming a file. func (f retryStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) { err = f.remoteStorage.RenameFile(srcVolume, srcPath, dstVolume, dstPath) - if err == rpc.ErrShutdown { + if err == errDiskNotFound { err = f.reInit() if err == nil { return f.remoteStorage.RenameFile(srcVolume, srcPath, dstVolume, dstPath) @@ -222,3 +200,39 @@ func (f retryStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) } return err } + +// Connect and attempt to load the format from a disconnected node, +// attempts three times before giving up. +func (f retryStorage) reInit() (err error) { + // Close the underlying connection. + f.remoteStorage.Close() // Error here is purposefully ignored. + + doneCh := make(chan struct{}) + defer close(doneCh) + for i := range newRetryTimer(time.Second, time.Second*30, MaxJitter, doneCh) { + // Initialize and make a new login attempt. + err = f.remoteStorage.Init() + if err != nil { + // No need to return error until the retry count + // threshold has reached. + if i < globalMaxStorageRetryThreshold { + continue + } + return err + } + // Attempt to load format to see if the disk is really + // a formatted disk and part of the cluster. + _, err = loadFormat(f.remoteStorage) + if err != nil { + // No need to return error until the retry count + // threshold has reached. + if i < globalMaxStorageRetryThreshold { + continue + } + return err + } + // Login and loading format was a success, break and proceed forward. + break + } + return err +} diff --git a/cmd/retry-storage_test.go b/cmd/retry-storage_test.go index 84d79135d..67315a8f2 100644 --- a/cmd/retry-storage_test.go +++ b/cmd/retry-storage_test.go @@ -18,7 +18,6 @@ package cmd import ( "bytes" - "net/rpc" "reflect" "testing" ) @@ -41,7 +40,7 @@ func TestRetryStorage(t *testing.T) { t.Fatal("storage disk is not *retryStorage type") } storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ - 1: rpc.ErrShutdown, + 1: errDiskNotFound, }, nil)} } @@ -54,14 +53,14 @@ func TestRetryStorage(t *testing.T) { t.Fatal("storage disk is not *retryStorage type") } storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ - 1: rpc.ErrShutdown, + 1: errDiskNotFound, }, nil)} } for _, disk := range storageDisks { err = disk.Init() - if err != rpc.ErrShutdown { - t.Fatal("Expected rpc.ErrShutdown, got", err) + if err != errDiskNotFound { + t.Fatal("Expected errDiskNotFound, got", err) } } @@ -79,7 +78,7 @@ func TestRetryStorage(t *testing.T) { t.Fatal("storage disk is not *retryStorage type") } storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ - 1: rpc.ErrShutdown, + 1: errDiskNotFound, }, nil)} } @@ -99,7 +98,7 @@ func TestRetryStorage(t *testing.T) { t.Fatal("storage disk is not *retryStorage type") } storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ - 1: rpc.ErrShutdown, + 1: errDiskNotFound, }, nil)} } @@ -116,7 +115,7 @@ func TestRetryStorage(t *testing.T) { t.Fatal("storage disk is not *retryStorage type") } storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ - 1: rpc.ErrShutdown, + 1: errDiskNotFound, }, nil)} } @@ -133,7 +132,7 @@ func TestRetryStorage(t *testing.T) { t.Fatal("storage disk is not *retryStorage type") } storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ - 1: rpc.ErrShutdown, + 1: errDiskNotFound, }, nil)} } @@ -153,7 +152,7 @@ func TestRetryStorage(t *testing.T) { t.Fatal("storage disk is not *retryStorage type") } storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ - 1: rpc.ErrShutdown, + 1: errDiskNotFound, }, nil)} } @@ -170,7 +169,7 @@ func TestRetryStorage(t *testing.T) { t.Fatal("storage disk is not *retryStorage type") } storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ - 1: rpc.ErrShutdown, + 1: errDiskNotFound, }, nil)} } @@ -187,7 +186,7 @@ func TestRetryStorage(t *testing.T) { t.Fatal("storage disk is not *retryStorage type") } storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ - 1: rpc.ErrShutdown, + 1: errDiskNotFound, }, nil)} } @@ -204,7 +203,7 @@ func TestRetryStorage(t *testing.T) { t.Fatal("storage disk is not *retryStorage type") } storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ - 1: rpc.ErrShutdown, + 1: errDiskNotFound, }, nil)} } @@ -225,7 +224,7 @@ func TestRetryStorage(t *testing.T) { t.Fatal("storage disk is not *retryStorage type") } storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ - 1: rpc.ErrShutdown, + 1: errDiskNotFound, }, nil)} } @@ -250,7 +249,7 @@ func TestRetryStorage(t *testing.T) { t.Fatal("storage disk is not *retryStorage type") } storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ - 1: rpc.ErrShutdown, + 1: errDiskNotFound, }, nil)} } @@ -270,7 +269,7 @@ func TestRetryStorage(t *testing.T) { t.Fatal("storage disk is not *retryStorage type") } storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ - 1: rpc.ErrShutdown, + 1: errDiskNotFound, }, nil)} } @@ -287,7 +286,7 @@ func TestRetryStorage(t *testing.T) { t.Fatal("storage disk is not *retryStorage type") } storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ - 1: rpc.ErrShutdown, + 1: errDiskNotFound, }, nil)} } @@ -308,7 +307,7 @@ func TestRetryStorage(t *testing.T) { t.Fatal("storage disk is not *retryStorage type") } storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ - 1: rpc.ErrShutdown, + 1: errDiskNotFound, }, nil)} } diff --git a/cmd/retry.go b/cmd/retry.go index b7df77d9a..364212af1 100644 --- a/cmd/retry.go +++ b/cmd/retry.go @@ -90,7 +90,7 @@ func newRetryTimer(unit time.Duration, cap time.Duration, jitter float64, doneCh go func() { defer close(attemptCh) - var nextBackoff int + nextBackoff := 1 for { select { // Attempts starts. diff --git a/cmd/storage-rpc-client.go b/cmd/storage-rpc-client.go index 3f969d497..fadf66b0c 100644 --- a/cmd/storage-rpc-client.go +++ b/cmd/storage-rpc-client.go @@ -52,6 +52,10 @@ func toStorageErr(err error) error { return errDiskNotFound } + if err == rpc.ErrShutdown { + return errDiskNotFound + } + switch err.Error() { case io.EOF.Error(): return io.EOF @@ -143,19 +147,21 @@ const maxAllowedNetworkIOError = 1024 // Initializes the remote RPC connection by attempting a login attempt. func (n *networkStorage) Init() (err error) { // Attempt a login to reconnect. - return n.rpcClient.Login() + err = n.rpcClient.Login() + return toStorageErr(err) } // Closes the underlying RPC connection. func (n *networkStorage) Close() (err error) { // Close the underlying connection. - return n.rpcClient.Close() + err = n.rpcClient.Close() + return toStorageErr(err) } // DiskInfo - fetch disk information for a remote disk. func (n *networkStorage) DiskInfo() (info disk.Info, err error) { defer func() { - if err == errDiskNotFound || err == rpc.ErrShutdown { + if err == errDiskNotFound { atomic.AddInt32(&n.networkIOErrCount, 1) } }() @@ -176,7 +182,7 @@ func (n *networkStorage) DiskInfo() (info disk.Info, err error) { // MakeVol - create a volume on a remote disk. func (n *networkStorage) MakeVol(volume string) (err error) { defer func() { - if err == errDiskNotFound || err == rpc.ErrShutdown { + if err == errDiskNotFound { atomic.AddInt32(&n.networkIOErrCount, 1) } }() @@ -198,7 +204,7 @@ func (n *networkStorage) MakeVol(volume string) (err error) { // ListVols - List all volumes on a remote disk. func (n *networkStorage) ListVols() (vols []VolInfo, err error) { defer func() { - if err == errDiskNotFound || err == rpc.ErrShutdown { + if err == errDiskNotFound { atomic.AddInt32(&n.networkIOErrCount, 1) } }() @@ -220,7 +226,7 @@ func (n *networkStorage) ListVols() (vols []VolInfo, err error) { // StatVol - get volume info over the network. func (n *networkStorage) StatVol(volume string) (volInfo VolInfo, err error) { defer func() { - if err == errDiskNotFound || err == rpc.ErrShutdown { + if err == errDiskNotFound { atomic.AddInt32(&n.networkIOErrCount, 1) } }() @@ -241,7 +247,7 @@ func (n *networkStorage) StatVol(volume string) (volInfo VolInfo, err error) { // DeleteVol - Deletes a volume over the network. func (n *networkStorage) DeleteVol(volume string) (err error) { defer func() { - if err == errDiskNotFound || err == rpc.ErrShutdown { + if err == errDiskNotFound { atomic.AddInt32(&n.networkIOErrCount, 1) } }() @@ -264,7 +270,7 @@ func (n *networkStorage) DeleteVol(volume string) (err error) { func (n *networkStorage) PrepareFile(volume, path string, length int64) (err error) { defer func() { - if err == errDiskNotFound || err == rpc.ErrShutdown { + if err == errDiskNotFound { atomic.AddInt32(&n.networkIOErrCount, 1) } }() @@ -288,7 +294,7 @@ func (n *networkStorage) PrepareFile(volume, path string, length int64) (err err // AppendFile - append file writes buffer to a remote network path. func (n *networkStorage) AppendFile(volume, path string, buffer []byte) (err error) { defer func() { - if err == errDiskNotFound || err == rpc.ErrShutdown { + if err == errDiskNotFound { atomic.AddInt32(&n.networkIOErrCount, 1) } }() @@ -313,7 +319,7 @@ func (n *networkStorage) AppendFile(volume, path string, buffer []byte) (err err // StatFile - get latest Stat information for a file at path. func (n *networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err error) { defer func() { - if err == errDiskNotFound || err == rpc.ErrShutdown { + if err == errDiskNotFound { atomic.AddInt32(&n.networkIOErrCount, 1) } }() @@ -339,7 +345,7 @@ func (n *networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err e // not use this on large files as it would cause server to crash. func (n *networkStorage) ReadAll(volume, path string) (buf []byte, err error) { defer func() { - if err == errDiskNotFound || err == rpc.ErrShutdown { + if err == errDiskNotFound { atomic.AddInt32(&n.networkIOErrCount, 1) } }() @@ -362,7 +368,7 @@ func (n *networkStorage) ReadAll(volume, path string) (buf []byte, err error) { // ReadFile - reads a file at remote path and fills the buffer. func (n *networkStorage) ReadFile(volume string, path string, offset int64, buffer []byte) (m int64, err error) { defer func() { - if err == errDiskNotFound || err == rpc.ErrShutdown { + if err == errDiskNotFound { atomic.AddInt32(&n.networkIOErrCount, 1) } }() @@ -398,7 +404,7 @@ func (n *networkStorage) ReadFile(volume string, path string, offset int64, buff // ListDir - list all entries at prefix. func (n *networkStorage) ListDir(volume, path string) (entries []string, err error) { defer func() { - if err == errDiskNotFound || err == rpc.ErrShutdown { + if err == errDiskNotFound { atomic.AddInt32(&n.networkIOErrCount, 1) } }() @@ -422,7 +428,7 @@ func (n *networkStorage) ListDir(volume, path string) (entries []string, err err // DeleteFile - Delete a file at path. func (n *networkStorage) DeleteFile(volume, path string) (err error) { defer func() { - if err == errDiskNotFound || err == rpc.ErrShutdown { + if err == errDiskNotFound { atomic.AddInt32(&n.networkIOErrCount, 1) } }() @@ -446,7 +452,7 @@ func (n *networkStorage) DeleteFile(volume, path string) (err error) { // RenameFile - rename a remote file from source to destination. func (n *networkStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) { defer func() { - if err == errDiskNotFound || err == rpc.ErrShutdown { + if err == errDiskNotFound { atomic.AddInt32(&n.networkIOErrCount, 1) } }() diff --git a/cmd/storage-rpc-client_test.go b/cmd/storage-rpc-client_test.go index e414b1b18..56970ca79 100644 --- a/cmd/storage-rpc-client_test.go +++ b/cmd/storage-rpc-client_test.go @@ -52,7 +52,7 @@ func TestStorageErr(t *testing.T) { err: &net.OpError{}, }, { - expectedErr: rpc.ErrShutdown, + expectedErr: errDiskNotFound, err: rpc.ErrShutdown, }, { diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index c6c7a530f..39aeb9cc7 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -65,6 +65,9 @@ func init() { // Enable caching. setMaxMemory() + + // Tests don't need to retry. + globalMaxStorageRetryThreshold = 1 } func prepareFS() (ObjectLayer, string, error) { @@ -1945,6 +1948,12 @@ func ExecObjectLayerTest(t TestErrHandler, objTest objTestType) { // ExecObjectLayerDiskAlteredTest - executes object layer tests while altering // disks in between tests. Creates XL ObjectLayer instance and runs test for XL layer. func ExecObjectLayerDiskAlteredTest(t *testing.T, objTest objTestDiskNotFoundType) { + configPath, err := newTestConfig("us-east-1") + if err != nil { + t.Fatal("Failed to create config directory", err) + } + defer removeAll(configPath) + objLayer, fsDirs, err := prepareXL() if err != nil { t.Fatalf("Initialization of object layer failed for XL setup: %s", err)