From 36e12a603818b849f866e62cfcb1daf11684af10 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Wed, 9 Oct 2019 21:44:17 -0700
Subject: [PATCH] Assume local endpoints appropriately in k8s deployments (#8375)

On Kubernetes/Docker setups DNS sometimes resolves inconsistently, so the
same host with multiple disks can come online with some of its endpoints
reported as local and others as not local. This cannot happen on a static
host and is only possible in orchestrated deployments with dynamic DNS.
The following code ensures that if any endpoint reports itself as local
for a given host, all endpoints for that host are treated as local. This
keeps the assumption consistent and safe in all scenarios.

This PR also adds validation so that we do not crash the server if there
are bugs in the endpoints list during dsync initialization.

Thanks to Daniel Valdivia for reproducing this; the fix is needed as part
of the https://github.com/minio/m3 project.
---
 cmd/endpoint.go       | 35 ++++++++++++++++++++++++++---------
 cmd/endpoint_test.go  | 40 +++++++++++++++++++++-------------------
 cmd/namespace-lock.go | 18 ++++++++++--------
 cmd/server-main.go    |  6 +++++-
 4 files changed, 62 insertions(+), 37 deletions(-)

diff --git a/cmd/endpoint.go b/cmd/endpoint.go
index 4fdde0f35..09137c662 100644
--- a/cmd/endpoint.go
+++ b/cmd/endpoint.go
@@ -227,7 +227,7 @@ func (endpoints EndpointList) UpdateIsLocal() error {
     keepAliveTicker := time.NewTicker(retryInterval * time.Second)
     defer keepAliveTicker.Stop()
     for {
-        // Break if the local endpoint is found already. Or all the endpoints are resolved.
+        // Break if the local endpoint is found already, or all the endpoints are resolved.
         if foundLocal || (epsResolved == len(endpoints)) {
             break
         }
@@ -240,13 +240,13 @@ func (endpoints EndpointList) UpdateIsLocal() error {
         default:
             for i, resolved := range resolvedList {
                 if resolved {
+                    // Continue if host is already resolved.
                     continue
                 }
 
                 // return err if not Docker or Kubernetes
-                // We use IsDocker() method to check for Docker Swarm environment
-                // as there is no reliable way to clearly identify Swarm from
-                // Docker environment.
+                // We use IsDocker() to check for Docker environment
+                // We use IsKubernetes() to check for Kubernetes environment
                 isLocal, err := isLocalHost(endpoints[i].HostName)
                 if err != nil {
                     if !IsDocker() && !IsKubernetes() {
@@ -256,8 +256,7 @@ func (endpoints EndpointList) UpdateIsLocal() error {
                     timeElapsed := time.Since(startTime)
                     // log error only if more than 1s elapsed
                     if timeElapsed > time.Second {
-                        // log the message to console about the host not being
-                        // resolveable.
+                        // Log the message to console about the host not being resolvable.
                         reqInfo := (&logger.ReqInfo{}).AppendTags("host", endpoints[i].HostName)
                         reqInfo.AppendTags("elapsedTime", humanize.RelTime(startTime, startTime.Add(timeElapsed), "elapsed", ""))
                         ctx := logger.SetReqInfo(context.Background(), reqInfo)
@@ -274,15 +273,33 @@ func (endpoints EndpointList) UpdateIsLocal() error {
         }
 
         // Wait for the tick, if the there exist a local endpoint in discovery.
-        // Non docker/kubernetes environment does not need to wait.
-        if !foundLocal && (IsDocker() && IsKubernetes()) {
+        // In non Docker/Kubernetes environments we do not need to wait.
+        if !foundLocal && (IsDocker() || IsKubernetes()) {
             <-keepAliveTicker.C
         }
     }
+    // On Kubernetes/Docker setups DNS sometimes resolves
+    // inconsistently, so the same host can come online with some
+    // of its endpoints reported as local and others as not local.
+    // This cannot happen on a static host and is only possible
+    // in orchestrated deployments with dynamic DNS. The following
+    // code ensures that if any endpoint reports itself as local
+    // for a given host, then all endpoints for that host are
+    // treated as local. This keeps the assumption consistent
+    // across all scenarios and makes it safe to rely on for a
+    // given host.
+    endpointLocalMap := make(map[string]bool)
+    for _, ep := range endpoints {
+        if ep.IsLocal {
+            endpointLocalMap[ep.Host] = ep.IsLocal
+        }
+    }
+    for i := range endpoints {
+        endpoints[i].IsLocal = endpointLocalMap[endpoints[i].Host]
+    }
     return nil
-
 }
 
 // NewEndpointList - returns new endpoint list based on input args.
diff --git a/cmd/endpoint_test.go b/cmd/endpoint_test.go
index 525929987..4098bfca6 100644
--- a/cmd/endpoint_test.go
+++ b/cmd/endpoint_test.go
@@ -350,27 +350,29 @@ func TestCreateEndpoints(t *testing.T) {
     }
 
     for i, testCase := range testCases {
-        serverAddr, endpoints, setupType, err := CreateEndpoints(testCase.serverAddr, testCase.args...)
-
-        if err == nil {
-            if testCase.expectedErr != nil {
-                t.Fatalf("Test (%d) error: expected = %v, got = <nil>", i+1, testCase.expectedErr)
-            } else {
-                if serverAddr != testCase.expectedServerAddr {
-                    t.Fatalf("Test (%d) serverAddr: expected = %v, got = %v", i+1, testCase.expectedServerAddr, serverAddr)
-                }
-                if !reflect.DeepEqual(endpoints, testCase.expectedEndpoints) {
-                    t.Fatalf("Test (%d) endpoints: expected = %v, got = %v", i+1, testCase.expectedEndpoints, endpoints)
-                }
-                if setupType != testCase.expectedSetupType {
-                    t.Fatalf("Test (%d) setupType: expected = %v, got = %v", i+1, testCase.expectedSetupType, setupType)
+        testCase := testCase
+        t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) {
+            serverAddr, endpoints, setupType, err := CreateEndpoints(testCase.serverAddr, testCase.args...)
+            if err == nil {
+                if testCase.expectedErr != nil {
+                    t.Fatalf("error: expected = %v, got = <nil>", testCase.expectedErr)
+                } else {
+                    if serverAddr != testCase.expectedServerAddr {
+                        t.Fatalf("serverAddr: expected = %v, got = %v", testCase.expectedServerAddr, serverAddr)
+                    }
+                    if !reflect.DeepEqual(endpoints, testCase.expectedEndpoints) {
+                        t.Fatalf("endpoints: expected = %v, got = %v", testCase.expectedEndpoints, endpoints)
+                    }
+                    if setupType != testCase.expectedSetupType {
+                        t.Fatalf("setupType: expected = %v, got = %v", testCase.expectedSetupType, setupType)
+                    }
                 }
+            } else if testCase.expectedErr == nil {
+                t.Fatalf("error: expected = <nil>, got = %v", err)
+            } else if err.Error() != testCase.expectedErr.Error() {
+                t.Fatalf("error: expected = %v, got = %v", testCase.expectedErr, err)
             }
-        } else if testCase.expectedErr == nil {
-            t.Fatalf("Test (%d) error: expected = <nil>, got = %v", i+1, err)
-        } else if err.Error() != testCase.expectedErr.Error() {
-            t.Fatalf("Test (%d) error: expected = %v, got = %v", i+1, testCase.expectedErr, err)
-        }
+        })
     }
 }
diff --git a/cmd/namespace-lock.go b/cmd/namespace-lock.go
index 0170cb383..a87b5ffd0 100644
--- a/cmd/namespace-lock.go
+++ b/cmd/namespace-lock.go
@@ -53,8 +53,9 @@ type RWLocker interface {
 
 // Initialize distributed locking only in case of distributed setup.
 // Returns lock clients and the node index for the current server.
-func newDsyncNodes(endpoints EndpointList) (clnts []dsync.NetLocker, myNode int) {
+func newDsyncNodes(endpoints EndpointList) (clnts []dsync.NetLocker, myNode int, err error) {
     myNode = -1
+
     seenHosts := set.NewStringSet()
     for _, endpoint := range endpoints {
         if seenHosts.Contains(endpoint.Host) {
@@ -66,26 +67,27 @@ func newDsyncNodes(endpoints EndpointList) (clnts []dsync.NetLocker, myNode int)
         if endpoint.IsLocal {
             myNode = len(clnts)
 
-            receiver := &lockRESTServer{
+            globalLockServer = &lockRESTServer{
                 ll: &localLocker{
                     serverAddr:      endpoint.Host,
                     serviceEndpoint: lockServicePath,
                     lockMap:         make(map[string][]lockRequesterInfo),
                 },
             }
-
-            globalLockServer = receiver
-            locker = receiver.ll
+            locker = globalLockServer.ll
         } else {
-            host, err := xnet.ParseHost(endpoint.Host)
-            logger.FatalIf(err, "Unable to parse Lock RPC Host")
+            var host *xnet.Host
+            host, err = xnet.ParseHost(endpoint.Host)
             locker = newlockRESTClient(host)
         }
 
         clnts = append(clnts, locker)
     }
 
-    return clnts, myNode
+    if myNode == -1 {
+        return clnts, myNode, errors.New("no endpoint pointing to the local machine is found")
+    }
+    return clnts, myNode, err
 }
 
 // newNSLock - return a new name space lock map.
diff --git a/cmd/server-main.go b/cmd/server-main.go
index cf2e05cbf..9c0a9f7ec 100644
--- a/cmd/server-main.go
+++ b/cmd/server-main.go
@@ -276,7 +276,11 @@ func serverMain(ctx *cli.Context) {
 
     // Set nodes for dsync for distributed setup.
     if globalIsDistXL {
-        globalDsync, err = dsync.New(newDsyncNodes(globalEndpoints))
+        clnts, myNode, err := newDsyncNodes(globalEndpoints)
+        if err != nil {
+            logger.Fatal(err, "Unable to initialize distributed locking on %s", globalEndpoints)
+        }
+        globalDsync, err = dsync.New(clnts, myNode)
         if err != nil {
             logger.Fatal(err, "Unable to initialize distributed locking on %s", globalEndpoints)
         }
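
Note (illustrative, not part of the patch): the core of the cmd/endpoint.go change is a two-pass normalization of the IsLocal flags. The sketch below shows the same technique in isolation; the hostEndpoint type, the normalizeLocalFlags helper, and the hostnames are made up for the example and are not MinIO's actual Endpoint API.

package main

import "fmt"

// hostEndpoint is a simplified stand-in for MinIO's Endpoint type,
// carrying only the fields the normalization pass needs.
type hostEndpoint struct {
	Host    string
	IsLocal bool
}

// normalizeLocalFlags applies the patch's assumption: if any endpoint of a
// host resolved as local, every endpoint of that host is treated as local.
func normalizeLocalFlags(endpoints []hostEndpoint) {
	// First pass: record every host that any endpoint reports as local.
	localHosts := make(map[string]bool)
	for _, ep := range endpoints {
		if ep.IsLocal {
			localHosts[ep.Host] = true
		}
	}
	// Second pass: propagate the flag to all endpoints of those hosts.
	for i := range endpoints {
		endpoints[i].IsLocal = localHosts[endpoints[i].Host]
	}
}

func main() {
	// The same host resolved inconsistently across its disks, as can
	// happen with dynamic DNS in Kubernetes/Docker deployments.
	eps := []hostEndpoint{
		{Host: "minio-0.minio.svc:9000", IsLocal: true},
		{Host: "minio-0.minio.svc:9000", IsLocal: false}, // stale resolution
		{Host: "minio-1.minio.svc:9000", IsLocal: false},
	}
	normalizeLocalFlags(eps)
	fmt.Println(eps) // both minio-0 entries now report IsLocal=true
}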
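
Note (illustrative, not part of the patch): the cmd/endpoint_test.go change converts a flat loop over test cases into named subtests via t.Run. The minimal, self-contained sketch below shows the same pattern with a made-up add function; re-binding testCase inside the loop keeps the closure correct if the subtests are later marked parallel, and the subtest names let a single case be selected, for example with go test -run 'TestAdd/Test2'.

package example

import (
	"fmt"
	"testing"
)

func add(a, b int) int { return a + b }

func TestAdd(t *testing.T) {
	testCases := []struct {
		a, b, want int
	}{
		{1, 2, 3},
		{2, 2, 4},
		{-1, 1, 0},
	}
	for i, testCase := range testCases {
		testCase := testCase // re-bind the loop variable for the closure
		t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) {
			// Subtest names make failures point at the exact case.
			if got := add(testCase.a, testCase.b); got != testCase.want {
				t.Fatalf("add: expected = %v, got = %v", testCase.want, got)
			}
		})
	}
}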