fix: avoid double ListBuckets() loading object lock (#9031)

master
Harshavardhana 5 years ago committed by GitHub
parent 224b4f13b8
commit dcd63b4146
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 53
      cmd/notification.go
  2. 43
      cmd/object-lock.go
  3. 8
      cmd/server-main.go

@ -688,53 +688,6 @@ func (sys *NotificationSys) load(buckets []BucketInfo, objAPI ObjectLayer) error
return nil return nil
} }
func (sys *NotificationSys) initBucketObjectLockConfig(objAPI ObjectLayer) error {
buckets, err := objAPI.ListBuckets(context.Background())
if err != nil {
return err
}
for _, bucket := range buckets {
ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucket.Name})
configFile := path.Join(bucketConfigPrefix, bucket.Name, bucketObjectLockEnabledConfigFile)
bucketObjLockData, err := readConfig(ctx, objAPI, configFile)
if err != nil {
if err == errConfigNotFound {
continue
}
return err
}
if string(bucketObjLockData) != bucketObjectLockEnabledConfig {
// this should never happen
logger.LogIf(ctx, objectlock.ErrMalformedBucketObjectConfig)
continue
}
configFile = path.Join(bucketConfigPrefix, bucket.Name, objectLockConfig)
configData, err := readConfig(ctx, objAPI, configFile)
if err != nil {
if err == errConfigNotFound {
globalBucketObjectLockConfig.Set(bucket.Name, objectlock.Retention{})
continue
}
return err
}
config, err := objectlock.ParseObjectLockConfig(bytes.NewReader(configData))
if err != nil {
return err
}
retention := objectlock.Retention{}
if config.Rule != nil {
retention = config.ToRetention()
}
globalBucketObjectLockConfig.Set(bucket.Name, retention)
}
return nil
}
// Init - initializes notification system from notification.xml and listener.json of all buckets. // Init - initializes notification system from notification.xml and listener.json of all buckets.
func (sys *NotificationSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error { func (sys *NotificationSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error {
if objAPI == nil { if objAPI == nil {
@ -754,11 +707,7 @@ func (sys *NotificationSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error
} }
} }
if err := sys.load(buckets, objAPI); err != nil { return sys.load(buckets, objAPI)
return err
}
return sys.initBucketObjectLockConfig(objAPI)
} }
// AddRulesMap - adds rules map for bucket name. // AddRulesMap - adds rules map for bucket name.

@ -17,8 +17,10 @@
package cmd package cmd
import ( import (
"bytes"
"context" "context"
"net/http" "net/http"
"path"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
objectlock "github.com/minio/minio/pkg/bucket/object/lock" objectlock "github.com/minio/minio/pkg/bucket/object/lock"
@ -244,3 +246,44 @@ func checkPutObjectLockAllowed(ctx context.Context, r *http.Request, bucket, obj
} }
return mode, retainDate, legalHold, ErrNone return mode, retainDate, legalHold, ErrNone
} }
func initBucketObjectLockConfig(buckets []BucketInfo, objAPI ObjectLayer) error {
for _, bucket := range buckets {
ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucket.Name})
configFile := path.Join(bucketConfigPrefix, bucket.Name, bucketObjectLockEnabledConfigFile)
bucketObjLockData, err := readConfig(ctx, objAPI, configFile)
if err != nil {
if err == errConfigNotFound {
continue
}
return err
}
if string(bucketObjLockData) != bucketObjectLockEnabledConfig {
// this should never happen
logger.LogIf(ctx, objectlock.ErrMalformedBucketObjectConfig)
continue
}
configFile = path.Join(bucketConfigPrefix, bucket.Name, objectLockConfig)
configData, err := readConfig(ctx, objAPI, configFile)
if err != nil {
if err == errConfigNotFound {
globalBucketObjectLockConfig.Set(bucket.Name, objectlock.Retention{})
continue
}
return err
}
config, err := objectlock.ParseObjectLockConfig(bytes.NewReader(configData))
if err != nil {
return err
}
retention := objectlock.Retention{}
if config.Rule != nil {
retention = config.ToRetention()
}
globalBucketObjectLockConfig.Set(bucket.Name, retention)
}
return nil
}

@ -263,7 +263,12 @@ func initAllSubsystems(buckets []BucketInfo, newObject ObjectLayer) (err error)
// Initialize policy system. // Initialize policy system.
if err = globalPolicySys.Init(buckets, newObject); err != nil { if err = globalPolicySys.Init(buckets, newObject); err != nil {
return fmt.Errorf("Unable to initialize policy system; %w", err) return fmt.Errorf("Unable to initialize policy system: %w", err)
}
// Initialize bucket object lock.
if err = initBucketObjectLockConfig(buckets, newObject); err != nil {
return fmt.Errorf("Unable to initialize object lock system: %w", err)
} }
// Initialize lifecycle system. // Initialize lifecycle system.
@ -275,6 +280,7 @@ func initAllSubsystems(buckets []BucketInfo, newObject ObjectLayer) (err error)
if err = globalBucketSSEConfigSys.Init(buckets, newObject); err != nil { if err = globalBucketSSEConfigSys.Init(buckets, newObject); err != nil {
return fmt.Errorf("Unable to initialize bucket encryption subsystem: %w", err) return fmt.Errorf("Unable to initialize bucket encryption subsystem: %w", err)
} }
return nil return nil
} }

Loading…
Cancel
Save