From f1bc9343a16d3ff91812e27dde03cb33c2e4e55c Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 7 Oct 2016 11:15:55 -0700 Subject: [PATCH] prep: Initialization should wait instead of exit the servers. (#2872) - Servers do not exit for invalid credentials instead they print and wait. - Servers do not exit for version mismatch instead they print and wait. - Servers do not exit for time differences between nodes they print and wait. --- cmd/auth-rpc-client.go | 4 ++++ cmd/controller-handlers.go | 4 ++++ cmd/controller-router.go | 3 +++ cmd/generic-handlers.go | 4 ++-- cmd/globals.go | 2 +- cmd/prepare-storage-msg.go | 25 +++++++++++++++++++++++++ cmd/prepare-storage-msg_test.go | 29 +++++++++++++++++++++++++++++ cmd/prepare-storage.go | 33 ++++++++++++++++++++++++++++++++- cmd/prepare-storage_test.go | 27 +++++++++++++++++++++++++++ cmd/retry.go | 3 --- cmd/server-main.go | 16 ---------------- cmd/signature-jwt.go | 9 ++++++--- cmd/signature-jwt_test.go | 6 ++---- cmd/storage-rpc-client.go | 8 ++++++++ cmd/storage-rpc-client_test.go | 16 ++++++++++++++++ cmd/storage-rpc-server.go | 4 ++-- 16 files changed, 161 insertions(+), 32 deletions(-) diff --git a/cmd/auth-rpc-client.go b/cmd/auth-rpc-client.go index cb956d701..43e13966e 100644 --- a/cmd/auth-rpc-client.go +++ b/cmd/auth-rpc-client.go @@ -135,6 +135,10 @@ func (authClient *AuthRPCClient) Login() error { if reply.ServerVersion != Version { return errServerVersionMismatch } + curTime := time.Now().UTC() + if curTime.Sub(reply.Timestamp) > globalMaxSkewTime { + return errServerTimeMismatch + } // Set token, time stamp as received from a successful login call. authClient.token = reply.Token authClient.tstamp = reply.Timestamp diff --git a/cmd/controller-handlers.go b/cmd/controller-handlers.go index b5279d177..0741046f1 100644 --- a/cmd/controller-handlers.go +++ b/cmd/controller-handlers.go @@ -24,6 +24,9 @@ var errServerNotInitialized = errors.New("Server not initialized, please try aga // errServerVersionMismatch - server versions do not match. var errServerVersionMismatch = errors.New("Server versions do not match.") +// errServerTimeMismatch - server times are too far apart. +var errServerTimeMismatch = errors.New("Server times are too far apart.") + /// Auth operations // Login - login handler. @@ -40,6 +43,7 @@ func (c *controllerAPIHandlers) LoginHandler(args *RPCLoginArgs, reply *RPCLogin return err } reply.Token = token + reply.Timestamp = c.timestamp reply.ServerVersion = Version return nil } diff --git a/cmd/controller-router.go b/cmd/controller-router.go index 53a8a9634..b7025cfbd 100644 --- a/cmd/controller-router.go +++ b/cmd/controller-router.go @@ -18,6 +18,7 @@ package cmd import ( "net/rpc" + "time" router "github.com/gorilla/mux" ) @@ -33,6 +34,7 @@ func registerControllerRPCRouter(mux *router.Router, srvCmdConfig serverCmdConfi ctrlHandlers := &controllerAPIHandlers{ ObjectAPI: newObjectLayerFn, StorageDisks: srvCmdConfig.storageDisks, + timestamp: time.Now().UTC(), } ctrlRPCServer := rpc.NewServer() @@ -46,4 +48,5 @@ func registerControllerRPCRouter(mux *router.Router, srvCmdConfig serverCmdConfi type controllerAPIHandlers struct { ObjectAPI func() ObjectLayer StorageDisks []StorageAPI + timestamp time.Time } diff --git a/cmd/generic-handlers.go b/cmd/generic-handlers.go index 10f993923..a7b8d1901 100644 --- a/cmd/generic-handlers.go +++ b/cmd/generic-handlers.go @@ -202,10 +202,10 @@ func (h timeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { writeErrorResponse(w, r, apiErr, r.URL.Path) return } - // Verify if the request date header is shifted by less than maxSkewTime parameter in the past + // Verify if the request date header is shifted by less than globalMaxSkewTime parameter in the past // or in the future, reject request otherwise. curTime := time.Now().UTC() - if curTime.Sub(amzDate) > maxSkewTime || amzDate.Sub(curTime) > maxSkewTime { + if curTime.Sub(amzDate) > globalMaxSkewTime || amzDate.Sub(curTime) > globalMaxSkewTime { writeErrorResponse(w, r, ErrRequestTimeTooSkewed, r.URL.Path) return } diff --git a/cmd/globals.go b/cmd/globals.go index 02e743f13..0ac2a299e 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -65,7 +65,7 @@ var ( var ( // The maximum allowed difference between the request generation time and the server processing time - maxSkewTime = 15 * time.Minute + globalMaxSkewTime = 15 * time.Minute ) // global colors. diff --git a/cmd/prepare-storage-msg.go b/cmd/prepare-storage-msg.go index 9eec43b6a..5b4204732 100644 --- a/cmd/prepare-storage-msg.go +++ b/cmd/prepare-storage-msg.go @@ -134,3 +134,28 @@ func getFormatMsg(storageDisks []StorageAPI) string { } return msg } + +func printConfigErrMsg(storageDisks []StorageAPI, sErrs []error, fn printOnceFunc) { + msg := getConfigErrMsg(storageDisks, sErrs) + fn(msg) +} + +// Generate a formatted message when cluster is misconfigured. +func getConfigErrMsg(storageDisks []StorageAPI, sErrs []error) string { + msg := colorBlue("\nDetected configuration inconsistencies in the cluster. Please fix following servers.") + for i, disk := range storageDisks { + if disk == nil { + continue + } + if sErrs[i] == nil { + continue + } + msg += fmt.Sprintf( + "\n[%s] %s : %s", + int2Str(i+1, len(storageDisks)), + storageDisks[i], + sErrs[i], + ) + } + return msg +} diff --git a/cmd/prepare-storage-msg_test.go b/cmd/prepare-storage-msg_test.go index 791e0b0ba..c0f1112cc 100644 --- a/cmd/prepare-storage-msg_test.go +++ b/cmd/prepare-storage-msg_test.go @@ -20,15 +20,40 @@ import "testing" // Tests heal message to be correct and properly formatted. func TestHealMsg(t *testing.T) { + rootPath, err := newTestConfig("us-east-1") + if err != nil { + t.Fatal("Unable to initialize test config", err) + } + defer removeAll(rootPath) storageDisks, fsDirs := prepareXLStorageDisks(t) + errs := make([]error, len(storageDisks)) defer removeRoots(fsDirs) + nilDisks := deepCopyStorageDisks(storageDisks) + nilDisks[5] = nil + authErrs := make([]error, len(storageDisks)) + authErrs[5] = errAuthentication testCases := []struct { endPoint string storageDisks []StorageAPI + serrs []error }{ + // Test - 1 for valid disks and errors. { endPoint: "http://10.1.10.1:9000", storageDisks: storageDisks, + serrs: errs, + }, + // Test - 2 for one of the disks is nil. + { + endPoint: "http://10.1.10.1:9000", + storageDisks: nilDisks, + serrs: errs, + }, + // Test - 3 for one of the errs is authentication. + { + endPoint: "http://10.1.10.1:9000", + storageDisks: nilDisks, + serrs: authErrs, }, } for i, testCase := range testCases { @@ -44,6 +69,10 @@ func TestHealMsg(t *testing.T) { if msg == "" { t.Fatalf("Test: %d Unable to get format message.", i+1) } + msg = getConfigErrMsg(testCase.storageDisks, testCase.serrs) + if msg == "" { + t.Fatalf("Test: %d Unable to get config error message.", i+1) + } } } diff --git a/cmd/prepare-storage.go b/cmd/prepare-storage.go index 792f09876..1dcb3e8bd 100644 --- a/cmd/prepare-storage.go +++ b/cmd/prepare-storage.go @@ -93,6 +93,9 @@ const ( // WaitForFormatting - Wait for formatting to be triggered from the '1st' server in the cluster. WaitForFormatting + // WaitForConfig - Wait for all servers to have the same config including (credentials, version and time). + WaitForConfig + // InitObjectLayer - Initialize object layer. InitObjectLayer @@ -101,6 +104,26 @@ const ( Abort ) +// Quick error to actions converts looking for specific errors which need to +// be returned quickly and server should wait instead. +func quickErrToActions(errMap map[error]int) InitActions { + var action InitActions + switch { + case errMap[errInvalidAccessKeyID] > 0: + fallthrough + case errMap[errAuthentication] > 0: + fallthrough + case errMap[errServerVersionMismatch] > 0: + fallthrough + case errMap[errServerTimeMismatch] > 0: + action = WaitForConfig + } + return action +} + +// Preparatory initialization stage for XL validates known errors. +// Converts them into specific actions. These actions have special purpose +// which caller decides on what needs to be done. func prepForInitXL(firstDisk bool, sErrs []error, diskCount int) InitActions { // Count errors by error value. errMap := make(map[error]int) @@ -108,6 +131,11 @@ func prepForInitXL(firstDisk bool, sErrs []error, diskCount int) InitActions { errMap[err]++ } + // Validates and converts specific config errors into WaitForConfig. + if quickErrToActions(errMap) == WaitForConfig { + return WaitForConfig + } + quorum := diskCount/2 + 1 disksOffline := errMap[errDiskNotFound] disksFormatted := errMap[nil] @@ -151,7 +179,7 @@ func prepForInitXL(firstDisk bool, sErrs []error, diskCount int) InitActions { } // Some of the formatted disks are possibly corrupted or unformatted, heal them. return WaitForHeal - } // No quorum wait for quorum number of disks. + } // Exhausted all our checks, un-handled errors perhaps we Abort. return WaitForQuorum } @@ -201,6 +229,9 @@ func retryFormattingDisks(firstDisk bool, firstEndpoint string, storageDisks []S "Initializing data volume. Waiting for minimum %d servers to come online.\n", len(storageDisks)/2+1, ) + case WaitForConfig: + // Print configuration errors. + printConfigErrMsg(storageDisks, sErrs, printOnceFn()) case WaitForAll: console.Println("Initializing data volume for first time. Waiting for other servers to come online.") case WaitForFormatting: diff --git a/cmd/prepare-storage_test.go b/cmd/prepare-storage_test.go index 9a21a85b0..e4646149d 100644 --- a/cmd/prepare-storage_test.go +++ b/cmd/prepare-storage_test.go @@ -32,6 +32,8 @@ func (action InitActions) String() string { return "WaitForAll" case WaitForQuorum: return "WaitForQuorum" + case WaitForConfig: + return "WaitForConfig" case Abort: return "Abort" default: @@ -79,6 +81,26 @@ func TestPrepForInitXL(t *testing.T) { errDiskNotFound, errDiskNotFound, errDiskNotFound, errDiskNotFound, errDiskNotFound, nil, nil, nil, } + // Invalid access key id. + accessKeyIDErr := []error{ + errInvalidAccessKeyID, nil, nil, nil, + nil, nil, nil, nil, + } + // Authentication error. + authenticationErr := []error{ + nil, nil, nil, nil, + errAuthentication, nil, nil, nil, + } + // Server version mismatch. + serverVersionMismatch := []error{ + errServerVersionMismatch, nil, nil, nil, + errServerVersionMismatch, nil, nil, nil, + } + // Server time mismatch. + serverTimeMismatch := []error{ + nil, nil, nil, nil, + errServerTimeMismatch, nil, nil, nil, + } testCases := []struct { // Params for prepForInit(). @@ -105,6 +127,11 @@ func TestPrepForInitXL(t *testing.T) { {false, noQuourm, 8, WaitForQuorum}, {false, minorityCorrupted, 8, WaitForHeal}, {false, majorityCorrupted, 8, Abort}, + // Config mistakes. + {true, accessKeyIDErr, 8, WaitForConfig}, + {true, authenticationErr, 8, WaitForConfig}, + {true, serverVersionMismatch, 8, WaitForConfig}, + {true, serverTimeMismatch, 8, WaitForConfig}, } for i, test := range testCases { actual := prepForInitXL(test.firstDisk, test.errs, test.diskCount) diff --git a/cmd/retry.go b/cmd/retry.go index 9ffbf41fa..99e9ab421 100644 --- a/cmd/retry.go +++ b/cmd/retry.go @@ -45,9 +45,6 @@ func (r *lockedRandSource) Seed(seed int64) { r.lk.Unlock() } -// MaxRetry is the maximum number of retries before stopping. -var MaxRetry = 5 - // MaxJitter will randomize over the full exponential backoff time const MaxJitter = 1.0 diff --git a/cmd/server-main.go b/cmd/server-main.go index 2e18ef314..d203ed061 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -246,18 +246,6 @@ func checkNamingDisks(disks []string) error { return nil } -// Validates remote disks are successfully accessible, ignores networks errors. -func validateRemoteDisks(disks []StorageAPI) error { - for _, disk := range disks { - _, err := disk.DiskInfo() - if _, ok := err.(*net.OpError); ok { - continue - } - return err - } - return nil -} - // Validate input disks. func validateDisks(disks []string, ignoredDisks []string) []StorageAPI { isXL := len(disks) > 1 @@ -278,10 +266,6 @@ func validateDisks(disks []string, ignoredDisks []string) []StorageAPI { } storageDisks, err := initStorageDisks(disks, ignoredDisks) fatalIf(err, "Unable to initialize storage disks.") - if isXL { - err = validateRemoteDisks(storageDisks) - fatalIf(err, "Unable to validate remote disks.") - } return storageDisks } diff --git a/cmd/signature-jwt.go b/cmd/signature-jwt.go index 28da1d701..df148bea4 100644 --- a/cmd/signature-jwt.go +++ b/cmd/signature-jwt.go @@ -78,6 +78,10 @@ func (jwt *JWT) GenerateToken(accessKey string) (string, error) { return token.SignedString([]byte(jwt.SecretAccessKey)) } +var errInvalidAccessKeyID = errors.New("The access key ID you provided does not exist in our records.") + +var errAuthentication = errors.New("Authentication failed, check your access credentials.") + // Authenticate - authenticates incoming access key and secret key. func (jwt *JWT) Authenticate(accessKey, secretKey string) error { // Trim spaces. @@ -91,13 +95,12 @@ func (jwt *JWT) Authenticate(accessKey, secretKey string) error { } if accessKey != jwt.AccessKeyID { - return errors.New("Access key does not match") + return errInvalidAccessKeyID } hashedSecretKey, _ := bcrypt.GenerateFromPassword([]byte(jwt.SecretAccessKey), bcrypt.DefaultCost) - if bcrypt.CompareHashAndPassword(hashedSecretKey, []byte(secretKey)) != nil { - return errors.New("Authentication failed") + return errAuthentication } // Success. diff --git a/cmd/signature-jwt_test.go b/cmd/signature-jwt_test.go index 0300ef389..f1ec61d30 100644 --- a/cmd/signature-jwt_test.go +++ b/cmd/signature-jwt_test.go @@ -201,9 +201,9 @@ func TestAuthenticate(t *testing.T) { // Secret key too long. {"myuser", "pass1234567890123456789012345678901234567", fmt.Errorf("Invalid secret key")}, // Authentication error. - {"myuser", "mypassword", fmt.Errorf("Access key does not match")}, + {"myuser", "mypassword", errInvalidAccessKeyID}, // Authentication error. - {serverConfig.GetCredential().AccessKeyID, "mypassword", fmt.Errorf("Authentication failed")}, + {serverConfig.GetCredential().AccessKeyID, "mypassword", errAuthentication}, // Success. {serverConfig.GetCredential().AccessKeyID, serverConfig.GetCredential().SecretAccessKey, nil}, // Success when access key contains leading/trailing spaces. @@ -213,12 +213,10 @@ func TestAuthenticate(t *testing.T) { // Run tests. for _, testCase := range testCases { err := jwt.Authenticate(testCase.accessKey, testCase.secretKey) - if testCase.expectedErr != nil { if err == nil { t.Fatalf("%+v: expected: %s, got: ", testCase, testCase.expectedErr) } - if testCase.expectedErr.Error() != err.Error() { t.Fatalf("%+v: expected: %s, got: %s", testCase, testCase.expectedErr, err) } diff --git a/cmd/storage-rpc-client.go b/cmd/storage-rpc-client.go index 889f19269..bfaccaad9 100644 --- a/cmd/storage-rpc-client.go +++ b/cmd/storage-rpc-client.go @@ -81,6 +81,14 @@ func toStorageErr(err error) error { return errCorruptedFormat case errUnformattedDisk.Error(): return errUnformattedDisk + case errInvalidAccessKeyID.Error(): + return errInvalidAccessKeyID + case errAuthentication.Error(): + return errAuthentication + case errServerVersionMismatch.Error(): + return errServerVersionMismatch + case errServerTimeMismatch.Error(): + return errServerTimeMismatch } return err } diff --git a/cmd/storage-rpc-client_test.go b/cmd/storage-rpc-client_test.go index ec593c789..6126968f5 100644 --- a/cmd/storage-rpc-client_test.go +++ b/cmd/storage-rpc-client_test.go @@ -104,6 +104,22 @@ func TestStorageErr(t *testing.T) { expectedErr: errFileNameTooLong, err: fmt.Errorf("%s", errFileNameTooLong.Error()), }, + { + expectedErr: errInvalidAccessKeyID, + err: fmt.Errorf("%s", errInvalidAccessKeyID.Error()), + }, + { + expectedErr: errAuthentication, + err: fmt.Errorf("%s", errAuthentication.Error()), + }, + { + expectedErr: errServerVersionMismatch, + err: fmt.Errorf("%s", errServerVersionMismatch.Error()), + }, + { + expectedErr: errServerTimeMismatch, + err: fmt.Errorf("%s", errServerTimeMismatch.Error()), + }, { expectedErr: unknownErr, err: unknownErr, diff --git a/cmd/storage-rpc-server.go b/cmd/storage-rpc-server.go index f18494a7d..d92780ac4 100644 --- a/cmd/storage-rpc-server.go +++ b/cmd/storage-rpc-server.go @@ -229,7 +229,7 @@ func newRPCServer(serverConfig serverCmdConfig) (servers []*storageServer, err e if len(ignoredExports) > 0 { ignoredSet = set.CreateStringSet(ignoredExports...) } - t := time.Now().UTC() + tstamp := time.Now().UTC() for _, export := range exports { if ignoredSet.Contains(export) { // Ignore initializing ignored export. @@ -251,7 +251,7 @@ func newRPCServer(serverConfig serverCmdConfig) (servers []*storageServer, err e servers = append(servers, &storageServer{ storage: storage, path: export, - timestamp: t, + timestamp: tstamp, }) } }