From c71895f225ae3cf933d5bd6596ca9516a0094263 Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Tue, 30 Jul 2019 18:50:49 -0700 Subject: [PATCH] Listen for PolicyDB events from etcd and fix etcd watch handling (#7992) --- cmd/iam.go | 55 ++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/cmd/iam.go b/cmd/iam.go index 1929db35e..0e68a7716 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -378,6 +378,8 @@ func (sys *IAMSys) reloadFromEvent(event *etcd.Event) { usersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigUsersPrefix) stsPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigSTSPrefix) policyPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPoliciesPrefix) + policyDBUsersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPolicyDBUsersPrefix) + policyDBSTSUsersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPolicyDBSTSUsersPrefix) ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout) @@ -398,6 +400,16 @@ func (sys *IAMSys) reloadFromEvent(event *etcd.Event) { policyName := path.Dir(strings.TrimPrefix(string(event.Kv.Key), iamConfigPoliciesPrefix)) loadEtcdPolicy(ctx, policyName, sys.iamPolicyDocsMap) + case policyDBUsersPrefix: + policyMapFile := strings.TrimPrefix(string(event.Kv.Key), + iamConfigPolicyDBUsersPrefix) + user := strings.TrimSuffix(policyMapFile, ".json") + loadEtcdMappedPolicy(ctx, user, false, sys.iamUserPolicyMap) + case policyDBSTSUsersPrefix: + policyMapFile := strings.TrimPrefix(string(event.Kv.Key), + iamConfigPolicyDBSTSUsersPrefix) + user := strings.TrimSuffix(policyMapFile, ".json") + loadEtcdMappedPolicy(ctx, user, true, sys.iamUserPolicyMap) } case eventDelete: switch { @@ -413,6 +425,16 @@ func (sys *IAMSys) reloadFromEvent(event *etcd.Event) { policyName := path.Dir(strings.TrimPrefix(string(event.Kv.Key), iamConfigPoliciesPrefix)) delete(sys.iamPolicyDocsMap, policyName) + case policyDBUsersPrefix: + policyMapFile := strings.TrimPrefix(string(event.Kv.Key), + iamConfigPolicyDBUsersPrefix) + user := strings.TrimSuffix(policyMapFile, ".json") + delete(sys.iamUserPolicyMap, user) + case policyDBSTSUsersPrefix: + policyMapFile := strings.TrimPrefix(string(event.Kv.Key), + iamConfigPolicyDBSTSUsersPrefix) + user := strings.TrimSuffix(policyMapFile, ".json") + delete(sys.iamUserPolicyMap, user) } } } @@ -421,26 +443,27 @@ func (sys *IAMSys) reloadFromEvent(event *etcd.Event) { func (sys *IAMSys) watchIAMEtcd() { watchEtcd := func() { // Refresh IAMSys with etcd watch. + mainLoop: for { watchCh := globalEtcdClient.Watch(context.Background(), iamConfigPrefix, etcd.WithPrefix(), etcd.WithKeysOnly()) - select { - case <-GlobalServiceDoneCh: - return - case watchResp, ok := <-watchCh: - if !ok { - time.Sleep(1 * time.Second) - continue - } - if err := watchResp.Err(); err != nil { - logger.LogIf(context.Background(), err) - // log and retry. - time.Sleep(1 * time.Second) - continue - } - for _, event := range watchResp.Events { + for { + select { + case <-GlobalServiceDoneCh: + return + case watchResp, ok := <-watchCh: + if !ok { + goto mainLoop + } + if err := watchResp.Err(); err != nil { + logger.LogIf(context.Background(), err) + // log and retry. + continue + } sys.Lock() - sys.reloadFromEvent(event) + for _, event := range watchResp.Events { + sys.reloadFromEvent(event) + } sys.Unlock() } }