From ac07df2985b3b362711ca12270cd3a85f78a37f8 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 8 Apr 2020 19:00:39 -0700 Subject: [PATCH] start watcher after all creds have been loaded (#9301) start watcher after all creds have been loaded to avoid any conflicting locks that might get deadlocked. Deprecate unused peer calls for LoadUsers() --- buildscripts/race.sh | 2 +- cmd/iam-etcd-store.go | 59 +++++++++++++++++++---------------------- cmd/iam-object-store.go | 18 +++++-------- cmd/iam.go | 9 +------ cmd/notification.go | 13 --------- cmd/peer-rest-client.go | 10 ------- cmd/peer-rest-server.go | 28 ------------------- 7 files changed, 37 insertions(+), 102 deletions(-) diff --git a/buildscripts/race.sh b/buildscripts/race.sh index 12f03c3b2..66ee3de4a 100755 --- a/buildscripts/race.sh +++ b/buildscripts/race.sh @@ -3,5 +3,5 @@ set -e for d in $(go list ./... | grep -v browser); do - CGO_ENABLED=1 go test -v -race --timeout 20m "$d" + CGO_ENABLED=1 go test -v -race --timeout 50m "$d" done diff --git a/cmd/iam-etcd-store.go b/cmd/iam-etcd-store.go index 571f93942..5e0723ded 100644 --- a/cmd/iam-etcd-store.go +++ b/cmd/iam-etcd-store.go @@ -559,42 +559,39 @@ func (ies *IAMEtcdStore) deleteGroupInfo(name string) error { } func (ies *IAMEtcdStore) watch(ctx context.Context, sys *IAMSys) { - watchEtcd := func() { + for { + outerLoop: + // Refresh IAMSys with etcd watch. + watchCh := ies.client.Watch(ctx, + iamConfigPrefix, etcd.WithPrefix(), etcd.WithKeysOnly()) + for { - outerLoop: - // Refresh IAMSys with etcd watch. - watchCh := ies.client.Watch(ctx, - iamConfigPrefix, etcd.WithPrefix(), etcd.WithKeysOnly()) - - for { - select { - case <-ctx.Done(): - return - case watchResp, ok := <-watchCh: - if !ok { - time.Sleep(1 * time.Second) - // Upon an error on watch channel - // re-init the watch channel. - goto outerLoop - } - if err := watchResp.Err(); err != nil { - logger.LogIf(ctx, err) - // log and retry. - time.Sleep(1 * time.Second) - // Upon an error on watch channel - // re-init the watch channel. - goto outerLoop - } - for _, event := range watchResp.Events { - ies.lock() - ies.reloadFromEvent(sys, event) - ies.unlock() - } + select { + case <-ctx.Done(): + return + case watchResp, ok := <-watchCh: + if !ok { + time.Sleep(1 * time.Second) + // Upon an error on watch channel + // re-init the watch channel. + goto outerLoop + } + if err := watchResp.Err(); err != nil { + logger.LogIf(ctx, err) + // log and retry. + time.Sleep(1 * time.Second) + // Upon an error on watch channel + // re-init the watch channel. + goto outerLoop + } + for _, event := range watchResp.Events { + ies.lock() + ies.reloadFromEvent(sys, event) + ies.unlock() } } } } - go watchEtcd() } // sys.RLock is held by caller. diff --git a/cmd/iam-object-store.go b/cmd/iam-object-store.go index 05348480e..f7efe7a6a 100644 --- a/cmd/iam-object-store.go +++ b/cmd/iam-object-store.go @@ -590,17 +590,13 @@ func listIAMConfigItems(ctx context.Context, objAPI ObjectLayer, pathPrefix stri } func (iamOS *IAMObjectStore) watch(ctx context.Context, sys *IAMSys) { - watchDisk := func() { - for { - select { - case <-ctx.Done(): - return - case <-time.NewTimer(globalRefreshIAMInterval).C: - logger.LogIf(ctx, iamOS.loadAll(ctx, sys)) - } + // Refresh IAMSys. + for { + select { + case <-ctx.Done(): + return + case <-time.NewTimer(globalRefreshIAMInterval).C: + logger.LogIf(ctx, iamOS.loadAll(ctx, sys)) } } - - // Refresh IAMSys in background. - go watchDisk() } diff --git a/cmd/iam.go b/cmd/iam.go index 0880d7bcc..63b0489bd 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -353,13 +353,6 @@ func (sys *IAMSys) LoadUser(objAPI ObjectLayer, accessKey string, userType IAMUs return nil } -// Load - loads iam subsystem -func (sys *IAMSys) Load(ctx context.Context) error { - // Pass nil objectlayer here - it will be loaded internally - // from the IAMStorageAPI. - return sys.store.loadAll(ctx, sys) -} - // Perform IAM configuration migration. func (sys *IAMSys) doIAMConfigMigration(ctx context.Context) error { return sys.store.migrateBackendFormat(ctx) @@ -386,12 +379,12 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) error { return err } - sys.store.watch(ctx, sys) err := sys.store.loadAll(ctx, sys) // Invalidate the old cred after finishing IAM initialization globalOldCred = auth.Credentials{} + go sys.store.watch(ctx, sys) return err } diff --git a/cmd/notification.go b/cmd/notification.go index e95e31c0c..0cf458068 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -225,19 +225,6 @@ func (sys *NotificationSys) LoadUser(accessKey string, temp bool) []Notification return ng.Wait() } -// LoadUsers - calls LoadUsers RPC call on all peers. -func (sys *NotificationSys) LoadUsers() []NotificationPeerErr { - ng := WithNPeers(len(sys.peerClients)) - for idx, client := range sys.peerClients { - if client == nil { - continue - } - client := client - ng.Go(context.Background(), client.LoadUsers, idx, *client.host) - } - return ng.Wait() -} - // LoadGroup - loads a specific group on all peers. func (sys *NotificationSys) LoadGroup(group string) []NotificationPeerErr { ng := WithNPeers(len(sys.peerClients)) diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index c537836c9..4d1350920 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -874,16 +874,6 @@ func (client *peerRESTClient) LoadUser(accessKey string, temp bool) (err error) return nil } -// LoadUsers - send load users command to peer nodes. -func (client *peerRESTClient) LoadUsers() (err error) { - respBody, err := client.call(peerRESTMethodLoadUsers, nil, nil, -1) - if err != nil { - return - } - defer http.DrainBody(respBody) - return nil -} - // LoadGroup - send load group command to peers. func (client *peerRESTClient) LoadGroup(group string) error { values := make(url.Values) diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 2ba8eeb22..6b55836ec 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -339,33 +339,6 @@ func (s *peerRESTServer) LoadUserHandler(w http.ResponseWriter, r *http.Request) w.(http.Flusher).Flush() } -// LoadUsersHandler - reloads all users and canned policies. -func (s *peerRESTServer) LoadUsersHandler(w http.ResponseWriter, r *http.Request) { - if !s.IsValid(w, r) { - s.writeErrorResponse(w, errors.New("Invalid request")) - return - } - - objAPI := newObjectLayerWithoutSafeModeFn() - if objAPI == nil { - s.writeErrorResponse(w, errServerNotInitialized) - return - } - - if globalIAMSys == nil { - s.writeErrorResponse(w, errServerNotInitialized) - return - } - - err := globalIAMSys.Load(GlobalContext) - if err != nil { - s.writeErrorResponse(w, err) - return - } - - w.(http.Flusher).Flush() -} - // LoadGroupHandler - reloads group along with members list. func (s *peerRESTServer) LoadGroupHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -1359,7 +1332,6 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadPolicyMapping).HandlerFunc(httpTraceAll(server.LoadPolicyMappingHandler)).Queries(restQueries(peerRESTUserOrGroup)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeleteUser).HandlerFunc(httpTraceAll(server.LoadUserHandler)).Queries(restQueries(peerRESTUser)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadUser).HandlerFunc(httpTraceAll(server.LoadUserHandler)).Queries(restQueries(peerRESTUser, peerRESTUserTemp)...) - subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadUsers).HandlerFunc(httpTraceAll(server.LoadUsersHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadGroup).HandlerFunc(httpTraceAll(server.LoadGroupHandler)).Queries(restQueries(peerRESTGroup)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodStartProfiling).HandlerFunc(httpTraceAll(server.StartProfilingHandler)).Queries(restQueries(peerRESTProfiler)...)