keep bucket metadata fields to be consistent (#9660)

added bonus reload bucket metadata always after
a successful MakeBucket, current we were only
doing it with object locking enabled.
master
Harshavardhana 4 years ago committed by GitHub
parent ea210319ce
commit a546047c95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      cmd/bucket-handlers.go
  2. 5
      cmd/bucket-metadata-sys.go
  3. 58
      cmd/bucket-metadata.go
  4. 54
      cmd/bucket-metadata_gen.go
  5. 1
      cmd/bucket-notification-handlers.go
  6. 1
      cmd/notification.go
  7. 4
      cmd/xl-zones.go

@ -538,9 +538,8 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
return return
} }
if objectLockEnabled { // Load updated bucket metadata into memory.
globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket) globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket)
}
// Make sure to add Location information here only for bucket // Make sure to add Location information here only for bucket
w.Header().Set(xhttp.Location, w.Header().Set(xhttp.Location,
@ -572,9 +571,8 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
return return
} }
if objectLockEnabled { // Load updated bucket metadata into memory.
globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket) globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket)
}
// Make sure to add Location information here only for bucket // Make sure to add Location information here only for bucket
w.Header().Set(xhttp.Location, path.Clean(r.URL.Path)) // Clean any trailing slashes. w.Header().Set(xhttp.Location, path.Clean(r.URL.Path)) // Clean any trailing slashes.

@ -74,6 +74,7 @@ func (sys *BucketMetadataSys) Update(bucket string, configFile string, configDat
} }
if globalIsGateway { if globalIsGateway {
// This code is needed only for gateway implementations.
if configFile == bucketPolicyConfig { if configFile == bucketPolicyConfig {
config, err := policy.ParseConfig(bytes.NewReader(configData), bucket) config, err := policy.ParseConfig(bytes.NewReader(configData), bucket)
if err != nil { if err != nil {
@ -97,7 +98,7 @@ func (sys *BucketMetadataSys) Update(bucket string, configFile string, configDat
case bucketPolicyConfig: case bucketPolicyConfig:
meta.PolicyConfigJSON = configData meta.PolicyConfigJSON = configData
case bucketNotificationConfig: case bucketNotificationConfig:
meta.NotificationXML = configData meta.NotificationConfigXML = configData
case bucketLifecycleConfig: case bucketLifecycleConfig:
meta.LifecycleConfigXML = configData meta.LifecycleConfigXML = configData
case bucketSSEConfig: case bucketSSEConfig:
@ -105,7 +106,7 @@ func (sys *BucketMetadataSys) Update(bucket string, configFile string, configDat
case bucketTaggingConfigFile: case bucketTaggingConfigFile:
meta.TaggingConfigXML = configData meta.TaggingConfigXML = configData
case objectLockConfig: case objectLockConfig:
meta.ObjectLockConfigurationXML = configData meta.ObjectLockConfigXML = configData
case bucketQuotaConfigFile: case bucketQuotaConfigFile:
meta.QuotaConfigJSON = configData meta.QuotaConfigJSON = configData
default: default:

@ -46,7 +46,7 @@ const (
) )
var ( var (
defaultBucketObjectLockConfig = []byte(`<ObjectLockConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><ObjectLockEnabled>Enabled</ObjectLockEnabled></ObjectLockConfiguration>`) enabledBucketObjectLockConfig = []byte(`<ObjectLockConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><ObjectLockEnabled>Enabled</ObjectLockEnabled></ObjectLockConfiguration>`)
) )
//go:generate msgp -file $GOFILE //go:generate msgp -file $GOFILE
@ -57,16 +57,16 @@ var (
// bucketMetadataFormat refers to the format. // bucketMetadataFormat refers to the format.
// bucketMetadataVersion can be used to track a rolling upgrade of a field. // bucketMetadataVersion can be used to track a rolling upgrade of a field.
type BucketMetadata struct { type BucketMetadata struct {
Name string Name string
Created time.Time Created time.Time
LockEnabled bool // legacy not used anymore. LockEnabled bool // legacy not used anymore.
PolicyConfigJSON []byte PolicyConfigJSON []byte
NotificationXML []byte NotificationConfigXML []byte
LifecycleConfigXML []byte LifecycleConfigXML []byte
ObjectLockConfigurationXML []byte ObjectLockConfigXML []byte
EncryptionConfigXML []byte EncryptionConfigXML []byte
TaggingConfigXML []byte TaggingConfigXML []byte
QuotaConfigJSON []byte QuotaConfigJSON []byte
// Unexported fields. Must be updated atomically. // Unexported fields. Must be updated atomically.
policyConfig *policy.Policy policyConfig *policy.Policy
@ -83,6 +83,10 @@ func newBucketMetadata(name string) BucketMetadata {
return BucketMetadata{ return BucketMetadata{
Name: name, Name: name,
Created: UTCNow(), Created: UTCNow(),
notificationConfig: &event.Config{
XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/",
},
quotaConfig: &madmin.BucketQuota{},
} }
} }
@ -142,15 +146,10 @@ func (b *BucketMetadata) parseAllConfigs(ctx context.Context, objectAPI ObjectLa
b.policyConfig = nil b.policyConfig = nil
} }
if len(b.NotificationXML) != 0 { if len(b.NotificationConfigXML) != 0 {
if err = xml.Unmarshal(b.NotificationXML, b.notificationConfig); err != nil { if err = xml.Unmarshal(b.NotificationConfigXML, b.notificationConfig); err != nil {
return err return err
} }
} else {
b.notificationConfig = &event.Config{
XMLNS: "http://s3.amazonaws.com/doc/2006-03-01/",
}
b.notificationConfig.SetRegion(globalServerRegion)
} }
if len(b.LifecycleConfigXML) != 0 { if len(b.LifecycleConfigXML) != 0 {
@ -180,23 +179,22 @@ func (b *BucketMetadata) parseAllConfigs(ctx context.Context, objectAPI ObjectLa
b.taggingConfig = nil b.taggingConfig = nil
} }
if len(b.ObjectLockConfigurationXML) != 0 { if len(b.ObjectLockConfigXML) != 0 {
b.objectLockConfig, err = objectlock.ParseObjectLockConfig(bytes.NewReader(b.ObjectLockConfigurationXML)) b.objectLockConfig, err = objectlock.ParseObjectLockConfig(bytes.NewReader(b.ObjectLockConfigXML))
if err != nil { if err != nil {
return err return err
} }
} else { } else {
b.objectLockConfig = nil b.objectLockConfig = nil
} }
if len(b.QuotaConfigJSON) != 0 { if len(b.QuotaConfigJSON) != 0 {
qcfg, err := parseBucketQuota(b.Name, b.QuotaConfigJSON) b.quotaConfig, err = parseBucketQuota(b.Name, b.QuotaConfigJSON)
if err != nil { if err != nil {
return err return err
} }
b.quotaConfig = qcfg
} else {
b.quotaConfig = &madmin.BucketQuota{}
} }
return nil return nil
} }
@ -216,9 +214,9 @@ func (b *BucketMetadata) convertLegacyConfigs(ctx context.Context, objectAPI Obj
// Handle migration from lockEnabled to newer format. // Handle migration from lockEnabled to newer format.
if b.LockEnabled { if b.LockEnabled {
configs[objectLockConfig] = defaultBucketObjectLockConfig configs[objectLockConfig] = enabledBucketObjectLockConfig
b.LockEnabled = false // legacy value unset it b.LockEnabled = false // legacy value unset it
// we are only interested in b.ObjectLockConfigurationXML or objectLockConfig value // we are only interested in b.ObjectLockConfigXML or objectLockConfig value
} }
for _, legacyFile := range legacyConfigs { for _, legacyFile := range legacyConfigs {
@ -245,14 +243,14 @@ func (b *BucketMetadata) convertLegacyConfigs(ctx context.Context, objectAPI Obj
switch legacyFile { switch legacyFile {
case legacyBucketObjectLockEnabledConfigFile: case legacyBucketObjectLockEnabledConfigFile:
if string(configData) == legacyBucketObjectLockEnabledConfig { if string(configData) == legacyBucketObjectLockEnabledConfig {
b.ObjectLockConfigurationXML = defaultBucketObjectLockConfig b.ObjectLockConfigXML = enabledBucketObjectLockConfig
b.LockEnabled = false // legacy value unset it b.LockEnabled = false // legacy value unset it
// we are only interested in b.ObjectLockConfigurationXML // we are only interested in b.ObjectLockConfigXML
} }
case bucketPolicyConfig: case bucketPolicyConfig:
b.PolicyConfigJSON = configData b.PolicyConfigJSON = configData
case bucketNotificationConfig: case bucketNotificationConfig:
b.NotificationXML = configData b.NotificationConfigXML = configData
case bucketLifecycleConfig: case bucketLifecycleConfig:
b.LifecycleConfigXML = configData b.LifecycleConfigXML = configData
case bucketSSEConfig: case bucketSSEConfig:
@ -260,7 +258,7 @@ func (b *BucketMetadata) convertLegacyConfigs(ctx context.Context, objectAPI Obj
case bucketTaggingConfigFile: case bucketTaggingConfigFile:
b.TaggingConfigXML = configData b.TaggingConfigXML = configData
case objectLockConfig: case objectLockConfig:
b.ObjectLockConfigurationXML = configData b.ObjectLockConfigXML = configData
case bucketQuotaConfigFile: case bucketQuotaConfigFile:
b.QuotaConfigJSON = configData b.QuotaConfigJSON = configData
} }

@ -48,10 +48,10 @@ func (z *BucketMetadata) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "PolicyConfigJSON") err = msgp.WrapError(err, "PolicyConfigJSON")
return return
} }
case "NotificationXML": case "NotificationConfigXML":
z.NotificationXML, err = dc.ReadBytes(z.NotificationXML) z.NotificationConfigXML, err = dc.ReadBytes(z.NotificationConfigXML)
if err != nil { if err != nil {
err = msgp.WrapError(err, "NotificationXML") err = msgp.WrapError(err, "NotificationConfigXML")
return return
} }
case "LifecycleConfigXML": case "LifecycleConfigXML":
@ -60,10 +60,10 @@ func (z *BucketMetadata) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "LifecycleConfigXML") err = msgp.WrapError(err, "LifecycleConfigXML")
return return
} }
case "ObjectLockConfigurationXML": case "ObjectLockConfigXML":
z.ObjectLockConfigurationXML, err = dc.ReadBytes(z.ObjectLockConfigurationXML) z.ObjectLockConfigXML, err = dc.ReadBytes(z.ObjectLockConfigXML)
if err != nil { if err != nil {
err = msgp.WrapError(err, "ObjectLockConfigurationXML") err = msgp.WrapError(err, "ObjectLockConfigXML")
return return
} }
case "EncryptionConfigXML": case "EncryptionConfigXML":
@ -138,14 +138,14 @@ func (z *BucketMetadata) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "PolicyConfigJSON") err = msgp.WrapError(err, "PolicyConfigJSON")
return return
} }
// write "NotificationXML" // write "NotificationConfigXML"
err = en.Append(0xaf, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x58, 0x4d, 0x4c) err = en.Append(0xb5, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x58, 0x4d, 0x4c)
if err != nil { if err != nil {
return return
} }
err = en.WriteBytes(z.NotificationXML) err = en.WriteBytes(z.NotificationConfigXML)
if err != nil { if err != nil {
err = msgp.WrapError(err, "NotificationXML") err = msgp.WrapError(err, "NotificationConfigXML")
return return
} }
// write "LifecycleConfigXML" // write "LifecycleConfigXML"
@ -158,14 +158,14 @@ func (z *BucketMetadata) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "LifecycleConfigXML") err = msgp.WrapError(err, "LifecycleConfigXML")
return return
} }
// write "ObjectLockConfigurationXML" // write "ObjectLockConfigXML"
err = en.Append(0xba, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x4c, 0x6f, 0x63, 0x6b, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x58, 0x4d, 0x4c) err = en.Append(0xb3, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x4c, 0x6f, 0x63, 0x6b, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x58, 0x4d, 0x4c)
if err != nil { if err != nil {
return return
} }
err = en.WriteBytes(z.ObjectLockConfigurationXML) err = en.WriteBytes(z.ObjectLockConfigXML)
if err != nil { if err != nil {
err = msgp.WrapError(err, "ObjectLockConfigurationXML") err = msgp.WrapError(err, "ObjectLockConfigXML")
return return
} }
// write "EncryptionConfigXML" // write "EncryptionConfigXML"
@ -217,15 +217,15 @@ func (z *BucketMetadata) MarshalMsg(b []byte) (o []byte, err error) {
// string "PolicyConfigJSON" // string "PolicyConfigJSON"
o = append(o, 0xb0, 0x50, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x4a, 0x53, 0x4f, 0x4e) o = append(o, 0xb0, 0x50, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x4a, 0x53, 0x4f, 0x4e)
o = msgp.AppendBytes(o, z.PolicyConfigJSON) o = msgp.AppendBytes(o, z.PolicyConfigJSON)
// string "NotificationXML" // string "NotificationConfigXML"
o = append(o, 0xaf, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x58, 0x4d, 0x4c) o = append(o, 0xb5, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x58, 0x4d, 0x4c)
o = msgp.AppendBytes(o, z.NotificationXML) o = msgp.AppendBytes(o, z.NotificationConfigXML)
// string "LifecycleConfigXML" // string "LifecycleConfigXML"
o = append(o, 0xb2, 0x4c, 0x69, 0x66, 0x65, 0x63, 0x79, 0x63, 0x6c, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x58, 0x4d, 0x4c) o = append(o, 0xb2, 0x4c, 0x69, 0x66, 0x65, 0x63, 0x79, 0x63, 0x6c, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x58, 0x4d, 0x4c)
o = msgp.AppendBytes(o, z.LifecycleConfigXML) o = msgp.AppendBytes(o, z.LifecycleConfigXML)
// string "ObjectLockConfigurationXML" // string "ObjectLockConfigXML"
o = append(o, 0xba, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x4c, 0x6f, 0x63, 0x6b, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x58, 0x4d, 0x4c) o = append(o, 0xb3, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x4c, 0x6f, 0x63, 0x6b, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x58, 0x4d, 0x4c)
o = msgp.AppendBytes(o, z.ObjectLockConfigurationXML) o = msgp.AppendBytes(o, z.ObjectLockConfigXML)
// string "EncryptionConfigXML" // string "EncryptionConfigXML"
o = append(o, 0xb3, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x58, 0x4d, 0x4c) o = append(o, 0xb3, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x58, 0x4d, 0x4c)
o = msgp.AppendBytes(o, z.EncryptionConfigXML) o = msgp.AppendBytes(o, z.EncryptionConfigXML)
@ -280,10 +280,10 @@ func (z *BucketMetadata) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "PolicyConfigJSON") err = msgp.WrapError(err, "PolicyConfigJSON")
return return
} }
case "NotificationXML": case "NotificationConfigXML":
z.NotificationXML, bts, err = msgp.ReadBytesBytes(bts, z.NotificationXML) z.NotificationConfigXML, bts, err = msgp.ReadBytesBytes(bts, z.NotificationConfigXML)
if err != nil { if err != nil {
err = msgp.WrapError(err, "NotificationXML") err = msgp.WrapError(err, "NotificationConfigXML")
return return
} }
case "LifecycleConfigXML": case "LifecycleConfigXML":
@ -292,10 +292,10 @@ func (z *BucketMetadata) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "LifecycleConfigXML") err = msgp.WrapError(err, "LifecycleConfigXML")
return return
} }
case "ObjectLockConfigurationXML": case "ObjectLockConfigXML":
z.ObjectLockConfigurationXML, bts, err = msgp.ReadBytesBytes(bts, z.ObjectLockConfigurationXML) z.ObjectLockConfigXML, bts, err = msgp.ReadBytesBytes(bts, z.ObjectLockConfigXML)
if err != nil { if err != nil {
err = msgp.WrapError(err, "ObjectLockConfigurationXML") err = msgp.WrapError(err, "ObjectLockConfigXML")
return return
} }
case "EncryptionConfigXML": case "EncryptionConfigXML":
@ -330,6 +330,6 @@ func (z *BucketMetadata) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *BucketMetadata) Msgsize() (s int) { func (z *BucketMetadata) Msgsize() (s int) {
s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 8 + msgp.TimeSize + 12 + msgp.BoolSize + 17 + msgp.BytesPrefixSize + len(z.PolicyConfigJSON) + 16 + msgp.BytesPrefixSize + len(z.NotificationXML) + 19 + msgp.BytesPrefixSize + len(z.LifecycleConfigXML) + 27 + msgp.BytesPrefixSize + len(z.ObjectLockConfigurationXML) + 20 + msgp.BytesPrefixSize + len(z.EncryptionConfigXML) + 17 + msgp.BytesPrefixSize + len(z.TaggingConfigXML) + 16 + msgp.BytesPrefixSize + len(z.QuotaConfigJSON) s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 8 + msgp.TimeSize + 12 + msgp.BoolSize + 17 + msgp.BytesPrefixSize + len(z.PolicyConfigJSON) + 22 + msgp.BytesPrefixSize + len(z.NotificationConfigXML) + 19 + msgp.BytesPrefixSize + len(z.LifecycleConfigXML) + 20 + msgp.BytesPrefixSize + len(z.ObjectLockConfigXML) + 20 + msgp.BytesPrefixSize + len(z.EncryptionConfigXML) + 17 + msgp.BytesPrefixSize + len(z.TaggingConfigXML) + 16 + msgp.BytesPrefixSize + len(z.QuotaConfigJSON)
return return
} }

@ -74,6 +74,7 @@ func (api objectAPIHandlers) GetBucketNotificationHandler(w http.ResponseWriter,
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return return
} }
config.SetRegion(globalServerRegion)
if err = config.Validate(globalServerRegion, globalNotificationSys.targetList); err != nil { if err = config.Validate(globalServerRegion, globalNotificationSys.targetList); err != nil {
arnErr, ok := err.(*event.ErrARNNotFound) arnErr, ok := err.(*event.ErrARNNotFound)
if ok { if ok {

@ -616,6 +616,7 @@ func (sys *NotificationSys) load(buckets []BucketInfo, objAPI ObjectLayer) error
if err != nil { if err != nil {
return err return err
} }
config.SetRegion(globalServerRegion)
if err = config.Validate(globalServerRegion, globalNotificationSys.targetList); err != nil { if err = config.Validate(globalServerRegion, globalNotificationSys.targetList); err != nil {
if _, ok := err.(*event.ErrARNNotFound); !ok { if _, ok := err.(*event.ErrARNNotFound); !ok {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)

@ -325,7 +325,7 @@ func (z *xlZones) MakeBucketWithLocation(ctx context.Context, bucket, location s
// If it doesn't exist we get a new, so ignore errors // If it doesn't exist we get a new, so ignore errors
meta := newBucketMetadata(bucket) meta := newBucketMetadata(bucket)
if lockEnabled { if lockEnabled {
meta.ObjectLockConfigurationXML = defaultBucketObjectLockConfig meta.ObjectLockConfigXML = enabledBucketObjectLockConfig
} }
if err := meta.Save(ctx, z); err != nil { if err := meta.Save(ctx, z); err != nil {
return toObjectErr(err, bucket) return toObjectErr(err, bucket)
@ -355,7 +355,7 @@ func (z *xlZones) MakeBucketWithLocation(ctx context.Context, bucket, location s
// If it doesn't exist we get a new, so ignore errors // If it doesn't exist we get a new, so ignore errors
meta := newBucketMetadata(bucket) meta := newBucketMetadata(bucket)
if lockEnabled { if lockEnabled {
meta.ObjectLockConfigurationXML = defaultBucketObjectLockConfig meta.ObjectLockConfigXML = enabledBucketObjectLockConfig
} }
if err := meta.Save(ctx, z); err != nil { if err := meta.Save(ctx, z); err != nil {
return toObjectErr(err, bucket) return toObjectErr(err, bucket)

Loading…
Cancel
Save