|
|
|
@ -19,7 +19,6 @@ package cmd |
|
|
|
|
import ( |
|
|
|
|
"bytes" |
|
|
|
|
"encoding/xml" |
|
|
|
|
"errors" |
|
|
|
|
"fmt" |
|
|
|
|
"net/url" |
|
|
|
|
"path" |
|
|
|
@ -112,7 +111,7 @@ func (en *eventNotifier) SetSNSTarget(snsARN string, listenerCh chan []Notificat |
|
|
|
|
en.rwMutex.Lock() |
|
|
|
|
defer en.rwMutex.Unlock() |
|
|
|
|
if listenerCh == nil { |
|
|
|
|
return errors.New("invalid argument") |
|
|
|
|
return errInvalidArgument |
|
|
|
|
} |
|
|
|
|
en.snsTargets[snsARN] = append(en.snsTargets[snsARN], listenerCh) |
|
|
|
|
return nil |
|
|
|
@ -161,7 +160,7 @@ func (en *eventNotifier) SetBucketNotificationConfig(bucket string, notification |
|
|
|
|
en.rwMutex.Lock() |
|
|
|
|
defer en.rwMutex.Unlock() |
|
|
|
|
if notificationCfg == nil { |
|
|
|
|
return errors.New("invalid argument") |
|
|
|
|
return errInvalidArgument |
|
|
|
|
} |
|
|
|
|
en.notificationConfigs[bucket] = notificationCfg |
|
|
|
|
return nil |
|
|
|
@ -178,8 +177,11 @@ func eventNotify(event eventData) { |
|
|
|
|
// - s3:ObjectCreated:CompleteMultipartUpload
|
|
|
|
|
// - s3:ObjectRemoved:Delete
|
|
|
|
|
|
|
|
|
|
nConfig := eventN.GetBucketNotificationConfig(event.Bucket) |
|
|
|
|
nConfig := globalEventNotifier.GetBucketNotificationConfig(event.Bucket) |
|
|
|
|
// No bucket notifications enabled, drop the event notification.
|
|
|
|
|
if nConfig == nil { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
if len(nConfig.QueueConfigs) == 0 && len(nConfig.TopicConfigs) == 0 && len(nConfig.LambdaConfigs) == 0 { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
@ -198,7 +200,7 @@ func eventNotify(event eventData) { |
|
|
|
|
eventMatch := eventMatch(eventType, qConfig.Events) |
|
|
|
|
ruleMatch := filterRuleMatch(objectName, qConfig.Filter.Key.FilterRules) |
|
|
|
|
if eventMatch && ruleMatch { |
|
|
|
|
targetLog := eventN.GetQueueTarget(qConfig.QueueARN) |
|
|
|
|
targetLog := globalEventNotifier.GetQueueTarget(qConfig.QueueARN) |
|
|
|
|
if targetLog != nil { |
|
|
|
|
targetLog.WithFields(logrus.Fields{ |
|
|
|
|
"Records": notificationEvent, |
|
|
|
@ -211,7 +213,7 @@ func eventNotify(event eventData) { |
|
|
|
|
ruleMatch := filterRuleMatch(objectName, topicConfig.Filter.Key.FilterRules) |
|
|
|
|
eventMatch := eventMatch(eventType, topicConfig.Events) |
|
|
|
|
if eventMatch && ruleMatch { |
|
|
|
|
targetListeners := eventN.GetSNSTarget(topicConfig.TopicARN) |
|
|
|
|
targetListeners := globalEventNotifier.GetSNSTarget(topicConfig.TopicARN) |
|
|
|
|
for _, listener := range targetListeners { |
|
|
|
|
listener <- notificationEvent |
|
|
|
|
} |
|
|
|
@ -352,7 +354,7 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Global instance of event notification queue.
|
|
|
|
|
var eventN *eventNotifier |
|
|
|
|
var globalEventNotifier *eventNotifier |
|
|
|
|
|
|
|
|
|
// Initialize event notifier.
|
|
|
|
|
func initEventNotifier(objAPI ObjectLayer) error { |
|
|
|
@ -373,7 +375,7 @@ func initEventNotifier(objAPI ObjectLayer) error { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Inititalize event notifier queue.
|
|
|
|
|
eventN = &eventNotifier{ |
|
|
|
|
globalEventNotifier = &eventNotifier{ |
|
|
|
|
rwMutex: &sync.RWMutex{}, |
|
|
|
|
notificationConfigs: configs, |
|
|
|
|
queueTargets: queueTargets, |
|
|
|
|