fix: replica set deployment for multi tenants (#8673)

Changes in IP underneath are dynamic in replica sets
with multiple tenants, so deploying in that fashion
will not work until we wait for atleast one participatory
server to be local.

This PR also ensures that multi-tenant zone expansion also
works in replica set k8s deployments.

Introduces a new ENV `KUBERNETES_REPLICA_SET` check to call
appropriate code paths.
master
Harshavardhana 5 years ago committed by kannappanr
parent 39face27cf
commit d140074773
  1. 2
      cmd/common-main.go
  2. 11
      cmd/endpoint-ellipses.go
  3. 37
      cmd/endpoint.go
  4. 6
      cmd/endpoint_test.go
  5. 8
      cmd/net.go
  6. 4
      cmd/notification.go
  7. 5
      cmd/update.go

@ -195,7 +195,7 @@ func handleCommonEnvVars() {
} else { } else {
// Add found interfaces IP address to global domain IPS, // Add found interfaces IP address to global domain IPS,
// loopback addresses will be naturally dropped. // loopback addresses will be naturally dropped.
updateDomainIPs(localIP4) updateDomainIPs(mustGetLocalIP4())
} }
// In place update is true by default if the MINIO_UPDATE is not set // In place update is true by default if the MINIO_UPDATE is not set

@ -270,7 +270,7 @@ func createServerEndpoints(serverAddr string, args ...string) (EndpointZones, in
if err != nil { if err != nil {
return nil, -1, -1, err return nil, -1, -1, err
} }
endpointList, newSetupType, err := CreateEndpoints(serverAddr, setArgs...) endpointList, newSetupType, err := CreateEndpoints(serverAddr, false, setArgs...)
if err != nil { if err != nil {
return nil, -1, -1, err return nil, -1, -1, err
} }
@ -283,6 +283,8 @@ func createServerEndpoints(serverAddr string, args ...string) (EndpointZones, in
return endpointZones, len(setArgs[0]), setupType, nil return endpointZones, len(setArgs[0]), setupType, nil
} }
var foundPrevLocal bool
// Verify the args setup-type appropriately. // Verify the args setup-type appropriately.
{ {
setArgs, err := GetAllSets(args...) setArgs, err := GetAllSets(args...)
@ -290,10 +292,13 @@ func createServerEndpoints(serverAddr string, args ...string) (EndpointZones, in
return nil, -1, -1, err return nil, -1, -1, err
} }
_, setupType, err = CreateEndpoints(serverAddr, setArgs...) var endpoints Endpoints
endpoints, setupType, err = CreateEndpoints(serverAddr, foundPrevLocal, setArgs...)
if err != nil { if err != nil {
return nil, -1, -1, err return nil, -1, -1, err
} }
foundPrevLocal = endpoints.atleastOneEndpointLocal()
} }
for _, arg := range args { for _, arg := range args {
@ -301,7 +306,7 @@ func createServerEndpoints(serverAddr string, args ...string) (EndpointZones, in
if err != nil { if err != nil {
return nil, -1, -1, err return nil, -1, -1, err
} }
endpointList, _, err := CreateEndpoints(serverAddr, setArgs...) endpointList, _, err := CreateEndpoints(serverAddr, foundPrevLocal, setArgs...)
if err != nil { if err != nil {
return nil, -1, -1, err return nil, -1, -1, err
} }

@ -249,9 +249,19 @@ func (endpoints Endpoints) doAnyHostsResolveToLocalhost() bool {
return false return false
} }
func (endpoints Endpoints) atleastOneEndpointLocal() bool {
for _, endpoint := range endpoints {
if endpoint.IsLocal {
return true
}
}
return false
}
// UpdateIsLocal - resolves the host and discovers the local host. // UpdateIsLocal - resolves the host and discovers the local host.
func (endpoints Endpoints) UpdateIsLocal() error { func (endpoints Endpoints) UpdateIsLocal(foundPrevLocal bool) error {
orchestrated := IsDocker() || IsKubernetes() orchestrated := IsDocker() || IsKubernetes()
k8sReplicaSet := IsKubernetesReplicaSet()
var epsResolved int var epsResolved int
var foundLocal bool var foundLocal bool
@ -284,8 +294,8 @@ func (endpoints Endpoints) UpdateIsLocal() error {
endpoints[i].Hostname(), endpoints[i].Hostname(),
) )
if orchestrated && endpoints.doAnyHostsResolveToLocalhost() { if k8sReplicaSet && endpoints.doAnyHostsResolveToLocalhost() {
err := errors.New("hosts resolve 127.*, DNS not updated on k8s") err := errors.New("host found resolves to 127.*, DNS incorrectly configured retrying")
// time elapsed // time elapsed
timeElapsed := time.Since(startTime) timeElapsed := time.Since(startTime)
// log error only if more than 1s elapsed // log error only if more than 1s elapsed
@ -329,6 +339,21 @@ func (endpoints Endpoints) UpdateIsLocal() error {
} else { } else {
resolvedList[i] = true resolvedList[i] = true
endpoints[i].IsLocal = isLocal endpoints[i].IsLocal = isLocal
if k8sReplicaSet && !endpoints.atleastOneEndpointLocal() && !foundPrevLocal {
// In replicated set in k8s deployment, IPs might
// get resolved for older IPs, add this code
// to ensure that we wait for this server to
// participate atleast one disk and be local.
//
// In special cases for replica set with expanded
// zone setups we need to make sure to provide
// value of foundPrevLocal from zone1 if we already
// found a local setup. Only if we haven't found
// previous local we continue to wait to look for
// atleast one local.
resolvedList[i] = false
continue
}
epsResolved++ epsResolved++
if !foundLocal { if !foundLocal {
foundLocal = isLocal foundLocal = isLocal
@ -439,7 +464,7 @@ func checkCrossDeviceMounts(endpoints Endpoints) (err error) {
} }
// CreateEndpoints - validates and creates new endpoints for given args. // CreateEndpoints - validates and creates new endpoints for given args.
func CreateEndpoints(serverAddr string, args ...[]string) (Endpoints, SetupType, error) { func CreateEndpoints(serverAddr string, foundLocal bool, args ...[]string) (Endpoints, SetupType, error) {
var endpoints Endpoints var endpoints Endpoints
var setupType SetupType var setupType SetupType
var err error var err error
@ -505,7 +530,7 @@ func CreateEndpoints(serverAddr string, args ...[]string) (Endpoints, SetupType,
return endpoints, setupType, nil return endpoints, setupType, nil
} }
if err = endpoints.UpdateIsLocal(); err != nil { if err = endpoints.UpdateIsLocal(foundLocal); err != nil {
return endpoints, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error()) return endpoints, setupType, config.ErrInvalidErasureEndpoints(nil).Msg(err.Error())
} }
@ -695,6 +720,6 @@ func updateDomainIPs(endPoints set.StringSet) {
if err != nil { if err != nil {
host = ip host = ip
} }
return !net.ParseIP(host).IsLoopback() return !net.ParseIP(host).IsLoopback() && host != "localhost"
}, "") }, "")
} }

@ -341,7 +341,7 @@ func TestCreateEndpoints(t *testing.T) {
for _, testCase := range testCases { for _, testCase := range testCases {
testCase := testCase testCase := testCase
t.Run("", func(t *testing.T) { t.Run("", func(t *testing.T) {
endpoints, setupType, err := CreateEndpoints(testCase.serverAddr, testCase.args...) endpoints, setupType, err := CreateEndpoints(testCase.serverAddr, false, testCase.args...)
if err == nil && testCase.expectedErr != nil { if err == nil && testCase.expectedErr != nil {
t.Errorf("error: expected = %v, got = <nil>", testCase.expectedErr) t.Errorf("error: expected = %v, got = <nil>", testCase.expectedErr)
} }
@ -398,7 +398,7 @@ func TestGetLocalPeer(t *testing.T) {
for i, testCase := range testCases { for i, testCase := range testCases {
zendpoints := mustGetZoneEndpoints(testCase.endpointArgs...) zendpoints := mustGetZoneEndpoints(testCase.endpointArgs...)
if !zendpoints[0].Endpoints[0].IsLocal { if !zendpoints[0].Endpoints[0].IsLocal {
if err := zendpoints[0].Endpoints.UpdateIsLocal(); err != nil { if err := zendpoints[0].Endpoints.UpdateIsLocal(false); err != nil {
t.Fatalf("error: expected = <nil>, got = %v", err) t.Fatalf("error: expected = <nil>, got = %v", err)
} }
} }
@ -430,7 +430,7 @@ func TestGetRemotePeers(t *testing.T) {
for _, testCase := range testCases { for _, testCase := range testCases {
zendpoints := mustGetZoneEndpoints(testCase.endpointArgs...) zendpoints := mustGetZoneEndpoints(testCase.endpointArgs...)
if !zendpoints[0].Endpoints[0].IsLocal { if !zendpoints[0].Endpoints[0].IsLocal {
if err := zendpoints[0].Endpoints.UpdateIsLocal(); err != nil { if err := zendpoints[0].Endpoints.UpdateIsLocal(false); err != nil {
t.Fatalf("error: expected = <nil>, got = %v", err) t.Fatalf("error: expected = <nil>, got = %v", err)
} }
} }

@ -162,8 +162,8 @@ func sortIPs(ipList []string) []string {
func getAPIEndpoints() (apiEndpoints []string) { func getAPIEndpoints() (apiEndpoints []string) {
var ipList []string var ipList []string
if globalMinioHost == "" { if globalMinioHost == "" {
ipList = sortIPs(localIP4.ToSlice()) ipList = sortIPs(mustGetLocalIP4().ToSlice())
ipList = append(ipList, localIP6.ToSlice()...) ipList = append(ipList, mustGetLocalIP6().ToSlice()...)
} else { } else {
ipList = []string{globalMinioHost} ipList = []string{globalMinioHost}
} }
@ -278,8 +278,8 @@ func isLocalHost(host string, port string, localPort string) (bool, error) {
} }
// If intersection of two IP sets is not empty, then the host is localhost. // If intersection of two IP sets is not empty, then the host is localhost.
isLocalv4 := !localIP4.Intersection(hostIPs).IsEmpty() isLocalv4 := !mustGetLocalIP4().Intersection(hostIPs).IsEmpty()
isLocalv6 := !localIP6.Intersection(hostIPs).IsEmpty() isLocalv6 := !mustGetLocalIP6().Intersection(hostIPs).IsEmpty()
if port != "" { if port != "" {
return (isLocalv4 || isLocalv6) && (port == localPort), nil return (isLocalv4 || isLocalv6) && (port == localPort), nil
} }

@ -798,6 +798,10 @@ func (sys *NotificationSys) RemoveRulesMap(bucketName string, rulesMap event.Rul
// ConfiguredTargetIDs - returns list of configured target id's // ConfiguredTargetIDs - returns list of configured target id's
func (sys *NotificationSys) ConfiguredTargetIDs() []event.TargetID { func (sys *NotificationSys) ConfiguredTargetIDs() []event.TargetID {
if sys == nil {
return nil
}
sys.RLock() sys.RLock()
defer sys.RUnlock() defer sys.RUnlock()

@ -154,6 +154,11 @@ func IsDCOS() bool {
return false return false
} }
// IsKubernetesReplicaSet returns true if minio is running in kubernetes replica set.
func IsKubernetesReplicaSet() bool {
return IsKubernetes() && (env.Get("KUBERNETES_REPLICA_SET", "") != "")
}
// IsKubernetes returns true if minio is running in kubernetes. // IsKubernetes returns true if minio is running in kubernetes.
func IsKubernetes() bool { func IsKubernetes() bool {
if env.Get("SIMPLE_CI", "") == "" { if env.Get("SIMPLE_CI", "") == "" {

Loading…
Cancel
Save