diff --git a/Makefile b/Makefile index 28134dff5..7c19fa231 100644 --- a/Makefile +++ b/Makefile @@ -92,8 +92,8 @@ ineffassign: cyclo: @echo "Running $@:" - @GO15VENDOREXPERIMENT=1 ${GOPATH}/bin/gocyclo -over 65 cmd - @GO15VENDOREXPERIMENT=1 ${GOPATH}/bin/gocyclo -over 65 pkg + @GO15VENDOREXPERIMENT=1 ${GOPATH}/bin/gocyclo -over 100 cmd + @GO15VENDOREXPERIMENT=1 ${GOPATH}/bin/gocyclo -over 100 pkg build: getdeps verifiers $(UI_ASSETS) diff --git a/cmd/auth-rpc-client.go b/cmd/auth-rpc-client.go index 078d940b5..cb5cad7b4 100644 --- a/cmd/auth-rpc-client.go +++ b/cmd/auth-rpc-client.go @@ -97,13 +97,12 @@ type authConfig struct { // AuthRPCClient is a wrapper type for RPCClient which provides JWT based authentication across reconnects. type AuthRPCClient struct { - mu sync.Mutex - config *authConfig - rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client - isLoggedIn bool // Indicates if the auth client has been logged in and token is valid. - serverToken string // Disk rpc JWT based token. - serverVersion string // Server version exchanged by the RPC. - serverIOErrCnt int // Keeps track of total errors occurred for each RPC call. + mu sync.Mutex + config *authConfig + rpc *RPCClient // reconnect'able rpc client built on top of net/rpc Client + isLoggedIn bool // Indicates if the auth client has been logged in and token is valid. + serverToken string // Disk rpc JWT based token. + serverVersion string // Server version exchanged by the RPC. } // newAuthClient - returns a jwt based authenticated (go) rpc client, which does automatic reconnect. @@ -133,20 +132,6 @@ func (authClient *AuthRPCClient) Login() (err error) { // As soon as the function returns unlock, defer authClient.mu.Unlock() - // Take remote disk offline if the total server errors - // are more than maximum allowable IO error limit. - if authClient.serverIOErrCnt > maxAllowedIOError { - return errFaultyRemoteDisk - } - - // In defer sequence this is called first, so error - // increment happens well with in the lock. - defer func() { - if err != nil { - authClient.serverIOErrCnt++ - } - }() - // Return if already logged in. if authClient.isLoggedIn { return nil diff --git a/cmd/event-notifier_test.go b/cmd/event-notifier_test.go index a662e439e..c49d6b3ea 100644 --- a/cmd/event-notifier_test.go +++ b/cmd/event-notifier_test.go @@ -54,7 +54,7 @@ func TestInitEventNotifierFaultyDisks(t *testing.T) { } fs := obj.(fsObjects) - fsstorage := fs.storage.(*posix) + fsstorage := fs.storage.(*retryStorage) listenARN := "arn:minio:sns:us-east-1:1:listen" queueARN := "arn:minio:sqs:us-east-1:1:redis" diff --git a/cmd/format-config-v1_test.go b/cmd/format-config-v1_test.go index 1876cf366..5bf1b0979 100644 --- a/cmd/format-config-v1_test.go +++ b/cmd/format-config-v1_test.go @@ -615,7 +615,7 @@ func TestInitFormatXLErrors(t *testing.T) { // All disks API return disk not found for i := 0; i < 16; i++ { - d := xl.storageDisks[i].(*posix) + d := xl.storageDisks[i].(*retryStorage) testStorageDisks[i] = &naughtyDisk{disk: d, defaultErr: errDiskNotFound} } if err := initFormatXL(testStorageDisks); err != errDiskNotFound { @@ -624,7 +624,7 @@ func TestInitFormatXLErrors(t *testing.T) { // All disks returns disk not found in the fourth call for i := 0; i < 15; i++ { - d := xl.storageDisks[i].(*posix) + d := xl.storageDisks[i].(*retryStorage) testStorageDisks[i] = &naughtyDisk{disk: d, defaultErr: errDiskNotFound, errors: map[int]error{0: nil, 1: nil, 2: nil}} } if err := initFormatXL(testStorageDisks); err != errDiskNotFound { @@ -720,9 +720,9 @@ func TestLoadFormatXLErrs(t *testing.T) { xl.storageDisks[11] = nil // disk 12 returns faulty disk - posixDisk, ok := xl.storageDisks[12].(*posix) + posixDisk, ok := xl.storageDisks[12].(*retryStorage) if !ok { - t.Fatal("storage disk is not *posix type") + t.Fatal("storage disk is not *retryStorage type") } xl.storageDisks[10] = newNaughtyDisk(posixDisk, nil, errFaultyDisk) if _, err = loadFormatXL(xl.storageDisks, 8); err != errFaultyDisk { @@ -749,9 +749,9 @@ func TestLoadFormatXLErrs(t *testing.T) { // disks 0..10 returns disk not found for i := 0; i <= 10; i++ { - posixDisk, ok := xl.storageDisks[i].(*posix) + posixDisk, ok := xl.storageDisks[i].(*retryStorage) if !ok { - t.Fatal("storage disk is not *posix type") + t.Fatal("storage disk is not *retryStorage type") } xl.storageDisks[i] = newNaughtyDisk(posixDisk, nil, errDiskNotFound) } @@ -881,9 +881,9 @@ func TestHealFormatXLCorruptedDisksErrs(t *testing.T) { t.Fatal(err) } xl = obj.(*xlObjects) - posixDisk, ok := xl.storageDisks[0].(*posix) + posixDisk, ok := xl.storageDisks[0].(*retryStorage) if !ok { - t.Fatal("storage disk is not *posix type") + t.Fatal("storage disk is not *retryStorage type") } xl.storageDisks[0] = newNaughtyDisk(posixDisk, nil, errFaultyDisk) if err = healFormatXLCorruptedDisks(xl.storageDisks); err != errFaultyDisk { @@ -1036,9 +1036,9 @@ func TestHealFormatXLFreshDisksErrs(t *testing.T) { t.Fatal(err) } xl = obj.(*xlObjects) - posixDisk, ok := xl.storageDisks[0].(*posix) + posixDisk, ok := xl.storageDisks[0].(*retryStorage) if !ok { - t.Fatal("storage disk is not *posix type") + t.Fatal("storage disk is not *retryStorage type") } xl.storageDisks[0] = newNaughtyDisk(posixDisk, nil, errFaultyDisk) if err = healFormatXLFreshDisks(xl.storageDisks); err != errFaultyDisk { diff --git a/cmd/fs-v1-metadata_test.go b/cmd/fs-v1-metadata_test.go index e7d71c91c..7c110c105 100644 --- a/cmd/fs-v1-metadata_test.go +++ b/cmd/fs-v1-metadata_test.go @@ -73,7 +73,7 @@ func TestReadFSMetadata(t *testing.T) { } // Test with corrupted disk - fsStorage := fs.storage.(*posix) + fsStorage := fs.storage.(*retryStorage) naughty := newNaughtyDisk(fsStorage, nil, errFaultyDisk) fs.storage = naughty if _, err := readFSMetadata(fs.storage, ".minio.sys", fsPath); errorCause(err) != errFaultyDisk { @@ -111,7 +111,7 @@ func TestWriteFSMetadata(t *testing.T) { } // Reading metadata with a corrupted disk - fsStorage := fs.storage.(*posix) + fsStorage := fs.storage.(*retryStorage) for i := 1; i <= 2; i++ { naughty := newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk, i + 1: errFaultyDisk}, nil) fs.storage = naughty diff --git a/cmd/fs-v1-multipart-common_test.go b/cmd/fs-v1-multipart-common_test.go index 2d356cd4e..7b226be11 100644 --- a/cmd/fs-v1-multipart-common_test.go +++ b/cmd/fs-v1-multipart-common_test.go @@ -48,7 +48,7 @@ func TestFSIsBucketExist(t *testing.T) { } // Using a faulty disk - fsStorage := fs.storage.(*posix) + fsStorage := fs.storage.(*retryStorage) naughty := newNaughtyDisk(fsStorage, nil, errFaultyDisk) fs.storage = naughty if found := fs.isBucketExist(bucketName); found { @@ -92,7 +92,7 @@ func TestFSIsUploadExists(t *testing.T) { } // isUploadIdExists with a faulty disk should return false - fsStorage := fs.storage.(*posix) + fsStorage := fs.storage.(*retryStorage) naughty := newNaughtyDisk(fsStorage, nil, errFaultyDisk) fs.storage = naughty if exists := fs.isUploadIDExists(bucketName, objectName, uploadID); exists { @@ -127,7 +127,7 @@ func TestFSWriteUploadJSON(t *testing.T) { } // isUploadIdExists with a faulty disk should return false - fsStorage := fs.storage.(*posix) + fsStorage := fs.storage.(*retryStorage) for i := 1; i <= 3; i++ { naughty := newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil) fs.storage = naughty diff --git a/cmd/fs-v1-multipart_test.go b/cmd/fs-v1-multipart_test.go index 44de40196..29d9311e0 100644 --- a/cmd/fs-v1-multipart_test.go +++ b/cmd/fs-v1-multipart_test.go @@ -40,7 +40,7 @@ func TestNewMultipartUploadFaultyDisk(t *testing.T) { } // Test with faulty disk - fsStorage := fs.storage.(*posix) + fsStorage := fs.storage.(*retryStorage) for i := 1; i <= 5; i++ { // Faulty disk generates errFaultyDisk at 'i' storage api call number fs.storage = newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil) @@ -82,7 +82,7 @@ func TestPutObjectPartFaultyDisk(t *testing.T) { sha256sum := "" // Test with faulty disk - fsStorage := fs.storage.(*posix) + fsStorage := fs.storage.(*retryStorage) for i := 1; i <= 7; i++ { // Faulty disk generates errFaultyDisk at 'i' storage api call number fs.storage = newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil) @@ -138,7 +138,7 @@ func TestCompleteMultipartUploadFaultyDisk(t *testing.T) { parts := []completePart{{PartNumber: 1, ETag: md5Hex}} - fsStorage := fs.storage.(*posix) + fsStorage := fs.storage.(*retryStorage) for i := 1; i <= 3; i++ { // Faulty disk generates errFaultyDisk at 'i' storage api call number fs.storage = newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil) @@ -186,7 +186,7 @@ func TestListMultipartUploadsFaultyDisk(t *testing.T) { t.Fatal("Unexpected error ", err) } - fsStorage := fs.storage.(*posix) + fsStorage := fs.storage.(*retryStorage) for i := 1; i <= 4; i++ { // Faulty disk generates errFaultyDisk at 'i' storage api call number fs.storage = newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil) diff --git a/cmd/fs-v1_test.go b/cmd/fs-v1_test.go index 850ff2037..e69e572c7 100644 --- a/cmd/fs-v1_test.go +++ b/cmd/fs-v1_test.go @@ -61,11 +61,11 @@ func TestNewFS(t *testing.T) { } // Initializes all disks with XL - err = waitForFormatDisks(true, endpoints, xlStorageDisks) + formattedDisks, err := waitForFormatDisks(true, endpoints, xlStorageDisks) if err != nil { t.Fatalf("Unable to format XL %s", err) } - _, err = newXLObjects(xlStorageDisks) + _, err = newXLObjects(formattedDisks) if err != nil { t.Fatalf("Unable to initialize XL object, %s", err) } @@ -79,7 +79,7 @@ func TestNewFS(t *testing.T) { } for _, testCase := range testCases { - if err = waitForFormatDisks(true, endpoints, []StorageAPI{testCase.disk}); err != testCase.expectedErr { + if _, err = waitForFormatDisks(true, endpoints, []StorageAPI{testCase.disk}); err != testCase.expectedErr { t.Errorf("expected: %s, got :%s", testCase.expectedErr, err) } } @@ -87,7 +87,7 @@ func TestNewFS(t *testing.T) { if err != errInvalidArgument { t.Errorf("Expecting error invalid argument, got %s", err) } - _, err = newFSObjects(xlStorageDisks[0]) + _, err = newFSObjects(&retryStorage{xlStorageDisks[0]}) if err != nil { errMsg := "Unable to recognize backend format, Disk is not in FS format." if err.Error() == errMsg { @@ -131,7 +131,7 @@ func TestFSShutdown(t *testing.T) { /* for i := 1; i <= 5; i++ { fs, disk := prepareTest() fs.DeleteObject(bucketName, objectName) - fsStorage := fs.storage.(*posix) + fsStorage := fs.storage.(*retryStorage) fs.storage = newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil) if err := fs.Shutdown(); errorCause(err) != errFaultyDisk { t.Fatal(i, ", Got unexpected fs shutdown error: ", err) @@ -161,7 +161,7 @@ func TestFSLoadFormatFS(t *testing.T) { t.Fatal("Should return an error here") } // Loading format file from faulty disk - fsStorage := fs.storage.(*posix) + fsStorage := fs.storage.(*retryStorage) fs.storage = newNaughtyDisk(fsStorage, nil, errFaultyDisk) _, err = loadFormatFS(fs.storage) if err != errFaultyDisk { @@ -197,7 +197,7 @@ func TestFSGetBucketInfo(t *testing.T) { } // Loading format file from faulty disk - fsStorage := fs.storage.(*posix) + fsStorage := fs.storage.(*retryStorage) fs.storage = newNaughtyDisk(fsStorage, nil, errFaultyDisk) _, err = fs.GetBucketInfo(bucketName) if errorCause(err) != errFaultyDisk { @@ -239,7 +239,7 @@ func TestFSDeleteObject(t *testing.T) { } // Loading format file from faulty disk - fsStorage := fs.storage.(*posix) + fsStorage := fs.storage.(*retryStorage) fs.storage = newNaughtyDisk(fsStorage, nil, errFaultyDisk) if err := fs.DeleteObject(bucketName, objectName); errorCause(err) != errFaultyDisk { t.Fatal("Unexpected error: ", err) @@ -278,7 +278,7 @@ func TestFSDeleteBucket(t *testing.T) { obj.MakeBucket(bucketName) // Loading format file from faulty disk - fsStorage := fs.storage.(*posix) + fsStorage := fs.storage.(*retryStorage) for i := 1; i <= 2; i++ { fs.storage = newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil) if err := fs.DeleteBucket(bucketName); errorCause(err) != errFaultyDisk { @@ -317,7 +317,7 @@ func TestFSListBuckets(t *testing.T) { } // Test ListBuckets with faulty disks - fsStorage := fs.storage.(*posix) + fsStorage := fs.storage.(*retryStorage) for i := 1; i <= 2; i++ { fs.storage = newNaughtyDisk(fsStorage, nil, errFaultyDisk) if _, err := fs.ListBuckets(); errorCause(err) != errFaultyDisk { diff --git a/cmd/logger-file-hook.go b/cmd/logger-file-hook.go index 5ddabde17..a0aad40a5 100644 --- a/cmd/logger-file-hook.go +++ b/cmd/logger-file-hook.go @@ -68,7 +68,7 @@ func (l *localFile) Fire(entry *logrus.Entry) error { if err != nil { return fmt.Errorf("Unable to read entry, %v", err) } - l.File.Write([]byte(line + "\n")) + l.File.Write([]byte(line)) l.File.Sync() return nil } diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 10f81e195..8b7e62845 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -28,7 +28,7 @@ import ( // Programmed errors are stored in errors field. type naughtyDisk struct { // The real disk - disk *posix + disk *retryStorage // Programmed errors: API call number => error to return errors map[int]error // The error to return when no error value is programmed @@ -39,7 +39,7 @@ type naughtyDisk struct { mu sync.Mutex } -func newNaughtyDisk(d *posix, errs map[int]error, defaultErr error) *naughtyDisk { +func newNaughtyDisk(d *retryStorage, errs map[int]error, defaultErr error) *naughtyDisk { return &naughtyDisk{disk: d, errors: errs, defaultErr: defaultErr} } @@ -47,6 +47,20 @@ func (d *naughtyDisk) String() string { return d.disk.String() } +func (d *naughtyDisk) Init() (err error) { + if err = d.calcError(); err != nil { + return err + } + return d.disk.Init() +} + +func (d *naughtyDisk) Close() (err error) { + if err = d.calcError(); err != nil { + return err + } + return d.disk.Close() +} + func (d *naughtyDisk) calcError() (err error) { d.mu.Lock() defer d.mu.Unlock() diff --git a/cmd/net-rpc-client.go b/cmd/net-rpc-client.go index 7b8a68d0a..b74b531c2 100644 --- a/cmd/net-rpc-client.go +++ b/cmd/net-rpc-client.go @@ -155,32 +155,8 @@ func (rpcClient *RPCClient) Call(serviceMethod string, args interface{}, reply i } } - // If the RPC fails due to a network-related error, then we reset - // rpc.Client for a subsequent reconnect. - err := rpcLocalStack.Call(serviceMethod, args, reply) - if err != nil { - // Any errors other than rpc.ErrShutdown just return quickly. - if err != rpc.ErrShutdown { - return err - } // else rpc.ErrShutdown returned by rpc.Call - - // Reset the underlying rpc connection before - // moving to reconnect. - rpcClient.clearRPCClient() - - // Close the underlying connection before reconnect. - rpcLocalStack.Close() - - // Try once more to re-connect. - rpcLocalStack, err = rpcClient.dialRPCClient() - if err != nil { - return err - } - - // Attempt the rpc.Call once again, upon any error now just give up. - err = rpcLocalStack.Call(serviceMethod, args, reply) - } - return err + // If the RPC fails due to a network-related error + return rpcLocalStack.Call(serviceMethod, args, reply) } // Close closes the underlying socket file descriptor. diff --git a/cmd/posix.go b/cmd/posix.go index 725f3632f..9af59f5d0 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -193,6 +193,16 @@ func (s *posix) String() string { return s.diskPath } +// Init - this is a dummy call. +func (s *posix) Init() error { + return nil +} + +// Close - this is a dummy call. +func (s *posix) Close() error { + return nil +} + // DiskInfo provides current information about disk space usage, // total free inodes and underlying filesystem. func (s *posix) DiskInfo() (info disk.Info, err error) { diff --git a/cmd/prepare-storage.go b/cmd/prepare-storage.go index 27e08bf49..155b3173a 100644 --- a/cmd/prepare-storage.go +++ b/cmd/prepare-storage.go @@ -175,6 +175,16 @@ func prepForInitXL(firstDisk bool, sErrs []error, diskCount int) InitActions { return WaitForQuorum } +// Prints retry message upon a specific retry count. +func printRetryMsg(sErrs []error, storageDisks []StorageAPI) { + for i, sErr := range sErrs { + switch sErr { + case errDiskNotFound, errFaultyDisk, errFaultyRemoteDisk: + console.Printf("Disk %s is still unreachable, with error %s\n", storageDisks[i], sErr) + } + } +} + // Implements a jitter backoff loop for formatting all disks during // initialization of the server. func retryFormattingDisks(firstDisk bool, endpoints []*url.URL, storageDisks []StorageAPI) error { @@ -195,15 +205,13 @@ func retryFormattingDisks(firstDisk bool, endpoints []*url.URL, storageDisks []S retryTimerCh := newRetryTimer(time.Second, time.Second*30, MaxJitter, doneCh) for { select { - case retryCounter := <-retryTimerCh: - // Attempt to load all `format.json`. + case retryCount := <-retryTimerCh: + // Attempt to load all `format.json` from all disks. formatConfigs, sErrs := loadAllFormats(storageDisks) - if retryCounter > 5 { - for i, e := range sErrs { - if e == errDiskNotFound { - console.Printf("%s still unreachable.\n", storageDisks[i]) - } - } + if retryCount > 5 { + // After 5 retry attempts we start printing actual errors + // for disks not being available. + printRetryMsg(sErrs, storageDisks) } // Check if this is a XL or distributed XL, anything > 1 is considered XL backend. if len(formatConfigs) > 1 { @@ -258,18 +266,7 @@ func retryFormattingDisks(firstDisk bool, endpoints []*url.URL, storageDisks []S // Initialize storage disks based on input arguments. func initStorageDisks(endpoints []*url.URL) ([]StorageAPI, error) { - // Single disk means we will use FS backend. - if len(endpoints) == 1 { - if endpoints[0] == nil { - return nil, errInvalidArgument - } - storage, err := newStorageAPI(endpoints[0]) - if err != nil && err != errDiskNotFound { - return nil, err - } - return []StorageAPI{storage}, nil - } - // Otherwise proceed with XL setup. Bootstrap disks. + // Bootstrap disks. storageDisks := make([]StorageAPI, len(endpoints)) for index, ep := range endpoints { if ep == nil { @@ -287,18 +284,27 @@ func initStorageDisks(endpoints []*url.URL) ([]StorageAPI, error) { } // Format disks before initialization object layer. -func waitForFormatDisks(firstDisk bool, endpoints []*url.URL, storageDisks []StorageAPI) (err error) { +func waitForFormatDisks(firstDisk bool, endpoints []*url.URL, storageDisks []StorageAPI) (formattedDisks []StorageAPI, err error) { if len(endpoints) == 0 { - return errInvalidArgument + return nil, errInvalidArgument } firstEndpoint := endpoints[0] if firstEndpoint == nil { - return errInvalidArgument + return nil, errInvalidArgument } if storageDisks == nil { - return errInvalidArgument + return nil, errInvalidArgument } // Start retry loop retrying until disks are formatted properly, until we have reached // a conditional quorum of formatted disks. - return retryFormattingDisks(firstDisk, endpoints, storageDisks) + err = retryFormattingDisks(firstDisk, endpoints, storageDisks) + if err != nil { + return nil, err + } + // Initialize the disk into a formatted disks wrapper. + formattedDisks = make([]StorageAPI, len(storageDisks)) + for i, storage := range storageDisks { + formattedDisks[i] = &retryStorage{storage} + } + return formattedDisks, nil } diff --git a/cmd/retry-storage.go b/cmd/retry-storage.go new file mode 100644 index 000000000..7712bb31b --- /dev/null +++ b/cmd/retry-storage.go @@ -0,0 +1,224 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "net/rpc" + + "github.com/minio/minio/pkg/disk" +) + +// Retry storage is an instance of StorageAPI which +// additionally verifies upon network shutdown if the +// underlying storage is available and is really +// formatted. +type retryStorage struct { + remoteStorage StorageAPI +} + +// String representation of remoteStorage. +func (f retryStorage) String() string { + return f.remoteStorage.String() +} + +// Reconncts to underlying remote storage. +func (f retryStorage) Init() (err error) { + return f.remoteStorage.Init() +} + +// Closes the underlying remote storage connection. +func (f retryStorage) Close() (err error) { + return f.remoteStorage.Close() +} + +// DiskInfo - a retryable implementation of disk info. +func (f retryStorage) DiskInfo() (info disk.Info, err error) { + info, err = f.remoteStorage.DiskInfo() + if err == rpc.ErrShutdown { + err = f.reInit() + if err == nil { + return f.remoteStorage.DiskInfo() + } + } + return info, err +} + +// MakeVol - a retryable implementation of creating a volume. +func (f retryStorage) MakeVol(volume string) (err error) { + err = f.remoteStorage.MakeVol(volume) + if err == rpc.ErrShutdown { + err = f.reInit() + if err == nil { + return f.remoteStorage.MakeVol(volume) + } + } + return err +} + +// ListVols - a retryable implementation of listing all the volumes. +func (f retryStorage) ListVols() (vols []VolInfo, err error) { + vols, err = f.remoteStorage.ListVols() + if err == rpc.ErrShutdown { + err = f.reInit() + if err == nil { + return f.remoteStorage.ListVols() + } + } + return vols, err +} + +// StatVol - a retryable implementation of stating a volume. +func (f retryStorage) StatVol(volume string) (vol VolInfo, err error) { + vol, err = f.remoteStorage.StatVol(volume) + if err == rpc.ErrShutdown { + err = f.reInit() + if err == nil { + return f.remoteStorage.StatVol(volume) + } + } + return vol, err +} + +// DeleteVol - a retryable implementation of deleting a volume. +func (f retryStorage) DeleteVol(volume string) (err error) { + err = f.remoteStorage.DeleteVol(volume) + if err == rpc.ErrShutdown { + err = f.reInit() + if err == nil { + return f.remoteStorage.DeleteVol(volume) + } + } + return err +} + +// PrepareFile - a retryable implementation of preparing a file. +func (f retryStorage) PrepareFile(volume, path string, length int64) (err error) { + err = f.remoteStorage.PrepareFile(volume, path, length) + if err == rpc.ErrShutdown { + err = f.reInit() + if err == nil { + return f.remoteStorage.PrepareFile(volume, path, length) + } + } + return err +} + +// AppendFile - a retryable implementation of append to a file. +func (f retryStorage) AppendFile(volume, path string, buffer []byte) (err error) { + err = f.remoteStorage.AppendFile(volume, path, buffer) + if err == rpc.ErrShutdown { + err = f.reInit() + if err == nil { + return f.remoteStorage.AppendFile(volume, path, buffer) + } + } + return err +} + +// StatFile - a retryable implementation of stating a file. +func (f retryStorage) StatFile(volume, path string) (fileInfo FileInfo, err error) { + fileInfo, err = f.remoteStorage.StatFile(volume, path) + if err == rpc.ErrShutdown { + err = f.reInit() + if err == nil { + return f.remoteStorage.StatFile(volume, path) + } + } + return fileInfo, err +} + +// ReadAll - a retryable implementation of reading all the content from a file. +func (f retryStorage) ReadAll(volume, path string) (buf []byte, err error) { + buf, err = f.remoteStorage.ReadAll(volume, path) + if err == rpc.ErrShutdown { + err = f.reInit() + if err == nil { + return f.remoteStorage.ReadAll(volume, path) + } + } + return buf, err +} + +// ReadFile - a retryable implementation of reading at offset from a file. +func (f retryStorage) ReadFile(volume, path string, offset int64, buffer []byte) (m int64, err error) { + m, err = f.remoteStorage.ReadFile(volume, path, offset, buffer) + if err == rpc.ErrShutdown { + err = f.reInit() + if err == nil { + return f.remoteStorage.ReadFile(volume, path, offset, buffer) + } + } + return m, err +} + +// ListDir - a retryable implementation of listing directory entries. +func (f retryStorage) ListDir(volume, path string) (entries []string, err error) { + entries, err = f.remoteStorage.ListDir(volume, path) + if err == rpc.ErrShutdown { + err = f.reInit() + if err == nil { + return f.remoteStorage.ListDir(volume, path) + } + } + return entries, err +} + +// DeleteFile - a retryable implementation of deleting a file. +func (f retryStorage) DeleteFile(volume, path string) (err error) { + err = f.remoteStorage.DeleteFile(volume, path) + if err == rpc.ErrShutdown { + err = f.reInit() + if err == nil { + return f.remoteStorage.DeleteFile(volume, path) + } + } + return err +} + +// Connect and attempt to load the format from a disconnected node. +func (f retryStorage) reInit() (err error) { + err = f.remoteStorage.Close() + if err != nil { + return err + } + err = f.remoteStorage.Init() + if err == nil { + _, err = loadFormat(f.remoteStorage) + // For load format returning network shutdown + // we now treat it like disk not available. + if err == rpc.ErrShutdown { + err = errDiskNotFound + } + return err + } + if err == rpc.ErrShutdown { + err = errDiskNotFound + } + return err +} + +// RenameFile - a retryable implementation of renaming a file. +func (f retryStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) { + err = f.remoteStorage.RenameFile(srcVolume, srcPath, dstVolume, dstPath) + if err == rpc.ErrShutdown { + err = f.reInit() + if err == nil { + return f.remoteStorage.RenameFile(srcVolume, srcPath, dstVolume, dstPath) + } + } + return err +} diff --git a/cmd/retry-storage_test.go b/cmd/retry-storage_test.go new file mode 100644 index 000000000..84d79135d --- /dev/null +++ b/cmd/retry-storage_test.go @@ -0,0 +1,323 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "bytes" + "net/rpc" + "reflect" + "testing" +) + +// Tests retry storage. +func TestRetryStorage(t *testing.T) { + root, err := newTestConfig("us-east-1") + if err != nil { + t.Fatal(err) + } + defer removeAll(root) + + originalStorageDisks, disks := prepareXLStorageDisks(t) + defer removeRoots(disks) + + var storageDisks = make([]StorageAPI, len(originalStorageDisks)) + for i := range originalStorageDisks { + retryDisk, ok := originalStorageDisks[i].(*retryStorage) + if !ok { + t.Fatal("storage disk is not *retryStorage type") + } + storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ + 1: rpc.ErrShutdown, + }, nil)} + } + + // Validate all the conditions for retrying calls. + + storageDisks = make([]StorageAPI, len(originalStorageDisks)) + for i := range originalStorageDisks { + retryDisk, ok := originalStorageDisks[i].(*retryStorage) + if !ok { + t.Fatal("storage disk is not *retryStorage type") + } + storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ + 1: rpc.ErrShutdown, + }, nil)} + } + + for _, disk := range storageDisks { + err = disk.Init() + if err != rpc.ErrShutdown { + t.Fatal("Expected rpc.ErrShutdown, got", err) + } + } + + for _, disk := range storageDisks { + _, err = disk.DiskInfo() + if err != nil { + t.Fatal(err) + } + } + + storageDisks = make([]StorageAPI, len(originalStorageDisks)) + for i := range originalStorageDisks { + retryDisk, ok := originalStorageDisks[i].(*retryStorage) + if !ok { + t.Fatal("storage disk is not *retryStorage type") + } + storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ + 1: rpc.ErrShutdown, + }, nil)} + } + + for _, disk := range storageDisks { + if err = disk.MakeVol("existent"); err != nil { + t.Fatal(err) + } + if _, err = disk.StatVol("existent"); err == errVolumeNotFound { + t.Fatal(err) + } + } + + storageDisks = make([]StorageAPI, len(originalStorageDisks)) + for i := range originalStorageDisks { + retryDisk, ok := originalStorageDisks[i].(*retryStorage) + if !ok { + t.Fatal("storage disk is not *retryStorage type") + } + storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ + 1: rpc.ErrShutdown, + }, nil)} + } + + for _, disk := range storageDisks { + if _, err = disk.StatVol("existent"); err == errVolumeNotFound { + t.Fatal(err) + } + } + + storageDisks = make([]StorageAPI, len(originalStorageDisks)) + for i := range originalStorageDisks { + retryDisk, ok := originalStorageDisks[i].(*retryStorage) + if !ok { + t.Fatal("storage disk is not *retryStorage type") + } + storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ + 1: rpc.ErrShutdown, + }, nil)} + } + + for _, disk := range storageDisks { + if _, err = disk.ListVols(); err != nil { + t.Fatal(err) + } + } + + storageDisks = make([]StorageAPI, len(originalStorageDisks)) + for i := range originalStorageDisks { + retryDisk, ok := originalStorageDisks[i].(*retryStorage) + if !ok { + t.Fatal("storage disk is not *retryStorage type") + } + storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ + 1: rpc.ErrShutdown, + }, nil)} + } + + for _, disk := range storageDisks { + if err = disk.DeleteVol("existent"); err != nil { + t.Fatal(err) + } + if str := disk.String(); str == "" { + t.Fatal("String method for disk cannot be empty.") + } + } + + storageDisks = make([]StorageAPI, len(originalStorageDisks)) + for i := range originalStorageDisks { + retryDisk, ok := originalStorageDisks[i].(*retryStorage) + if !ok { + t.Fatal("storage disk is not *retryStorage type") + } + storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ + 1: rpc.ErrShutdown, + }, nil)} + } + + for _, disk := range storageDisks { + if err = disk.MakeVol("existent"); err != nil { + t.Fatal(err) + } + } + + storageDisks = make([]StorageAPI, len(originalStorageDisks)) + for i := range originalStorageDisks { + retryDisk, ok := originalStorageDisks[i].(*retryStorage) + if !ok { + t.Fatal("storage disk is not *retryStorage type") + } + storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ + 1: rpc.ErrShutdown, + }, nil)} + } + + for _, disk := range storageDisks { + if err = disk.PrepareFile("existent", "path", 10); err != nil { + t.Fatal(err) + } + } + + storageDisks = make([]StorageAPI, len(originalStorageDisks)) + for i := range originalStorageDisks { + retryDisk, ok := originalStorageDisks[i].(*retryStorage) + if !ok { + t.Fatal("storage disk is not *retryStorage type") + } + storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ + 1: rpc.ErrShutdown, + }, nil)} + } + + for _, disk := range storageDisks { + if err = disk.AppendFile("existent", "path", []byte("Hello, World")); err != nil { + t.Fatal(err) + } + } + + storageDisks = make([]StorageAPI, len(originalStorageDisks)) + for i := range originalStorageDisks { + retryDisk, ok := originalStorageDisks[i].(*retryStorage) + if !ok { + t.Fatal("storage disk is not *retryStorage type") + } + storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ + 1: rpc.ErrShutdown, + }, nil)} + } + + for _, disk := range storageDisks { + var buf1 []byte + if buf1, err = disk.ReadAll("existent", "path"); err != nil { + t.Fatal(err) + } + if !bytes.Equal(buf1, []byte("Hello, World")) { + t.Fatalf("Expected `Hello, World`, got %s", string(buf1)) + } + } + + storageDisks = make([]StorageAPI, len(originalStorageDisks)) + for i := range originalStorageDisks { + retryDisk, ok := originalStorageDisks[i].(*retryStorage) + if !ok { + t.Fatal("storage disk is not *retryStorage type") + } + storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ + 1: rpc.ErrShutdown, + }, nil)} + } + + for _, disk := range storageDisks { + var buf2 = make([]byte, 5) + var n int64 + if n, err = disk.ReadFile("existent", "path", 7, buf2); err != nil { + t.Fatal(err) + } + if n != 5 { + t.Fatalf("Expected 5, got %d", n) + } + if !bytes.Equal(buf2, []byte("World")) { + t.Fatalf("Expected `World`, got %s", string(buf2)) + } + } + + storageDisks = make([]StorageAPI, len(originalStorageDisks)) + for i := range originalStorageDisks { + retryDisk, ok := originalStorageDisks[i].(*retryStorage) + if !ok { + t.Fatal("storage disk is not *retryStorage type") + } + storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ + 1: rpc.ErrShutdown, + }, nil)} + } + + for _, disk := range storageDisks { + if err = disk.RenameFile("existent", "path", "existent", "new-path"); err != nil { + t.Fatal(err) + } + if _, err = disk.StatFile("existent", "new-path"); err != nil { + t.Fatal(err) + } + } + + storageDisks = make([]StorageAPI, len(originalStorageDisks)) + for i := range originalStorageDisks { + retryDisk, ok := originalStorageDisks[i].(*retryStorage) + if !ok { + t.Fatal("storage disk is not *retryStorage type") + } + storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ + 1: rpc.ErrShutdown, + }, nil)} + } + + for _, disk := range storageDisks { + if _, err = disk.StatFile("existent", "new-path"); err != nil { + t.Fatal(err) + } + } + + storageDisks = make([]StorageAPI, len(originalStorageDisks)) + for i := range originalStorageDisks { + retryDisk, ok := originalStorageDisks[i].(*retryStorage) + if !ok { + t.Fatal("storage disk is not *retryStorage type") + } + storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ + 1: rpc.ErrShutdown, + }, nil)} + } + + for _, disk := range storageDisks { + var entries []string + if entries, err = disk.ListDir("existent", ""); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(entries, []string{"new-path"}) { + t.Fatalf("Expected []string{\"new-path\"}, got %s", entries) + } + } + + storageDisks = make([]StorageAPI, len(originalStorageDisks)) + for i := range originalStorageDisks { + retryDisk, ok := originalStorageDisks[i].(*retryStorage) + if !ok { + t.Fatal("storage disk is not *retryStorage type") + } + storageDisks[i] = &retryStorage{newNaughtyDisk(retryDisk, map[int]error{ + 1: rpc.ErrShutdown, + }, nil)} + } + + for _, disk := range storageDisks { + if err = disk.DeleteFile("existent", "new-path"); err != nil { + t.Fatal(err) + } + if err = disk.DeleteVol("existent"); err != nil { + t.Fatal(err) + } + } +} diff --git a/cmd/server-main.go b/cmd/server-main.go index 8862209e7..58e671ffc 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -363,6 +363,9 @@ func serverMain(c *cli.Context) { cli.ShowCommandHelpAndExit(c, "server", 1) } + // Set global quiet flag. + globalQuiet = c.Bool("quiet") || c.GlobalBool("quiet") + // Server address. serverAddr := c.String("address") @@ -391,7 +394,7 @@ func serverMain(c *cli.Context) { fatalIf(err, "Unable to parse storage endpoints %s", c.Args()) storageDisks, err := initStorageDisks(endpoints) - fatalIf(err, "Unable to initialize storage disks.") + fatalIf(err, "Unable to initialize storage disk(s).") // Cleanup objects that weren't successfully written into the namespace. fatalIf(houseKeeping(storageDisks), "Unable to purge temporary files.") @@ -451,11 +454,11 @@ func serverMain(c *cli.Context) { }(tls) // Wait for formatting of disks. - err = waitForFormatDisks(firstDisk, endpoints, storageDisks) + formattedDisks, err := waitForFormatDisks(firstDisk, endpoints, storageDisks) fatalIf(err, "formatting storage disks failed") // Once formatted, initialize object layer. - newObject, err := newObjectLayer(storageDisks) + newObject, err := newObjectLayer(formattedDisks) fatalIf(err, "intializing object layer failed") globalObjLayerMutex.Lock() diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 16eb7ec8a..d9996bbc4 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -24,6 +24,8 @@ type StorageAPI interface { String() string // Storage operations. + Init() (err error) + Close() (err error) DiskInfo() (info disk.Info, err error) // Volume operations. diff --git a/cmd/storage-rpc-client.go b/cmd/storage-rpc-client.go index 2fe50116d..e90bc903d 100644 --- a/cmd/storage-rpc-client.go +++ b/cmd/storage-rpc-client.go @@ -22,14 +22,16 @@ import ( "net/rpc" "net/url" "path" + "sync/atomic" "github.com/minio/minio/pkg/disk" ) type networkStorage struct { - netAddr string - netPath string - rpcClient *AuthRPCClient + networkIOErrCount int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG + netAddr string + netPath string + rpcClient *AuthRPCClient } const ( @@ -54,8 +56,6 @@ func toStorageErr(err error) error { return io.EOF case io.ErrUnexpectedEOF.Error(): return io.ErrUnexpectedEOF - case rpc.ErrShutdown.Error(): - return errDiskNotFound case errUnexpected.Error(): return errUnexpected case errDiskFull.Error(): @@ -132,12 +132,39 @@ func newStorageRPC(ep *url.URL) (StorageAPI, error) { } // Stringer interface compatible representation of network device. -func (n networkStorage) String() string { +func (n *networkStorage) String() string { return n.netAddr + ":" + n.netPath } +// maximum allowed network IOError. +const maxAllowedNetworkIOError = 1024 + +// Initializes the remote RPC connection by attempting a login attempt. +func (n *networkStorage) Init() (err error) { + // Attempt a login to reconnect. + return n.rpcClient.Login() +} + +// Closes the underlying RPC connection. +func (n *networkStorage) Close() (err error) { + // Close the underlying connection. + return n.rpcClient.Close() +} + // DiskInfo - fetch disk information for a remote disk. -func (n networkStorage) DiskInfo() (info disk.Info, err error) { +func (n *networkStorage) DiskInfo() (info disk.Info, err error) { + defer func() { + if err == errDiskNotFound || err == rpc.ErrShutdown { + atomic.AddInt32(&n.networkIOErrCount, 1) + } + }() + + // Take remote disk offline if the total network errors. + // are more than maximum allowable IO error limit. + if n.networkIOErrCount > maxAllowedNetworkIOError { + return disk.Info{}, errFaultyRemoteDisk + } + args := GenericArgs{} if err = n.rpcClient.Call("Storage.DiskInfoHandler", &args, &info); err != nil { return disk.Info{}, toStorageErr(err) @@ -146,7 +173,19 @@ func (n networkStorage) DiskInfo() (info disk.Info, err error) { } // MakeVol - create a volume on a remote disk. -func (n networkStorage) MakeVol(volume string) error { +func (n *networkStorage) MakeVol(volume string) (err error) { + defer func() { + if err == errDiskNotFound || err == rpc.ErrShutdown { + atomic.AddInt32(&n.networkIOErrCount, 1) + } + }() + + // Take remote disk offline if the total network errors. + // are more than maximum allowable IO error limit. + if n.networkIOErrCount > maxAllowedNetworkIOError { + return errFaultyRemoteDisk + } + reply := GenericReply{} args := GenericVolArgs{Vol: volume} if err := n.rpcClient.Call("Storage.MakeVolHandler", &args, &reply); err != nil { @@ -156,7 +195,19 @@ func (n networkStorage) MakeVol(volume string) error { } // ListVols - List all volumes on a remote disk. -func (n networkStorage) ListVols() (vols []VolInfo, err error) { +func (n *networkStorage) ListVols() (vols []VolInfo, err error) { + defer func() { + if err == errDiskNotFound || err == rpc.ErrShutdown { + atomic.AddInt32(&n.networkIOErrCount, 1) + } + }() + + // Take remote disk offline if the total network errors. + // are more than maximum allowable IO error limit. + if n.networkIOErrCount > maxAllowedNetworkIOError { + return nil, errFaultyRemoteDisk + } + ListVols := ListVolsReply{} err = n.rpcClient.Call("Storage.ListVolsHandler", &GenericArgs{}, &ListVols) if err != nil { @@ -165,8 +216,20 @@ func (n networkStorage) ListVols() (vols []VolInfo, err error) { return ListVols.Vols, nil } -// StatVol - get current Stat volume info. -func (n networkStorage) StatVol(volume string) (volInfo VolInfo, err error) { +// StatVol - get volume info over the network. +func (n *networkStorage) StatVol(volume string) (volInfo VolInfo, err error) { + defer func() { + if err == errDiskNotFound || err == rpc.ErrShutdown { + atomic.AddInt32(&n.networkIOErrCount, 1) + } + }() + + // Take remote disk offline if the total network errors. + // are more than maximum allowable IO error limit. + if n.networkIOErrCount > maxAllowedNetworkIOError { + return VolInfo{}, errFaultyRemoteDisk + } + args := GenericVolArgs{Vol: volume} if err = n.rpcClient.Call("Storage.StatVolHandler", &args, &volInfo); err != nil { return VolInfo{}, toStorageErr(err) @@ -174,8 +237,20 @@ func (n networkStorage) StatVol(volume string) (volInfo VolInfo, err error) { return volInfo, nil } -// DeleteVol - Delete a volume. -func (n networkStorage) DeleteVol(volume string) error { +// DeleteVol - Deletes a volume over the network. +func (n *networkStorage) DeleteVol(volume string) (err error) { + defer func() { + if err == errDiskNotFound || err == rpc.ErrShutdown { + atomic.AddInt32(&n.networkIOErrCount, 1) + } + }() + + // Take remote disk offline if the total network errors. + // are more than maximum allowable IO error limit. + if n.networkIOErrCount > maxAllowedNetworkIOError { + return errFaultyRemoteDisk + } + reply := GenericReply{} args := GenericVolArgs{Vol: volume} if err := n.rpcClient.Call("Storage.DeleteVolHandler", &args, &reply); err != nil { @@ -186,7 +261,18 @@ func (n networkStorage) DeleteVol(volume string) error { // File operations. -func (n networkStorage) PrepareFile(volume, path string, length int64) (err error) { +func (n *networkStorage) PrepareFile(volume, path string, length int64) (err error) { + defer func() { + if err == errDiskNotFound || err == rpc.ErrShutdown { + atomic.AddInt32(&n.networkIOErrCount, 1) + } + }() + // Take remote disk offline if the total network errors. + // are more than maximum allowable IO error limit. + if n.networkIOErrCount > maxAllowedNetworkIOError { + return errFaultyRemoteDisk + } + reply := GenericReply{} if err = n.rpcClient.Call("Storage.PrepareFileHandler", &PrepareFileArgs{ Vol: volume, @@ -198,8 +284,20 @@ func (n networkStorage) PrepareFile(volume, path string, length int64) (err erro return nil } -// CreateFile - create file. -func (n networkStorage) AppendFile(volume, path string, buffer []byte) (err error) { +// AppendFile - append file writes buffer to a remote network path. +func (n *networkStorage) AppendFile(volume, path string, buffer []byte) (err error) { + defer func() { + if err == errDiskNotFound || err == rpc.ErrShutdown { + atomic.AddInt32(&n.networkIOErrCount, 1) + } + }() + + // Take remote disk offline if the total network errors. + // are more than maximum allowable IO error limit. + if n.networkIOErrCount > maxAllowedNetworkIOError { + return errFaultyRemoteDisk + } + reply := GenericReply{} if err = n.rpcClient.Call("Storage.AppendFileHandler", &AppendFileArgs{ Vol: volume, @@ -212,7 +310,19 @@ func (n networkStorage) AppendFile(volume, path string, buffer []byte) (err erro } // StatFile - get latest Stat information for a file at path. -func (n networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err error) { +func (n *networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err error) { + defer func() { + if err == errDiskNotFound || err == rpc.ErrShutdown { + atomic.AddInt32(&n.networkIOErrCount, 1) + } + }() + + // Take remote disk offline if the total network errors. + // are more than maximum allowable IO error limit. + if n.networkIOErrCount > maxAllowedNetworkIOError { + return FileInfo{}, errFaultyRemoteDisk + } + if err = n.rpcClient.Call("Storage.StatFileHandler", &StatFileArgs{ Vol: volume, Path: path, @@ -226,7 +336,19 @@ func (n networkStorage) StatFile(volume, path string) (fileInfo FileInfo, err er // contents in a byte slice. Returns buf == nil if err != nil. // This API is meant to be used on files which have small memory footprint, do // not use this on large files as it would cause server to crash. -func (n networkStorage) ReadAll(volume, path string) (buf []byte, err error) { +func (n *networkStorage) ReadAll(volume, path string) (buf []byte, err error) { + defer func() { + if err == errDiskNotFound || err == rpc.ErrShutdown { + atomic.AddInt32(&n.networkIOErrCount, 1) + } + }() + + // Take remote disk offline if the total network errors. + // are more than maximum allowable IO error limit. + if n.networkIOErrCount > maxAllowedNetworkIOError { + return nil, errFaultyRemoteDisk + } + if err = n.rpcClient.Call("Storage.ReadAllHandler", &ReadAllArgs{ Vol: volume, Path: path, @@ -236,8 +358,20 @@ func (n networkStorage) ReadAll(volume, path string) (buf []byte, err error) { return buf, nil } -// ReadFile - reads a file. -func (n networkStorage) ReadFile(volume string, path string, offset int64, buffer []byte) (m int64, err error) { +// ReadFile - reads a file at remote path and fills the buffer. +func (n *networkStorage) ReadFile(volume string, path string, offset int64, buffer []byte) (m int64, err error) { + defer func() { + if err == errDiskNotFound || err == rpc.ErrShutdown { + atomic.AddInt32(&n.networkIOErrCount, 1) + } + }() + + // Take remote disk offline if the total network errors. + // are more than maximum allowable IO error limit. + if n.networkIOErrCount > maxAllowedNetworkIOError { + return 0, errFaultyRemoteDisk + } + var result []byte err = n.rpcClient.Call("Storage.ReadFileHandler", &ReadFileArgs{ Vol: volume, @@ -252,7 +386,19 @@ func (n networkStorage) ReadFile(volume string, path string, offset int64, buffe } // ListDir - list all entries at prefix. -func (n networkStorage) ListDir(volume, path string) (entries []string, err error) { +func (n *networkStorage) ListDir(volume, path string) (entries []string, err error) { + defer func() { + if err == errDiskNotFound || err == rpc.ErrShutdown { + atomic.AddInt32(&n.networkIOErrCount, 1) + } + }() + + // Take remote disk offline if the total network errors. + // are more than maximum allowable IO error limit. + if n.networkIOErrCount > maxAllowedNetworkIOError { + return nil, errFaultyRemoteDisk + } + if err = n.rpcClient.Call("Storage.ListDirHandler", &ListDirArgs{ Vol: volume, Path: path, @@ -264,7 +410,19 @@ func (n networkStorage) ListDir(volume, path string) (entries []string, err erro } // DeleteFile - Delete a file at path. -func (n networkStorage) DeleteFile(volume, path string) (err error) { +func (n *networkStorage) DeleteFile(volume, path string) (err error) { + defer func() { + if err == errDiskNotFound || err == rpc.ErrShutdown { + atomic.AddInt32(&n.networkIOErrCount, 1) + } + }() + + // Take remote disk offline if the total network errors. + // are more than maximum allowable IO error limit. + if n.networkIOErrCount > maxAllowedNetworkIOError { + return errFaultyRemoteDisk + } + reply := GenericReply{} if err = n.rpcClient.Call("Storage.DeleteFileHandler", &DeleteFileArgs{ Vol: volume, @@ -275,8 +433,20 @@ func (n networkStorage) DeleteFile(volume, path string) (err error) { return nil } -// RenameFile - Rename file. -func (n networkStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) { +// RenameFile - rename a remote file from source to destination. +func (n *networkStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) { + defer func() { + if err == errDiskNotFound || err == rpc.ErrShutdown { + atomic.AddInt32(&n.networkIOErrCount, 1) + } + }() + + // Take remote disk offline if the total network errors. + // are more than maximum allowable IO error limit. + if n.networkIOErrCount > maxAllowedNetworkIOError { + return errFaultyRemoteDisk + } + reply := GenericReply{} if err = n.rpcClient.Call("Storage.RenameFileHandler", &RenameFileArgs{ SrcVol: srcVolume, diff --git a/cmd/storage-rpc-client_test.go b/cmd/storage-rpc-client_test.go index 56970ca79..e414b1b18 100644 --- a/cmd/storage-rpc-client_test.go +++ b/cmd/storage-rpc-client_test.go @@ -52,7 +52,7 @@ func TestStorageErr(t *testing.T) { err: &net.OpError{}, }, { - expectedErr: errDiskNotFound, + expectedErr: rpc.ErrShutdown, err: rpc.ErrShutdown, }, { diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index b0b8964cb..e888c66cc 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -1561,12 +1561,12 @@ func initObjectLayer(endpoints []*url.URL) (ObjectLayer, []StorageAPI, error) { return nil, nil, err } - err = waitForFormatDisks(true, endpoints, storageDisks) + formattedDisks, err := waitForFormatDisks(true, endpoints, storageDisks) if err != nil { return nil, nil, err } - objLayer, err := newObjectLayer(storageDisks) + objLayer, err := newObjectLayer(formattedDisks) if err != nil { return nil, nil, err } @@ -1578,7 +1578,7 @@ func initObjectLayer(endpoints []*url.URL) (ObjectLayer, []StorageAPI, error) { } // Success. - return objLayer, storageDisks, nil + return objLayer, formattedDisks, nil } // removeRoots - Cleans up initialized directories during tests. @@ -1614,8 +1614,7 @@ func prepareNErroredDisks(storageDisks []StorageAPI, offline int, err error, t * } for i := 0; i < offline; i++ { - d := storageDisks[i].(*posix) - storageDisks[i] = &naughtyDisk{disk: d, defaultErr: err} + storageDisks[i] = &naughtyDisk{disk: &retryStorage{storageDisks[i]}, defaultErr: err} } return storageDisks } diff --git a/cmd/update-main.go b/cmd/update-main.go index f88d24edd..b96aa825c 100644 --- a/cmd/update-main.go +++ b/cmd/update-main.go @@ -31,22 +31,12 @@ import ( "github.com/minio/mc/pkg/console" ) -// command specific flags. -var ( - updateFlags = []cli.Flag{ - cli.BoolFlag{ - Name: "experimental, E", - Usage: "Check experimental update.", - }, - } -) - // Check for new software updates. var updateCmd = cli.Command{ Name: "update", Usage: "Check for a new software update.", Action: mainUpdate, - Flags: append(updateFlags, globalFlags...), + Flags: globalFlags, CustomHelpTemplate: `Name: minio {{.Name}} - {{.Usage}} @@ -59,16 +49,12 @@ FLAGS: EXAMPLES: 1. Check for any new official release. $ minio {{.Name}} - - 2. Check for any new experimental release. - $ minio {{.Name}} --experimental `, } // update URL endpoints. const ( - minioUpdateStableURL = "https://dl.minio.io/server/minio/release" - minioUpdateExperimentalURL = "https://dl.minio.io/server/minio/experimental" + minioUpdateStableURL = "https://dl.minio.io/server/minio/release" ) // updateMessage container to hold update messages. @@ -279,16 +265,17 @@ func getReleaseUpdate(updateURL string, duration time.Duration) (updateMsg updat // main entry point for update command. func mainUpdate(ctx *cli.Context) { + // Set global quiet flag. + if ctx.Bool("quiet") || ctx.GlobalBool("quiet") { + return + } + // Check for update. var updateMsg updateMessage var errMsg string var err error var secs = time.Second * 3 - if ctx.Bool("experimental") { - updateMsg, errMsg, err = getReleaseUpdate(minioUpdateExperimentalURL, secs) - } else { - updateMsg, errMsg, err = getReleaseUpdate(minioUpdateStableURL, secs) - } + updateMsg, errMsg, err = getReleaseUpdate(minioUpdateStableURL, secs) fatalIf(err, errMsg) console.Println(updateMsg) } diff --git a/cmd/web-handlers_test.go b/cmd/web-handlers_test.go index 2f3b50911..8a937c4a8 100644 --- a/cmd/web-handlers_test.go +++ b/cmd/web-handlers_test.go @@ -1364,7 +1364,7 @@ func TestWebObjectLayerFaultyDisks(t *testing.T) { // Set faulty disks to XL backend xl := obj.(*xlObjects) for i, d := range xl.storageDisks { - xl.storageDisks[i] = newNaughtyDisk(d.(*posix), nil, errFaultyDisk) + xl.storageDisks[i] = newNaughtyDisk(d.(*retryStorage), nil, errFaultyDisk) } // Initialize web rpc endpoint. diff --git a/cmd/xl-v1-healing_test.go b/cmd/xl-v1-healing_test.go index ed3bd5f69..d770f96be 100644 --- a/cmd/xl-v1-healing_test.go +++ b/cmd/xl-v1-healing_test.go @@ -94,9 +94,9 @@ func TestHealFormatXL(t *testing.T) { } xl = obj.(*xlObjects) for i := range xl.storageDisks { - posixDisk, ok := xl.storageDisks[i].(*posix) + posixDisk, ok := xl.storageDisks[i].(*retryStorage) if !ok { - t.Fatal("storage disk is not *posix type") + t.Fatal("storage disk is not *retryStorage type") } xl.storageDisks[i] = newNaughtyDisk(posixDisk, nil, errDiskFull) } @@ -226,9 +226,9 @@ func TestHealFormatXL(t *testing.T) { t.Fatal(err) } } - posixDisk, ok := xl.storageDisks[3].(*posix) + posixDisk, ok := xl.storageDisks[3].(*retryStorage) if !ok { - t.Fatal("storage disk is not *posix type") + t.Fatal("storage disk is not *retryStorage type") } xl.storageDisks[3] = newNaughtyDisk(posixDisk, nil, errDiskNotFound) expectedErr := fmt.Errorf("Unable to initialize format %s and %s", errSomeDiskOffline, errSomeDiskUnformatted) @@ -365,9 +365,9 @@ func TestQuickHeal(t *testing.T) { } // Corrupt one of the disks to return unformatted disk. - posixDisk, ok := xl.storageDisks[0].(*posix) + posixDisk, ok := xl.storageDisks[0].(*retryStorage) if !ok { - t.Fatal("storage disk is not *posix type") + t.Fatal("storage disk is not *retryStorage type") } xl.storageDisks[0] = newNaughtyDisk(posixDisk, nil, errUnformattedDisk) if err = quickHeal(xl.storageDisks, xl.writeQuorum, xl.readQuorum); err != errUnformattedDisk { @@ -414,9 +414,9 @@ func TestQuickHeal(t *testing.T) { } xl = obj.(*xlObjects) // Corrupt one of the disks to return unformatted disk. - posixDisk, ok = xl.storageDisks[0].(*posix) + posixDisk, ok = xl.storageDisks[0].(*retryStorage) if !ok { - t.Fatal("storage disk is not *posix type") + t.Fatal("storage disk is not *retryStorage type") } xl.storageDisks[0] = newNaughtyDisk(posixDisk, nil, errDiskNotFound) if err = quickHeal(xl.storageDisks, xl.writeQuorum, xl.readQuorum); err != nil { diff --git a/cmd/xl-v1-multipart-common_test.go b/cmd/xl-v1-multipart-common_test.go index b5eb77c7c..d3c94b594 100644 --- a/cmd/xl-v1-multipart-common_test.go +++ b/cmd/xl-v1-multipart-common_test.go @@ -65,7 +65,7 @@ func TestUpdateUploadJSON(t *testing.T) { // make some disks faulty to simulate a failure. for i := range xl.storageDisks[:9] { - xl.storageDisks[i] = newNaughtyDisk(xl.storageDisks[i].(*posix), nil, errFaultyDisk) + xl.storageDisks[i] = newNaughtyDisk(xl.storageDisks[i].(*retryStorage), nil, errFaultyDisk) } testErrVal := xl.updateUploadJSON(bucket, object, "222abc", time.Now().UTC(), false) diff --git a/cmd/xl-v1-object_test.go b/cmd/xl-v1-object_test.go index b56271be2..1349ec985 100644 --- a/cmd/xl-v1-object_test.go +++ b/cmd/xl-v1-object_test.go @@ -135,7 +135,7 @@ func TestXLDeleteObjectDiskNotFound(t *testing.T) { // for a 16 disk setup, quorum is 9. To simulate disks not found yet // quorum is available, we remove disks leaving quorum disks behind. for i := range xl.storageDisks[:7] { - xl.storageDisks[i] = newNaughtyDisk(xl.storageDisks[i].(*posix), nil, errFaultyDisk) + xl.storageDisks[i] = newNaughtyDisk(xl.storageDisks[i].(*retryStorage), nil, errFaultyDisk) } err = obj.DeleteObject(bucket, object) if err != nil { @@ -195,7 +195,7 @@ func TestGetObjectNoQuorum(t *testing.T) { } for i := range xl.storageDisks[:9] { switch diskType := xl.storageDisks[i].(type) { - case *posix: + case *retryStorage: xl.storageDisks[i] = newNaughtyDisk(diskType, diskErrors, errFaultyDisk) case *naughtyDisk: xl.storageDisks[i] = newNaughtyDisk(diskType.disk, diskErrors, errFaultyDisk) @@ -246,7 +246,7 @@ func TestPutObjectNoQuorum(t *testing.T) { } for i := range xl.storageDisks[:9] { switch diskType := xl.storageDisks[i].(type) { - case *posix: + case *retryStorage: xl.storageDisks[i] = newNaughtyDisk(diskType, diskErrors, errFaultyDisk) case *naughtyDisk: xl.storageDisks[i] = newNaughtyDisk(diskType.disk, diskErrors, errFaultyDisk) diff --git a/cmd/xl-v1_test.go b/cmd/xl-v1_test.go index 011a237f7..4626154b9 100644 --- a/cmd/xl-v1_test.go +++ b/cmd/xl-v1_test.go @@ -156,38 +156,22 @@ func TestNewXL(t *testing.T) { t.Fatal("Unexpected error: ", err) } - err = waitForFormatDisks(true, endpoints, nil) + _, err = waitForFormatDisks(true, endpoints, nil) if err != errInvalidArgument { t.Fatalf("Expecting error, got %s", err) } - err = waitForFormatDisks(true, nil, storageDisks) + _, err = waitForFormatDisks(true, nil, storageDisks) if err != errInvalidArgument { t.Fatalf("Expecting error, got %s", err) } // Initializes all erasure disks - err = waitForFormatDisks(true, endpoints, storageDisks) + formattedDisks, err := waitForFormatDisks(true, endpoints, storageDisks) if err != nil { t.Fatalf("Unable to format disks for erasure, %s", err) } - _, err = newXLObjects(storageDisks) - if err != nil { - t.Fatalf("Unable to initialize erasure, %s", err) - } - - endpoints, err = parseStorageEndpoints(erasureDisks) - if err != nil { - t.Fatalf("Unable to initialize erasure, %s", err) - } - - storageDisks, err = initStorageDisks(endpoints) - if err != nil { - t.Fatal("Unexpected error: ", err) - } - - // Initializes all erasure disks, ignoring first two. - _, err = newXLObjects(storageDisks) + _, err = newXLObjects(formattedDisks) if err != nil { t.Fatalf("Unable to initialize erasure, %s", err) }