|
|
|
@ -205,19 +205,21 @@ func (en *eventNotifier) GetBucketListenerConfig(bucket string) []listenerConfig |
|
|
|
|
func (en *eventNotifier) SetBucketListenerConfig(bucket string, lcfg []listenerConfig) error { |
|
|
|
|
en.internal.rwMutex.Lock() |
|
|
|
|
defer en.internal.rwMutex.Unlock() |
|
|
|
|
if lcfg == nil { |
|
|
|
|
if len(lcfg) == 0 { |
|
|
|
|
delete(en.internal.listenerConfigs, bucket) |
|
|
|
|
} else { |
|
|
|
|
en.internal.listenerConfigs[bucket] = lcfg |
|
|
|
|
} |
|
|
|
|
en.internal.targets = make(map[string]*listenerLogger) |
|
|
|
|
for _, lc := range lcfg { |
|
|
|
|
logger, err := newListenerLogger(lc.TopicConfig.TopicARN, |
|
|
|
|
lc.TargetServer) |
|
|
|
|
for _, elcArr := range en.internal.listenerConfigs { |
|
|
|
|
for _, elcElem := range elcArr { |
|
|
|
|
currArn := elcElem.TopicConfig.TopicARN |
|
|
|
|
logger, err := newListenerLogger(currArn, elcElem.TargetServer) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
en.internal.targets[lc.TopicConfig.TopicARN] = logger |
|
|
|
|
en.internal.targets[currArn] = logger |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
@ -443,6 +445,15 @@ func persistListenerConfig(bucket string, lcfg []listenerConfig, obj ObjectLayer |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Remove listener configuration from storage layer. Used when a
|
|
|
|
|
// bucket is deleted.
|
|
|
|
|
func removeListenerConfig(bucket string, obj ObjectLayer) error { |
|
|
|
|
// make the path
|
|
|
|
|
lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig) |
|
|
|
|
// remove it
|
|
|
|
|
return obj.DeleteObject(minioMetaBucket, lcPath) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// loads all bucket notifications if present.
|
|
|
|
|
func loadAllBucketNotifications(objAPI ObjectLayer) (map[string]*notificationConfig, map[string][]listenerConfig, error) { |
|
|
|
|
// List buckets to proceed loading all notification configuration.
|
|
|
|
|