Use etcd watch to reload IAM users (#7551)

Currently we used to reload users every five minutes,
regardless of etcd is configured or not. But with etcd
configured we can do this more asynchronously to trigger
a refresh by using the watch API

Fixes #7515
master
Harshavardhana 6 years ago committed by Nitish Tiwari
parent 27ef1262bf
commit 83ca1a8d64
  1. 27
      cmd/config-common.go
  2. 121
      cmd/iam.go

@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"time"
etcd "github.com/coreos/etcd/clientv3"
"github.com/minio/minio/cmd/logger"
@ -105,12 +106,26 @@ func readConfigEtcd(ctx context.Context, client *etcd.Client, configFile string)
// watchConfigEtcd - watches for changes on `configFile` on etcd and loads them.
func watchConfigEtcd(objAPI ObjectLayer, configFile string, loadCfgFn func(ObjectLayer) error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout)
defer cancel()
for watchResp := range globalEtcdClient.Watch(ctx, configFile) {
for _, event := range watchResp.Events {
if event.IsModify() || event.IsCreate() {
loadCfgFn(objAPI)
for {
watchCh := globalEtcdClient.Watch(context.Background(), iamConfigPrefix)
select {
case <-GlobalServiceDoneCh:
return
case watchResp, ok := <-watchCh:
if !ok {
time.Sleep(1 * time.Second)
continue
}
if err := watchResp.Err(); err != nil {
logger.LogIf(context.Background(), err)
// log and retry.
time.Sleep(1 * time.Second)
continue
}
for _, event := range watchResp.Events {
if event.IsModify() || event.IsCreate() {
loadCfgFn(objAPI)
}
}
}
}

@ -62,6 +62,9 @@ type IAMSys struct {
// Load - loads iam subsystem
func (sys *IAMSys) Load(objAPI ObjectLayer) error {
if globalEtcdClient != nil {
return sys.refreshEtcd()
}
return sys.refresh(objAPI)
}
@ -71,21 +74,52 @@ func (sys *IAMSys) Init(objAPI ObjectLayer) error {
return errInvalidArgument
}
defer func() {
// Refresh IAMSys in background.
go func() {
ticker := time.NewTicker(globalRefreshIAMInterval)
defer ticker.Stop()
for {
select {
case <-GlobalServiceDoneCh:
return
case <-ticker.C:
sys.refresh(objAPI)
if globalEtcdClient != nil {
defer func() {
go func() {
// Refresh IAMSys with etcd watch.
for {
watchCh := globalEtcdClient.Watch(context.Background(), iamConfigPrefix)
select {
case <-GlobalServiceDoneCh:
return
case watchResp, ok := <-watchCh:
if !ok {
time.Sleep(1 * time.Second)
continue
}
if err := watchResp.Err(); err != nil {
logger.LogIf(context.Background(), err)
// log and retry.
time.Sleep(1 * time.Second)
continue
}
for _, event := range watchResp.Events {
if event.IsModify() || event.IsCreate() || event.Type == etcd.EventTypeDelete {
sys.refreshEtcd()
}
}
}
}
}
}()
}()
} else {
defer func() {
// Refresh IAMSys in background.
go func() {
ticker := time.NewTicker(globalRefreshIAMInterval)
defer ticker.Stop()
for {
select {
case <-GlobalServiceDoneCh:
return
case <-ticker.C:
sys.refresh(objAPI)
}
}
}()
}()
}()
}
doneCh := make(chan struct{})
defer close(doneCh)
@ -95,6 +129,9 @@ func (sys *IAMSys) Init(objAPI ObjectLayer) error {
// - Read quorum is lost just after the initialization
// of the object layer.
for range newRetryTimerSimple(doneCh) {
if globalEtcdClient != nil {
return sys.refreshEtcd()
}
// Load IAMSys once during boot.
if err := sys.refresh(objAPI); err != nil {
if err == errDiskNotFound ||
@ -697,35 +734,51 @@ func setDefaultCannedPolicies(policies map[string]iampolicy.Policy) {
}
}
func (sys *IAMSys) refreshEtcd() error {
iamUsersMap := make(map[string]auth.Credentials)
iamPolicyMap := make(map[string]string)
iamCannedPolicyMap := make(map[string]iampolicy.Policy)
if err := reloadEtcdPolicies(iamConfigPoliciesPrefix, iamCannedPolicyMap); err != nil {
return err
}
if err := reloadEtcdUsers(iamConfigUsersPrefix, iamUsersMap, iamPolicyMap); err != nil {
return err
}
if err := reloadEtcdUsers(iamConfigSTSPrefix, iamUsersMap, iamPolicyMap); err != nil {
return err
}
// Sets default canned policies, if none are set.
setDefaultCannedPolicies(iamCannedPolicyMap)
sys.Lock()
defer sys.Unlock()
sys.iamUsersMap = iamUsersMap
sys.iamPolicyMap = iamPolicyMap
sys.iamCannedPolicyMap = iamCannedPolicyMap
return nil
}
// Refresh IAMSys.
func (sys *IAMSys) refresh(objAPI ObjectLayer) error {
iamUsersMap := make(map[string]auth.Credentials)
iamPolicyMap := make(map[string]string)
iamCannedPolicyMap := make(map[string]iampolicy.Policy)
if globalEtcdClient != nil {
if err := reloadEtcdPolicies(iamConfigPoliciesPrefix, iamCannedPolicyMap); err != nil {
return err
}
if err := reloadEtcdUsers(iamConfigUsersPrefix, iamUsersMap, iamPolicyMap); err != nil {
return err
}
if err := reloadEtcdUsers(iamConfigSTSPrefix, iamUsersMap, iamPolicyMap); err != nil {
return err
}
} else {
if err := reloadPolicies(objAPI, iamConfigPoliciesPrefix, iamCannedPolicyMap); err != nil {
return err
}
if err := reloadUsers(objAPI, iamConfigUsersPrefix, iamUsersMap, iamPolicyMap); err != nil {
return err
}
if err := reloadUsers(objAPI, iamConfigSTSPrefix, iamUsersMap, iamPolicyMap); err != nil {
return err
}
if err := reloadPolicies(objAPI, iamConfigPoliciesPrefix, iamCannedPolicyMap); err != nil {
return err
}
if err := reloadUsers(objAPI, iamConfigUsersPrefix, iamUsersMap, iamPolicyMap); err != nil {
return err
}
if err := reloadUsers(objAPI, iamConfigSTSPrefix, iamUsersMap, iamPolicyMap); err != nil {
return err
}
// Sets default canned policies, if none set.
// Sets default canned policies, if none are set.
setDefaultCannedPolicies(iamCannedPolicyMap)
sys.Lock()

Loading…
Cancel
Save