List buckets only once per sub-system initialization (#8333)

Current master repeatedly calls ListBuckets() during
initialization of multiple sub-systems

Use single ListBuckets() call for each sub-system as
follows

- LifeCycle
- Policy
- Notification
master
Harshavardhana 5 years ago committed by kannappanr
parent fb1374f2f7
commit 8b80eca184
  1. 9
      cmd/admin-handlers_test.go
  2. 8
      cmd/gateway-main.go
  3. 5
      cmd/gateway-unsupported.go
  4. 67
      cmd/lifecycle.go
  5. 16
      cmd/notification.go
  6. 32
      cmd/policy.go
  7. 18
      cmd/server-main.go
  8. 18
      cmd/test-utils_test.go

@ -294,11 +294,16 @@ func prepareAdminXLTestBed() (*adminXLTestBed, error) {
globalIAMSys = NewIAMSys() globalIAMSys = NewIAMSys()
globalIAMSys.Init(objLayer) globalIAMSys.Init(objLayer)
buckets, err := objLayer.ListBuckets(context.Background())
if err != nil {
return nil, err
}
globalPolicySys = NewPolicySys() globalPolicySys = NewPolicySys()
globalPolicySys.Init(objLayer) globalPolicySys.Init(buckets, objLayer)
globalNotificationSys = NewNotificationSys(globalServerConfig, globalEndpoints) globalNotificationSys = NewNotificationSys(globalServerConfig, globalEndpoints)
globalNotificationSys.Init(objLayer) globalNotificationSys.Init(buckets, objLayer)
// Setup admin mgmt REST API handlers. // Setup admin mgmt REST API handlers.
adminRouter := mux.NewRouter() adminRouter := mux.NewRouter()

@ -274,17 +274,11 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
// Create new policy system. // Create new policy system.
globalPolicySys = NewPolicySys() globalPolicySys = NewPolicySys()
// Initialize policy system. // Create new lifecycle system.
go globalPolicySys.Init(newObject)
// Create new lifecycle system
globalLifecycleSys = NewLifecycleSys() globalLifecycleSys = NewLifecycleSys()
// Create new notification system. // Create new notification system.
globalNotificationSys = NewNotificationSys(globalServerConfig, globalEndpoints) globalNotificationSys = NewNotificationSys(globalServerConfig, globalEndpoints)
if enableConfigOps && newObject.IsNotificationSupported() {
logger.LogIf(context.Background(), globalNotificationSys.Init(newObject))
}
// Verify if object layer supports // Verify if object layer supports
// - encryption // - encryption

@ -144,11 +144,6 @@ func (a GatewayUnsupported) CopyObject(ctx context.Context, srcBucket string, sr
return objInfo, NotImplemented{} return objInfo, NotImplemented{}
} }
// RefreshBucketPolicy refreshes cache policy with what's on disk.
func (a GatewayUnsupported) RefreshBucketPolicy(ctx context.Context, bucket string) error {
return NotImplemented{}
}
// IsNotificationSupported returns whether bucket notification is applicable for this layer. // IsNotificationSupported returns whether bucket notification is applicable for this layer.
func (a GatewayUnsupported) IsNotificationSupported() bool { func (a GatewayUnsupported) IsNotificationSupported() bool {
return false return false

@ -24,9 +24,7 @@ import (
"path" "path"
"strings" "strings"
"sync" "sync"
"time"
"github.com/minio/minio-go/pkg/set"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/lifecycle" "github.com/minio/minio/pkg/lifecycle"
) )
@ -45,6 +43,11 @@ type LifecycleSys struct {
// Set - sets lifecycle config to given bucket name. // Set - sets lifecycle config to given bucket name.
func (sys *LifecycleSys) Set(bucketName string, lifecycle lifecycle.Lifecycle) { func (sys *LifecycleSys) Set(bucketName string, lifecycle lifecycle.Lifecycle) {
if globalIsGateway {
// no-op
return
}
sys.Lock() sys.Lock()
defer sys.Unlock() defer sys.Unlock()
@ -53,6 +56,16 @@ func (sys *LifecycleSys) Set(bucketName string, lifecycle lifecycle.Lifecycle) {
// Get - gets lifecycle config associated to a given bucket name. // Get - gets lifecycle config associated to a given bucket name.
func (sys *LifecycleSys) Get(bucketName string) (lifecycle lifecycle.Lifecycle, ok bool) { func (sys *LifecycleSys) Get(bucketName string) (lifecycle lifecycle.Lifecycle, ok bool) {
if globalIsGateway {
// When gateway is enabled, no cached value
// is used to validate life cycle policies.
objAPI := newObjectLayerFn()
if objAPI == nil {
return
}
l, err := objAPI.GetBucketLifecycle(context.Background(), bucketName)
return *l, err == nil
}
sys.Lock() sys.Lock()
defer sys.Unlock() defer sys.Unlock()
@ -107,26 +120,16 @@ func NewLifecycleSys() *LifecycleSys {
} }
// Init - initializes lifecycle system from lifecycle.xml of all buckets. // Init - initializes lifecycle system from lifecycle.xml of all buckets.
func (sys *LifecycleSys) Init(objAPI ObjectLayer) error { func (sys *LifecycleSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error {
if objAPI == nil { if objAPI == nil {
return errServerNotInitialized return errServerNotInitialized
} }
defer func() { // In gateway mode, we always fetch the bucket lifecycle configuration from the gateway backend.
// Refresh LifecycleSys in background. // So, this is a no-op for gateway servers.
go func() { if globalIsGateway {
ticker := time.NewTicker(globalRefreshBucketLifecycleInterval) return nil
defer ticker.Stop()
for {
select {
case <-GlobalServiceDoneCh:
return
case <-ticker.C:
sys.refresh(objAPI)
}
} }
}()
}()
doneCh := make(chan struct{}) doneCh := make(chan struct{})
defer close(doneCh) defer close(doneCh)
@ -140,7 +143,7 @@ func (sys *LifecycleSys) Init(objAPI ObjectLayer) error {
select { select {
case <-retryTimerCh: case <-retryTimerCh:
// Load LifecycleSys once during boot. // Load LifecycleSys once during boot.
if err := sys.refresh(objAPI); err != nil { if err := sys.load(buckets, objAPI); err != nil {
if err == errDiskNotFound || if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) || strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) { strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {
@ -156,14 +159,8 @@ func (sys *LifecycleSys) Init(objAPI ObjectLayer) error {
} }
} }
// Refresh LifecycleSys. // Loads lifecycle policies for all buckets into LifecycleSys.
func (sys *LifecycleSys) refresh(objAPI ObjectLayer) error { func (sys *LifecycleSys) load(buckets []BucketInfo, objAPI ObjectLayer) error {
buckets, err := objAPI.ListBuckets(context.Background())
if err != nil {
logger.LogIf(context.Background(), err)
return err
}
sys.removeDeletedBuckets(buckets)
for _, bucket := range buckets { for _, bucket := range buckets {
config, err := objAPI.GetBucketLifecycle(context.Background(), bucket.Name) config, err := objAPI.GetBucketLifecycle(context.Background(), bucket.Name)
if err != nil { if err != nil {
@ -179,24 +176,6 @@ func (sys *LifecycleSys) refresh(objAPI ObjectLayer) error {
return nil return nil
} }
// removeDeletedBuckets - to handle a corner case where we have cached the lifecycle for a deleted
// bucket. i.e if we miss a delete-bucket notification we should delete the corresponding
// bucket policy during sys.refresh()
func (sys *LifecycleSys) removeDeletedBuckets(bucketInfos []BucketInfo) {
buckets := set.NewStringSet()
for _, info := range bucketInfos {
buckets.Add(info.Name)
}
sys.Lock()
defer sys.Unlock()
for bucket := range sys.bucketLifecycleMap {
if !buckets.Contains(bucket) {
delete(sys.bucketLifecycleMap, bucket)
}
}
}
// Remove - removes policy for given bucket name. // Remove - removes policy for given bucket name.
func (sys *LifecycleSys) Remove(bucketName string) { func (sys *LifecycleSys) Remove(bucketName string) {
sys.Lock() sys.Lock()

@ -770,11 +770,8 @@ func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLaye
return nil return nil
} }
func (sys *NotificationSys) refresh(objAPI ObjectLayer) error { // Loads notification policies for all buckets into NotificationSys.
buckets, err := objAPI.ListBuckets(context.Background()) func (sys *NotificationSys) load(buckets []BucketInfo, objAPI ObjectLayer) error {
if err != nil {
return err
}
for _, bucket := range buckets { for _, bucket := range buckets {
ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucket.Name}) ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucket.Name})
config, err := readNotificationConfig(ctx, objAPI, bucket.Name) config, err := readNotificationConfig(ctx, objAPI, bucket.Name)
@ -796,11 +793,16 @@ func (sys *NotificationSys) refresh(objAPI ObjectLayer) error {
} }
// Init - initializes notification system from notification.xml and listener.json of all buckets. // Init - initializes notification system from notification.xml and listener.json of all buckets.
func (sys *NotificationSys) Init(objAPI ObjectLayer) error { func (sys *NotificationSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error {
if objAPI == nil { if objAPI == nil {
return errInvalidArgument return errInvalidArgument
} }
// In gateway mode, notifications are not supported.
if globalIsGateway {
return nil
}
doneCh := make(chan struct{}) doneCh := make(chan struct{})
defer close(doneCh) defer close(doneCh)
@ -812,7 +814,7 @@ func (sys *NotificationSys) Init(objAPI ObjectLayer) error {
for { for {
select { select {
case <-retryTimerCh: case <-retryTimerCh:
if err := sys.refresh(objAPI); err != nil { if err := sys.load(buckets, objAPI); err != nil {
if err == errDiskNotFound || if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) || strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) { strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {

@ -27,7 +27,6 @@ import (
"sync" "sync"
miniogopolicy "github.com/minio/minio-go/v6/pkg/policy" miniogopolicy "github.com/minio/minio-go/v6/pkg/policy"
"github.com/minio/minio-go/v6/pkg/set"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/handlers" "github.com/minio/minio/pkg/handlers"
@ -40,24 +39,6 @@ type PolicySys struct {
bucketPolicyMap map[string]policy.Policy bucketPolicyMap map[string]policy.Policy
} }
// removeDeletedBuckets - to handle a corner case where we have cached the policy for a deleted
// bucket. i.e if we miss a delete-bucket notification we should delete the corresponding
// bucket policy during sys.refresh()
func (sys *PolicySys) removeDeletedBuckets(bucketInfos []BucketInfo) {
buckets := set.NewStringSet()
for _, info := range bucketInfos {
buckets.Add(info.Name)
}
sys.Lock()
defer sys.Unlock()
for bucket := range sys.bucketPolicyMap {
if !buckets.Contains(bucket) {
delete(sys.bucketPolicyMap, bucket)
}
}
}
// Set - sets policy to given bucket name. If policy is empty, existing policy is removed. // Set - sets policy to given bucket name. If policy is empty, existing policy is removed.
func (sys *PolicySys) Set(bucketName string, policy policy.Policy) { func (sys *PolicySys) Set(bucketName string, policy policy.Policy) {
if globalIsGateway { if globalIsGateway {
@ -110,13 +91,8 @@ func (sys *PolicySys) IsAllowed(args policy.Args) bool {
return args.IsOwner return args.IsOwner
} }
// Refresh PolicySys. // Loads policies for all buckets into PolicySys.
func (sys *PolicySys) refresh(objAPI ObjectLayer) error { func (sys *PolicySys) load(buckets []BucketInfo, objAPI ObjectLayer) error {
buckets, err := objAPI.ListBuckets(context.Background())
if err != nil {
return err
}
sys.removeDeletedBuckets(buckets)
for _, bucket := range buckets { for _, bucket := range buckets {
config, err := objAPI.GetBucketPolicy(context.Background(), bucket.Name) config, err := objAPI.GetBucketPolicy(context.Background(), bucket.Name)
if err != nil { if err != nil {
@ -145,7 +121,7 @@ func (sys *PolicySys) refresh(objAPI ObjectLayer) error {
} }
// Init - initializes policy system from policy.json of all buckets. // Init - initializes policy system from policy.json of all buckets.
func (sys *PolicySys) Init(objAPI ObjectLayer) error { func (sys *PolicySys) Init(buckets []BucketInfo, objAPI ObjectLayer) error {
if objAPI == nil { if objAPI == nil {
return errInvalidArgument return errInvalidArgument
} }
@ -168,7 +144,7 @@ func (sys *PolicySys) Init(objAPI ObjectLayer) error {
select { select {
case <-retryTimerCh: case <-retryTimerCh:
// Load PolicySys once during boot. // Load PolicySys once during boot.
if err := sys.refresh(objAPI); err != nil { if err := sys.load(buckets, objAPI); err != nil {
if err == errDiskNotFound || if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) || strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) { strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {

@ -234,6 +234,10 @@ func serverMain(ctx *cli.Context) {
checkUpdate(getMinioMode()) checkUpdate(getMinioMode())
} }
if globalIsDiskCacheEnabled {
logger.StartupMessage(colorRed(colorBold("Disk caching is allowed only for gateway deployments")))
}
// FIXME: This code should be removed in future releases and we should have mandatory // FIXME: This code should be removed in future releases and we should have mandatory
// check for ENVs credentials under distributed setup. Until all users migrate we // check for ENVs credentials under distributed setup. Until all users migrate we
// are intentionally providing backward compatibility. // are intentionally providing backward compatibility.
@ -339,22 +343,24 @@ func serverMain(ctx *cli.Context) {
logger.Fatal(err, "Unable to initialize IAM system") logger.Fatal(err, "Unable to initialize IAM system")
} }
buckets, err := newObject.ListBuckets(context.Background())
if err != nil {
logger.Fatal(err, "Unable to list buckets on your backend")
}
// Create new policy system. // Create new policy system.
globalPolicySys = NewPolicySys() globalPolicySys = NewPolicySys()
// Initialize policy system. // Initialize policy system.
if err = globalPolicySys.Init(newObject); err != nil { if err = globalPolicySys.Init(buckets, newObject); err != nil {
logger.Fatal(err, "Unable to initialize policy system") logger.Fatal(err, "Unable to initialize policy system")
} }
if globalIsDiskCacheEnabled {
logger.StartupMessage(colorRed(colorBold("Disk caching is allowed only for gateway deployments")))
}
// Create new lifecycle system. // Create new lifecycle system.
globalLifecycleSys = NewLifecycleSys() globalLifecycleSys = NewLifecycleSys()
// Initialize lifecycle system. // Initialize lifecycle system.
if err = globalLifecycleSys.Init(newObject); err != nil { if err = globalLifecycleSys.Init(buckets, newObject); err != nil {
logger.Fatal(err, "Unable to initialize lifecycle system") logger.Fatal(err, "Unable to initialize lifecycle system")
} }
@ -362,7 +368,7 @@ func serverMain(ctx *cli.Context) {
globalNotificationSys = NewNotificationSys(globalServerConfig, globalEndpoints) globalNotificationSys = NewNotificationSys(globalServerConfig, globalEndpoints)
// Initialize notification system. // Initialize notification system.
if err = globalNotificationSys.Init(newObject); err != nil { if err = globalNotificationSys.Init(buckets, newObject); err != nil {
logger.Fatal(err, "Unable to initialize notification system") logger.Fatal(err, "Unable to initialize notification system")
} }

@ -359,14 +359,19 @@ func UnstartedTestServer(t TestErrHandler, instanceType string) TestServer {
globalIAMSys = NewIAMSys() globalIAMSys = NewIAMSys()
globalIAMSys.Init(objLayer) globalIAMSys.Init(objLayer)
buckets, err := objLayer.ListBuckets(context.Background())
if err != nil {
t.Fatalf("Unable to list buckets on backend %s", err)
}
globalPolicySys = NewPolicySys() globalPolicySys = NewPolicySys()
globalPolicySys.Init(objLayer) globalPolicySys.Init(buckets, objLayer)
globalNotificationSys = NewNotificationSys(globalServerConfig, testServer.Disks) globalNotificationSys = NewNotificationSys(globalServerConfig, testServer.Disks)
globalNotificationSys.Init(objLayer) globalNotificationSys.Init(buckets, objLayer)
globalLifecycleSys = NewLifecycleSys() globalLifecycleSys = NewLifecycleSys()
globalLifecycleSys.Init(objLayer) globalLifecycleSys.Init(buckets, objLayer)
return testServer return testServer
} }
@ -1943,8 +1948,13 @@ func ExecObjectLayerAPITest(t *testing.T, objAPITest objAPITestType, endpoints [
globalIAMSys = NewIAMSys() globalIAMSys = NewIAMSys()
globalIAMSys.Init(objLayer) globalIAMSys.Init(objLayer)
buckets, err := objLayer.ListBuckets(context.Background())
if err != nil {
t.Fatalf("Unable to list buckets on backend %s", err)
}
globalPolicySys = NewPolicySys() globalPolicySys = NewPolicySys()
globalPolicySys.Init(objLayer) globalPolicySys.Init(buckets, objLayer)
credentials := globalServerConfig.GetCredential() credentials := globalServerConfig.GetCredential()

Loading…
Cancel
Save