diff --git a/cmd/endpoint.go b/cmd/endpoint.go
index 4fdde0f35..09137c662 100644
--- a/cmd/endpoint.go
+++ b/cmd/endpoint.go
@@ -227,7 +227,7 @@ func (endpoints EndpointList) UpdateIsLocal() error {
 	keepAliveTicker := time.NewTicker(retryInterval * time.Second)
 	defer keepAliveTicker.Stop()
 	for {
-		// Break if the local endpoint is found already. Or all the endpoints are resolved.
+		// Break if the local endpoint is found already, or all the endpoints are resolved.
 		if foundLocal || (epsResolved == len(endpoints)) {
 			break
 		}
@@ -240,13 +240,13 @@ func (endpoints EndpointList) UpdateIsLocal() error {
 		default:
 			for i, resolved := range resolvedList {
 				if resolved {
+					// Continue if host is already resolved.
 					continue
 				}

 				// return err if not Docker or Kubernetes
-				// We use IsDocker() method to check for Docker Swarm environment
-				// as there is no reliable way to clearly identify Swarm from
-				// Docker environment.
+				// We use IsDocker() to check for a Docker environment.
+				// We use IsKubernetes() to check for a Kubernetes environment.
 				isLocal, err := isLocalHost(endpoints[i].HostName)
 				if err != nil {
 					if !IsDocker() && !IsKubernetes() {
@@ -256,8 +256,7 @@ func (endpoints EndpointList) UpdateIsLocal() error {
 					timeElapsed := time.Since(startTime)
 					// log error only if more than 1s elapsed
 					if timeElapsed > time.Second {
-						// log the message to console about the host not being
-						// resolveable.
+						// Log the message to console about the host not being resolvable.
 						reqInfo := (&logger.ReqInfo{}).AppendTags("host", endpoints[i].HostName)
 						reqInfo.AppendTags("elapsedTime", humanize.RelTime(startTime, startTime.Add(timeElapsed), "elapsed", ""))
 						ctx := logger.SetReqInfo(context.Background(), reqInfo)
@@ -274,15 +273,33 @@ func (endpoints EndpointList) UpdateIsLocal() error {
 		}

 		// Wait for the tick, if there exists a local endpoint in discovery.
-		// Non docker/kubernetes environment does not need to wait.
-		if !foundLocal && (IsDocker() && IsKubernetes()) {
+		// In a non-Docker/Kubernetes environment we do not need to wait.
+		if !foundLocal && (IsDocker() || IsKubernetes()) {
 			<-keepAliveTicker.C
 		}
 	}

+	// On Kubernetes/Docker setups DNS can sometimes resolve
+	// inconsistently: endpoints for the same host, each serving
+	// multiple disks, may come online with some of them marked
+	// local and others not. This situation should never arise
+	// and is only possible in orchestrated deployments with
+	// dynamic DNS. The following code ensures that if any
+	// endpoint reports itself as local for a given host, this
+	// is treated as true for all endpoints of the same host,
+	// so the assumption holds in all scenarios and is safe to
+	// rely on for a given host.
+	endpointLocalMap := make(map[string]bool)
+	for _, ep := range endpoints {
+		if ep.IsLocal {
+			endpointLocalMap[ep.Host] = ep.IsLocal
+		}
+	}
+	for i := range endpoints {
+		endpoints[i].IsLocal = endpointLocalMap[endpoints[i].Host]
+	}
 	return nil
-
 }

 // NewEndpointList - returns new endpoint list based on input args.
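The normalization pass added at the end of UpdateIsLocal is self-contained enough to exercise on its own. Below is a minimal sketch of the same two-pass technique; the endpoint type here is reduced to the two fields the pass touches, and the host names are made up:

package main

import "fmt"

// endpoint is a stand-in for cmd.Endpoint, reduced to the two
// fields the normalization pass reads and writes.
type endpoint struct {
	Host    string
	IsLocal bool
}

func main() {
	// Simulate the inconsistent state dynamic DNS can produce: two
	// disks on the same host, only one of which resolved as local.
	endpoints := []endpoint{
		{Host: "server1:9000", IsLocal: true},
		{Host: "server1:9000", IsLocal: false},
		{Host: "server2:9000", IsLocal: false},
	}

	// First pass: record every host that any endpoint reported as local.
	endpointLocalMap := make(map[string]bool)
	for _, ep := range endpoints {
		if ep.IsLocal {
			endpointLocalMap[ep.Host] = ep.IsLocal
		}
	}
	// Second pass: make IsLocal uniform across all endpoints of a host.
	for i := range endpoints {
		endpoints[i].IsLocal = endpointLocalMap[endpoints[i].Host]
	}

	// Output: [{server1:9000 true} {server1:9000 true} {server2:9000 false}]
	fmt.Println(endpoints)
}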
diff --git a/cmd/endpoint_test.go b/cmd/endpoint_test.go
index 525929987..4098bfca6 100644
--- a/cmd/endpoint_test.go
+++ b/cmd/endpoint_test.go
@@ -350,27 +350,29 @@ func TestCreateEndpoints(t *testing.T) {
 	}

 	for i, testCase := range testCases {
-		serverAddr, endpoints, setupType, err := CreateEndpoints(testCase.serverAddr, testCase.args...)
-
-		if err == nil {
-			if testCase.expectedErr != nil {
-				t.Fatalf("Test (%d) error: expected = %v, got = <nil>", i+1, testCase.expectedErr)
-			} else {
-				if serverAddr != testCase.expectedServerAddr {
-					t.Fatalf("Test (%d) serverAddr: expected = %v, got = %v", i+1, testCase.expectedServerAddr, serverAddr)
-				}
-				if !reflect.DeepEqual(endpoints, testCase.expectedEndpoints) {
-					t.Fatalf("Test (%d) endpoints: expected = %v, got = %v", i+1, testCase.expectedEndpoints, endpoints)
-				}
-				if setupType != testCase.expectedSetupType {
-					t.Fatalf("Test (%d) setupType: expected = %v, got = %v", i+1, testCase.expectedSetupType, setupType)
-				}
+		testCase := testCase
+		t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) {
+			serverAddr, endpoints, setupType, err := CreateEndpoints(testCase.serverAddr, testCase.args...)
+			if err == nil {
+				if testCase.expectedErr != nil {
+					t.Fatalf("error: expected = %v, got = <nil>", testCase.expectedErr)
+				} else {
+					if serverAddr != testCase.expectedServerAddr {
+						t.Fatalf("serverAddr: expected = %v, got = %v", testCase.expectedServerAddr, serverAddr)
+					}
+					if !reflect.DeepEqual(endpoints, testCase.expectedEndpoints) {
+						t.Fatalf("endpoints: expected = %v, got = %v", testCase.expectedEndpoints, endpoints)
+					}
+					if setupType != testCase.expectedSetupType {
+						t.Fatalf("setupType: expected = %v, got = %v", testCase.expectedSetupType, setupType)
+					}
+				}
+			} else if testCase.expectedErr == nil {
+				t.Fatalf("error: expected = <nil>, got = %v", err)
+			} else if err.Error() != testCase.expectedErr.Error() {
+				t.Fatalf("error: expected = %v, got = %v", testCase.expectedErr, err)
 			}
-		} else if testCase.expectedErr == nil {
-			t.Fatalf("Test (%d) error: expected = <nil>, got = %v", i+1, err)
-		} else if err.Error() != testCase.expectedErr.Error() {
-			t.Fatalf("Test (%d) error: expected = %v, got = %v", i+1, testCase.expectedErr, err)
-		}
+		})
 	}
 }
diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go
index 0170cb383..a87b5ffd0 100644
--- a/cmd/namespace-lock.go
+++ b/cmd/namespace-lock.go
@@ -53,8 +53,9 @@ type RWLocker interface {

 // Initialize distributed locking only in case of distributed setup.
 // Returns lock clients and the node index for the current server.
-func newDsyncNodes(endpoints EndpointList) (clnts []dsync.NetLocker, myNode int) {
+func newDsyncNodes(endpoints EndpointList) (clnts []dsync.NetLocker, myNode int, err error) {
 	myNode = -1
+
 	seenHosts := set.NewStringSet()
 	for _, endpoint := range endpoints {
 		if seenHosts.Contains(endpoint.Host) {
@@ -66,26 +67,27 @@ func newDsyncNodes(endpoints EndpointList) (clnts []dsync.NetLocker, myNode int)
 		if endpoint.IsLocal {
 			myNode = len(clnts)

-			receiver := &lockRESTServer{
+			globalLockServer = &lockRESTServer{
 				ll: &localLocker{
 					serverAddr:      endpoint.Host,
 					serviceEndpoint: lockServicePath,
 					lockMap:         make(map[string][]lockRequesterInfo),
 				},
 			}
-
-			globalLockServer = receiver
-			locker = receiver.ll
+			locker = globalLockServer.ll
 		} else {
-			host, err := xnet.ParseHost(endpoint.Host)
-			logger.FatalIf(err, "Unable to parse Lock RPC Host")
+			var host *xnet.Host
+			host, err = xnet.ParseHost(endpoint.Host)
 			locker = newlockRESTClient(host)
 		}

 		clnts = append(clnts, locker)
 	}

-	return clnts, myNode
+	if myNode == -1 {
+		return clnts, myNode, errors.New("no endpoint pointing to the local machine was found")
+	}
+	return clnts, myNode, err
 }

 // newNSLock - return a new name space lock map.
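The reshaped newDsyncNodes above pushes the fatal exit out of the constructor and returns an error to the caller instead. A hedged sketch of that pattern follows, with stand-in types: lockClient, parseHost, and newNodes are illustrative, not the dsync/xnet APIs.

package main

import (
	"errors"
	"fmt"
	"strings"
)

// lockClient is a stand-in for dsync.NetLocker.
type lockClient struct{ host string }

// parseHost is a stand-in for xnet.ParseHost.
func parseHost(s string) (string, error) {
	if !strings.Contains(s, ":") {
		return "", fmt.Errorf("invalid host %q: missing port", s)
	}
	return s, nil
}

// newNodes mirrors the reworked constructor: it accumulates clients,
// remembers which index is local, and returns an error (rather than
// exiting the process) when parsing fails or no endpoint is local.
func newNodes(endpoints []string, local string) (clnts []lockClient, myNode int, err error) {
	myNode = -1
	for _, ep := range endpoints {
		if ep == local {
			myNode = len(clnts)
		}
		host, err := parseHost(ep)
		if err != nil {
			return nil, -1, err
		}
		clnts = append(clnts, lockClient{host: host})
	}
	if myNode == -1 {
		return clnts, myNode, errors.New("no endpoint pointing to the local machine was found")
	}
	return clnts, myNode, nil
}

func main() {
	clnts, myNode, err := newNodes([]string{"server1:9000", "server2:9000"}, "server1:9000")
	fmt.Println(len(clnts), myNode, err) // 2 0 <nil>
}

Note one difference: this sketch returns early on a parse failure for clarity, whereas the diff itself carries the xnet.ParseHost error in the named err and only surfaces it in the final return, after the remaining endpoints have been processed.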
diff --git a/cmd/server-main.go b/cmd/server-main.go
index cf2e05cbf..9c0a9f7ec 100644
--- a/cmd/server-main.go
+++ b/cmd/server-main.go
@@ -276,7 +276,11 @@ func serverMain(ctx *cli.Context) {

 	// Set nodes for dsync for distributed setup.
 	if globalIsDistXL {
-		globalDsync, err = dsync.New(newDsyncNodes(globalEndpoints))
+		clnts, myNode, err := newDsyncNodes(globalEndpoints)
+		if err != nil {
+			logger.Fatal(err, "Unable to initialize distributed locking on %s", globalEndpoints)
+		}
+		globalDsync, err = dsync.New(clnts, myNode)
 		if err != nil {
 			logger.Fatal(err, "Unable to initialize distributed locking on %s", globalEndpoints)
 		}
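At the call site in serverMain, initialization is now two separately fallible steps: a missing local endpoint fails before any coordinator state is built. A minimal sketch of that flow, assuming hypothetical stand-ins (newNodes, newCoordinator) rather than the real newDsyncNodes and dsync.New:

package main

import (
	"errors"
	"fmt"
	"log"
)

// newNodes stands in for newDsyncNodes: it fails if no endpoint is local.
func newNodes(endpoints []string, local string) ([]string, int, error) {
	for i, ep := range endpoints {
		if ep == local {
			return endpoints, i, nil
		}
	}
	return endpoints, -1, errors.New("no endpoint pointing to the local machine was found")
}

// newCoordinator stands in for dsync.New: it validates the local index.
func newCoordinator(nodes []string, myNode int) (string, error) {
	if myNode < 0 || myNode >= len(nodes) {
		return "", errors.New("invalid local node index")
	}
	return fmt.Sprintf("dsync with %d nodes, local index %d", len(nodes), myNode), nil
}

func main() {
	endpoints := []string{"server1:9000", "server2:9000"}

	// Step 1: resolve the node list; a missing local endpoint fails
	// here, with its own fatal path.
	nodes, myNode, err := newNodes(endpoints, "server1:9000")
	if err != nil {
		log.Fatalf("Unable to initialize distributed locking on %s: %v", endpoints, err)
	}
	// Step 2: construct the coordinator from the validated inputs.
	coordinator, err := newCoordinator(nodes, myNode)
	if err != nil {
		log.Fatalf("Unable to initialize distributed locking on %s: %v", endpoints, err)
	}
	fmt.Println(coordinator) // dsync with 2 nodes, local index 0
}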