You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
841 lines
24 KiB
841 lines
24 KiB
/*
|
|
* Minio Cloud Storage, (C) 2016, 2017 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"encoding/xml"
|
|
"fmt"
|
|
"net"
|
|
"net/url"
|
|
"path"
|
|
"sync"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/minio/minio/pkg/hash"
|
|
)
|
|
|
|
const (
	// minioEventSource is the value placed in the EventSource field of
	// the notification events constructed by newNotificationEvent.
	minioEventSource = "minio:s3"
)
|
|
|
|
type externalNotifier struct {
|
|
// Per-bucket notification config. This is updated via
|
|
// PutBucketNotification API.
|
|
notificationConfigs map[string]*notificationConfig
|
|
|
|
// An external target keeps a connection to an external
|
|
// service to which events are to be sent. It is a mapping
|
|
// from an ARN to a log object
|
|
targets map[string]*logrus.Logger
|
|
|
|
rwMutex *sync.RWMutex
|
|
}
|
|
|
|
type internalNotifier struct {
|
|
// per-bucket listener configuration. This is updated
|
|
// when listeners connect or disconnect.
|
|
listenerConfigs map[string][]listenerConfig
|
|
|
|
// An internal target is a peer Minio server, that is
|
|
// connected to a listening client. Here, targets is a map of
|
|
// listener ARN to log object.
|
|
targets map[string]*listenerLogger
|
|
|
|
// Connected listeners is a map of listener ARNs to channels
|
|
// on which the ListenBucket API handler go routine is waiting
|
|
// for events to send to a client.
|
|
connectedListeners map[string]*listenChan
|
|
|
|
rwMutex *sync.RWMutex
|
|
}
|
|
|
|
// Global event notification configuration. This structure has state
|
|
// about configured external notifications, and run-time configuration
|
|
// for listener notifications.
|
|
type eventNotifier struct {
|
|
|
|
// `external` here refers to notification configuration to
|
|
// send events to supported external systems
|
|
external externalNotifier
|
|
|
|
// `internal` refers to notification configuration for live
|
|
// listening clients. Events for a client are send from all
|
|
// servers, internally to a particular server that is
|
|
// connected to the client.
|
|
internal internalNotifier
|
|
}
|
|
|
|
// Represents data to be sent with notification event.
|
|
type eventData struct {
|
|
Type EventName
|
|
Bucket string
|
|
ObjInfo ObjectInfo
|
|
ReqParams map[string]string
|
|
Host string
|
|
Port string
|
|
UserAgent string
|
|
}
|
|
|
|
// New notification event constructs a new notification event message from
|
|
// input request metadata which completed successfully.
|
|
func newNotificationEvent(event eventData) NotificationEvent {
|
|
getResponseOriginEndpointKey := func() string {
|
|
host := globalMinioHost
|
|
if host == "" {
|
|
// FIXME: Send FQDN or hostname of this machine than sending IP address.
|
|
host = localIP4.ToSlice()[0]
|
|
}
|
|
|
|
return fmt.Sprintf("%s://%s:%s", getURLScheme(globalIsSSL), host, globalMinioPort)
|
|
}
|
|
|
|
// Fetch the region.
|
|
region := serverConfig.GetRegion()
|
|
|
|
// Fetch the credentials.
|
|
creds := serverConfig.GetCredential()
|
|
|
|
// Time when Minio finished processing the request.
|
|
eventTime := UTCNow()
|
|
|
|
// Fetch a hexadecimal representation of event time in nano seconds.
|
|
uniqueID := mustGetRequestID(eventTime)
|
|
|
|
/// Construct a new object created event.
|
|
|
|
// Following blocks fills in all the necessary details of s3
|
|
// event message structure.
|
|
// http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
|
|
nEvent := NotificationEvent{
|
|
EventVersion: eventVersion,
|
|
EventSource: minioEventSource,
|
|
AwsRegion: region,
|
|
EventTime: eventTime.Format(timeFormatAMZ),
|
|
EventName: event.Type.String(),
|
|
UserIdentity: identity{creds.AccessKey},
|
|
RequestParameters: event.ReqParams,
|
|
ResponseElements: map[string]string{
|
|
responseRequestIDKey: uniqueID,
|
|
// Following is a custom response element to indicate
|
|
// event origin server endpoint.
|
|
responseOriginEndpointKey: getResponseOriginEndpointKey(),
|
|
},
|
|
S3: eventMeta{
|
|
SchemaVersion: eventSchemaVersion,
|
|
ConfigurationID: eventConfigID,
|
|
Bucket: bucketMeta{
|
|
Name: event.Bucket,
|
|
OwnerIdentity: identity{creds.AccessKey},
|
|
ARN: bucketARNPrefix + event.Bucket,
|
|
},
|
|
},
|
|
Source: sourceInfo{
|
|
Host: event.Host,
|
|
Port: event.Port,
|
|
UserAgent: event.UserAgent,
|
|
},
|
|
}
|
|
|
|
// Escape the object name. For example "red flower.jpg" becomes "red+flower.jpg".
|
|
escapedObj := url.QueryEscape(event.ObjInfo.Name)
|
|
|
|
// For delete object event type, we do not need to set ETag and Size.
|
|
if event.Type == ObjectRemovedDelete {
|
|
nEvent.S3.Object = objectMeta{
|
|
Key: escapedObj,
|
|
VersionID: "1",
|
|
Sequencer: uniqueID,
|
|
}
|
|
return nEvent
|
|
}
|
|
|
|
// For all other events we should set ETag and Size.
|
|
nEvent.S3.Object = objectMeta{
|
|
Key: escapedObj,
|
|
ETag: event.ObjInfo.ETag,
|
|
Size: event.ObjInfo.Size,
|
|
ContentType: event.ObjInfo.ContentType,
|
|
UserMetadata: event.ObjInfo.UserDefined,
|
|
VersionID: "1",
|
|
Sequencer: uniqueID,
|
|
}
|
|
|
|
// Success.
|
|
return nEvent
|
|
}
|
|
|
|
// Fetch all external targets. This returns a copy of the current map of
|
|
// external notification targets.
|
|
func (en eventNotifier) GetAllExternalTargets() map[string]*logrus.Logger {
|
|
en.external.rwMutex.RLock()
|
|
defer en.external.rwMutex.RUnlock()
|
|
targetsCopy := make(map[string]*logrus.Logger)
|
|
for k, v := range en.external.targets {
|
|
targetsCopy[k] = v
|
|
}
|
|
return targetsCopy
|
|
}
|
|
|
|
// Fetch the external target.
|
|
func (en eventNotifier) GetExternalTarget(queueARN string) *logrus.Logger {
|
|
en.external.rwMutex.RLock()
|
|
defer en.external.rwMutex.RUnlock()
|
|
return en.external.targets[queueARN]
|
|
}
|
|
|
|
func (en eventNotifier) GetInternalTarget(arn string) *listenerLogger {
|
|
en.internal.rwMutex.RLock()
|
|
defer en.internal.rwMutex.RUnlock()
|
|
return en.internal.targets[arn]
|
|
}
|
|
|
|
// Set a new sns target for an input sns ARN.
|
|
func (en *eventNotifier) AddListenerChan(snsARN string, listenerCh *listenChan) error {
|
|
if listenerCh == nil {
|
|
return errInvalidArgument
|
|
}
|
|
en.internal.rwMutex.Lock()
|
|
defer en.internal.rwMutex.Unlock()
|
|
en.internal.connectedListeners[snsARN] = listenerCh
|
|
return nil
|
|
}
|
|
|
|
// Remove sns target for an input sns ARN.
|
|
func (en *eventNotifier) RemoveListenerChan(snsARN string) {
|
|
en.internal.rwMutex.Lock()
|
|
defer en.internal.rwMutex.Unlock()
|
|
if en.internal.connectedListeners != nil {
|
|
delete(en.internal.connectedListeners, snsARN)
|
|
}
|
|
}
|
|
|
|
func (en *eventNotifier) SendListenerEvent(arn string, event []NotificationEvent) error {
|
|
en.internal.rwMutex.Lock()
|
|
defer en.internal.rwMutex.Unlock()
|
|
|
|
listenChan, ok := en.internal.connectedListeners[arn]
|
|
if ok {
|
|
listenChan.sendNotificationEvent(event)
|
|
}
|
|
// If the channel is not present we ignore the event.
|
|
return nil
|
|
}
|
|
|
|
// Fetch bucket notification config for an input bucket.
|
|
func (en eventNotifier) GetBucketNotificationConfig(bucket string) *notificationConfig {
|
|
en.external.rwMutex.RLock()
|
|
defer en.external.rwMutex.RUnlock()
|
|
return en.external.notificationConfigs[bucket]
|
|
}
|
|
|
|
func (en *eventNotifier) SetBucketNotificationConfig(bucket string, ncfg *notificationConfig) {
|
|
en.external.rwMutex.Lock()
|
|
if ncfg == nil {
|
|
delete(en.external.notificationConfigs, bucket)
|
|
} else {
|
|
en.external.notificationConfigs[bucket] = ncfg
|
|
}
|
|
en.external.rwMutex.Unlock()
|
|
}
|
|
|
|
func (en *eventNotifier) GetBucketListenerConfig(bucket string) []listenerConfig {
|
|
en.internal.rwMutex.RLock()
|
|
defer en.internal.rwMutex.RUnlock()
|
|
return en.internal.listenerConfigs[bucket]
|
|
}
|
|
|
|
func (en *eventNotifier) SetBucketListenerConfig(bucket string, lcfg []listenerConfig) error {
|
|
en.internal.rwMutex.Lock()
|
|
defer en.internal.rwMutex.Unlock()
|
|
if len(lcfg) == 0 {
|
|
delete(en.internal.listenerConfigs, bucket)
|
|
} else {
|
|
en.internal.listenerConfigs[bucket] = lcfg
|
|
}
|
|
for _, elcArr := range en.internal.listenerConfigs {
|
|
for _, elcElem := range elcArr {
|
|
currArn := elcElem.TopicConfig.TopicARN
|
|
logger, err := newListenerLogger(currArn, elcElem.TargetServer)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
en.internal.targets[currArn] = logger
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func eventNotifyForBucketNotifications(eventType, objectName, bucketName string, nEvent []NotificationEvent) {
|
|
nConfig := globalEventNotifier.GetBucketNotificationConfig(bucketName)
|
|
if nConfig == nil {
|
|
return
|
|
}
|
|
// Validate if the event and object match the queue configs.
|
|
for _, qConfig := range nConfig.QueueConfigs {
|
|
eventMatch := eventMatch(eventType, qConfig.Events)
|
|
ruleMatch := filterRuleMatch(objectName, qConfig.Filter.Key.FilterRules)
|
|
if eventMatch && ruleMatch {
|
|
targetLog := globalEventNotifier.GetExternalTarget(qConfig.QueueARN)
|
|
if targetLog != nil {
|
|
targetLog.WithFields(logrus.Fields{
|
|
"Key": path.Join(bucketName, objectName),
|
|
"EventType": eventType,
|
|
"Records": nEvent,
|
|
}).Info()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func eventNotifyForBucketListeners(eventType, objectName, bucketName string,
|
|
nEvent []NotificationEvent) {
|
|
lCfgs := globalEventNotifier.GetBucketListenerConfig(bucketName)
|
|
if lCfgs == nil {
|
|
return
|
|
}
|
|
// Validate if the event and object match listener configs
|
|
for _, lcfg := range lCfgs {
|
|
ruleMatch := filterRuleMatch(objectName, lcfg.TopicConfig.Filter.Key.FilterRules)
|
|
eventMatch := eventMatch(eventType, lcfg.TopicConfig.Events)
|
|
if eventMatch && ruleMatch {
|
|
targetLog := globalEventNotifier.GetInternalTarget(
|
|
lcfg.TopicConfig.TopicARN)
|
|
if targetLog != nil && targetLog.log != nil {
|
|
targetLog.log.WithFields(logrus.Fields{
|
|
"Key": path.Join(bucketName, objectName),
|
|
"EventType": eventType,
|
|
"Records": nEvent,
|
|
}).Info()
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
// eventNotify notifies an event to relevant targets based on their
|
|
// bucket configuration (notifications and listeners).
|
|
func eventNotify(event eventData) {
|
|
if globalEventNotifier == nil {
|
|
return
|
|
}
|
|
// Notifies a new event.
|
|
// List of events reported through this function are
|
|
// - s3:ObjectCreated:Put
|
|
// - s3:ObjectCreated:Post
|
|
// - s3:ObjectCreated:Copy
|
|
// - s3:ObjectCreated:CompleteMultipartUpload
|
|
// - s3:ObjectRemoved:Delete
|
|
|
|
// Event type.
|
|
eventType := event.Type.String()
|
|
|
|
// Object name.
|
|
objectName := event.ObjInfo.Name
|
|
|
|
// Save the notification event to be sent.
|
|
notificationEvent := []NotificationEvent{newNotificationEvent(event)}
|
|
|
|
// Notify external targets.
|
|
eventNotifyForBucketNotifications(eventType, objectName, event.Bucket, notificationEvent)
|
|
|
|
// Notify internal targets.
|
|
eventNotifyForBucketListeners(eventType, objectName, event.Bucket, notificationEvent)
|
|
}
|
|
|
|
// loads notification config if any for a given bucket, returns
|
|
// structured notification config.
|
|
func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationConfig, error) {
|
|
// Construct the notification config path.
|
|
ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
|
|
|
|
// Acquire a write lock on notification config before modifying.
|
|
objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath)
|
|
if err := objLock.GetRLock(globalOperationTimeout); err != nil {
|
|
return nil, err
|
|
}
|
|
defer objLock.RUnlock()
|
|
|
|
var buffer bytes.Buffer
|
|
err := objAPI.GetObject(minioMetaBucket, ncPath, 0, -1, &buffer) // Read everything.
|
|
if err != nil {
|
|
// 'notification.xml' not found return
|
|
// 'errNoSuchNotifications'. This is default when no
|
|
// bucket notifications are found on the bucket.
|
|
if isErrObjectNotFound(err) || isErrIncompleteBody(err) {
|
|
return nil, traceError(errNoSuchNotifications)
|
|
}
|
|
errorIf(err, "Unable to load bucket-notification for bucket %s", bucket)
|
|
// Returns error for other errors.
|
|
return nil, err
|
|
}
|
|
|
|
// if `notifications.xml` is empty we should return NoSuchNotifications.
|
|
if buffer.Len() == 0 {
|
|
return nil, traceError(errNoSuchNotifications)
|
|
}
|
|
|
|
// Unmarshal notification bytes.
|
|
notificationConfigBytes := buffer.Bytes()
|
|
notificationCfg := ¬ificationConfig{}
|
|
// Unmarshal notification bytes only if we read data.
|
|
if err = xml.Unmarshal(notificationConfigBytes, notificationCfg); err != nil {
|
|
return nil, traceError(err)
|
|
}
|
|
|
|
// Return success.
|
|
return notificationCfg, nil
|
|
}
|
|
|
|
// loads notification config if any for a given bucket, returns
|
|
// structured notification config.
|
|
func loadListenerConfig(bucket string, objAPI ObjectLayer) ([]listenerConfig, error) {
|
|
// in single node mode, there are no peers, so in this case
|
|
// there is no configuration to load, as any previously
|
|
// connected listen clients have been disconnected
|
|
if !globalIsDistXL {
|
|
return nil, nil
|
|
}
|
|
|
|
// Construct the notification config path.
|
|
lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig)
|
|
|
|
// Acquire a write lock on notification config before modifying.
|
|
objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath)
|
|
if err := objLock.GetRLock(globalOperationTimeout); err != nil {
|
|
return nil, err
|
|
}
|
|
defer objLock.RUnlock()
|
|
|
|
var buffer bytes.Buffer
|
|
err := objAPI.GetObject(minioMetaBucket, lcPath, 0, -1, &buffer)
|
|
if err != nil {
|
|
// 'listener.json' not found return
|
|
// 'errNoSuchNotifications'. This is default when no
|
|
// bucket listeners are found on the bucket
|
|
if isErrObjectNotFound(err) || isErrIncompleteBody(err) {
|
|
return nil, traceError(errNoSuchNotifications)
|
|
}
|
|
errorIf(err, "Unable to load bucket-listeners for bucket %s", bucket)
|
|
// Returns error for other errors.
|
|
return nil, err
|
|
}
|
|
|
|
// if `listener.json` is empty we should return NoSuchNotifications.
|
|
if buffer.Len() == 0 {
|
|
return nil, traceError(errNoSuchNotifications)
|
|
}
|
|
|
|
var lCfg []listenerConfig
|
|
lConfigBytes := buffer.Bytes()
|
|
if err = json.Unmarshal(lConfigBytes, &lCfg); err != nil {
|
|
errorIf(err, "Unable to unmarshal listener config from JSON.")
|
|
return nil, traceError(err)
|
|
}
|
|
|
|
// Return success.
|
|
return lCfg, nil
|
|
}
|
|
|
|
func persistNotificationConfig(bucket string, ncfg *notificationConfig, obj ObjectLayer) error {
|
|
// marshal to xml
|
|
buf, err := xml.Marshal(ncfg)
|
|
if err != nil {
|
|
errorIf(err, "Unable to marshal notification configuration into XML")
|
|
return err
|
|
}
|
|
|
|
// build path
|
|
ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
|
|
// Acquire a write lock on notification config before modifying.
|
|
objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath)
|
|
if err = objLock.GetLock(globalOperationTimeout); err != nil {
|
|
return err
|
|
}
|
|
defer objLock.Unlock()
|
|
|
|
// write object to path
|
|
hashReader, err := hash.NewReader(bytes.NewReader(buf), int64(len(buf)), "", getSHA256Hash(buf))
|
|
if err != nil {
|
|
errorIf(err, "Unable to write bucket notification configuration.")
|
|
return err
|
|
}
|
|
_, err = obj.PutObject(minioMetaBucket, ncPath, hashReader, nil)
|
|
if err != nil {
|
|
errorIf(err, "Unable to write bucket notification configuration.")
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Persists validated listener config to object layer.
|
|
func persistListenerConfig(bucket string, lcfg []listenerConfig, obj ObjectLayer) error {
|
|
buf, err := json.Marshal(lcfg)
|
|
if err != nil {
|
|
errorIf(err, "Unable to marshal listener config to JSON.")
|
|
return err
|
|
}
|
|
|
|
// build path
|
|
lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig)
|
|
// Acquire a write lock on notification config before modifying.
|
|
objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath)
|
|
if err = objLock.GetLock(globalOperationTimeout); err != nil {
|
|
return err
|
|
}
|
|
defer objLock.Unlock()
|
|
|
|
// write object to path
|
|
hashReader, err := hash.NewReader(bytes.NewReader(buf), int64(len(buf)), "", getSHA256Hash(buf))
|
|
if err != nil {
|
|
errorIf(err, "Unable to write bucket listener configuration to object layer.")
|
|
return err
|
|
}
|
|
|
|
// write object to path
|
|
_, err = obj.PutObject(minioMetaBucket, lcPath, hashReader, nil)
|
|
if err != nil {
|
|
errorIf(err, "Unable to write bucket listener configuration to object layer.")
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Removes notification.xml for a given bucket, only used during DeleteBucket.
|
|
func removeNotificationConfig(bucket string, objAPI ObjectLayer) error {
|
|
// Verify bucket is valid.
|
|
if !IsValidBucketName(bucket) {
|
|
return BucketNameInvalid{Bucket: bucket}
|
|
}
|
|
|
|
ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
|
|
|
|
// Acquire a write lock on notification config before modifying.
|
|
objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath)
|
|
if err := objLock.GetLock(globalOperationTimeout); err != nil {
|
|
return err
|
|
}
|
|
defer objLock.Unlock()
|
|
return objAPI.DeleteObject(minioMetaBucket, ncPath)
|
|
}
|
|
|
|
// Remove listener configuration from storage layer. Used when a bucket is deleted.
|
|
func removeListenerConfig(bucket string, objAPI ObjectLayer) error {
|
|
// make the path
|
|
lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig)
|
|
|
|
// Acquire a write lock on notification config before modifying.
|
|
objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath)
|
|
if err := objLock.GetLock(globalOperationTimeout); err != nil {
|
|
return err
|
|
}
|
|
defer objLock.Unlock()
|
|
return objAPI.DeleteObject(minioMetaBucket, lcPath)
|
|
}
|
|
|
|
// Loads both notification and listener config.
|
|
func loadNotificationAndListenerConfig(bucketName string, objAPI ObjectLayer) (nCfg *notificationConfig, lCfg []listenerConfig, err error) {
|
|
// Loads notification config if any.
|
|
nCfg, err = loadNotificationConfig(bucketName, objAPI)
|
|
if err != nil && !isErrIgnored(err, errDiskNotFound, errNoSuchNotifications) {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Loads listener config if any.
|
|
lCfg, err = loadListenerConfig(bucketName, objAPI)
|
|
if err != nil && !isErrIgnored(err, errDiskNotFound, errNoSuchNotifications) {
|
|
return nil, nil, err
|
|
}
|
|
return nCfg, lCfg, nil
|
|
}
|
|
|
|
// loads all bucket notifications if present.
|
|
func loadAllBucketNotifications(objAPI ObjectLayer) (map[string]*notificationConfig, map[string][]listenerConfig, error) {
|
|
// List buckets to proceed loading all notification configuration.
|
|
buckets, err := objAPI.ListBuckets()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
nConfigs := make(map[string]*notificationConfig)
|
|
lConfigs := make(map[string][]listenerConfig)
|
|
|
|
// Loads all bucket notifications.
|
|
for _, bucket := range buckets {
|
|
// Load persistent notification and listener configurations
|
|
// a given bucket name.
|
|
nConfigs[bucket.Name], lConfigs[bucket.Name], err = loadNotificationAndListenerConfig(bucket.Name, objAPI)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
|
|
// Success.
|
|
return nConfigs, lConfigs, nil
|
|
}
|
|
|
|
// addQueueTarget - calls newTargetFunc function and adds its returned value to queueTargets
|
|
func addQueueTarget(queueTargets map[string]*logrus.Logger,
|
|
accountID, queueType string,
|
|
newTargetFunc func(string) (*logrus.Logger, error)) (string, error) {
|
|
|
|
// Construct the queue ARN for AMQP.
|
|
queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueType
|
|
|
|
// Queue target if already initialized we move to the next ARN.
|
|
if _, ok := queueTargets[queueARN]; ok {
|
|
return queueARN, nil
|
|
}
|
|
|
|
// Using accountID we can now initialize a new AMQP logrus instance.
|
|
logger, err := newTargetFunc(accountID)
|
|
if err == nil {
|
|
queueTargets[queueARN] = logger
|
|
}
|
|
|
|
return queueARN, err
|
|
}
|
|
|
|
// Loads all queue targets, initializes each queueARNs depending on their config.
|
|
// Each instance of queueARN registers its own logrus to communicate with the
|
|
// queue service. QueueARN once initialized is not initialized again for the
|
|
// same queueARN, instead previous connection is used.
|
|
func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
|
|
queueTargets := make(map[string]*logrus.Logger)
|
|
// Load all amqp targets, initialize their respective loggers.
|
|
for accountID, amqpN := range serverConfig.Notify.GetAMQP() {
|
|
if !amqpN.Enable {
|
|
continue
|
|
}
|
|
|
|
if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeAMQP, newAMQPNotify); err != nil {
|
|
if _, ok := err.(net.Error); ok {
|
|
err = &net.OpError{
|
|
Op: "Connecting to " + queueARN,
|
|
Net: "tcp",
|
|
Err: err,
|
|
}
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Load all mqtt targets, initialize their respective loggers.
|
|
for accountID, mqttN := range serverConfig.Notify.GetMQTT() {
|
|
if !mqttN.Enable {
|
|
continue
|
|
}
|
|
|
|
if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeMQTT, newMQTTNotify); err != nil {
|
|
if _, ok := err.(net.Error); ok {
|
|
err = &net.OpError{
|
|
Op: "Connecting to " + queueARN,
|
|
Net: "tcp",
|
|
Err: err,
|
|
}
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Load all nats targets, initialize their respective loggers.
|
|
for accountID, natsN := range serverConfig.Notify.GetNATS() {
|
|
if !natsN.Enable {
|
|
continue
|
|
}
|
|
|
|
if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeNATS, newNATSNotify); err != nil {
|
|
if _, ok := err.(net.Error); ok {
|
|
err = &net.OpError{
|
|
Op: "Connecting to " + queueARN,
|
|
Net: "tcp",
|
|
Err: err,
|
|
}
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Load redis targets, initialize their respective loggers.
|
|
for accountID, redisN := range serverConfig.Notify.GetRedis() {
|
|
if !redisN.Enable {
|
|
continue
|
|
}
|
|
|
|
if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeRedis, newRedisNotify); err != nil {
|
|
if _, ok := err.(net.Error); ok {
|
|
err = &net.OpError{
|
|
Op: "Connecting to " + queueARN,
|
|
Net: "tcp",
|
|
Err: err,
|
|
}
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Load Webhook targets, initialize their respective loggers.
|
|
for accountID, webhookN := range serverConfig.Notify.GetWebhook() {
|
|
if !webhookN.Enable {
|
|
continue
|
|
}
|
|
if _, err := addQueueTarget(queueTargets, accountID, queueTypeWebhook, newWebhookNotify); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Load elastic targets, initialize their respective loggers.
|
|
for accountID, elasticN := range serverConfig.Notify.GetElasticSearch() {
|
|
if !elasticN.Enable {
|
|
continue
|
|
}
|
|
|
|
if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeElastic, newElasticNotify); err != nil {
|
|
if _, ok := err.(net.Error); ok {
|
|
err = &net.OpError{
|
|
Op: "Connecting to " + queueARN,
|
|
Net: "tcp",
|
|
Err: err,
|
|
}
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Load PostgreSQL targets, initialize their respective loggers.
|
|
for accountID, pgN := range serverConfig.Notify.GetPostgreSQL() {
|
|
if !pgN.Enable {
|
|
continue
|
|
}
|
|
|
|
if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypePostgreSQL, newPostgreSQLNotify); err != nil {
|
|
if _, ok := err.(net.Error); ok {
|
|
err = &net.OpError{
|
|
Op: "Connecting to " + queueARN,
|
|
Net: "tcp",
|
|
Err: err,
|
|
}
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Load MySQL targets, initialize their respective loggers.
|
|
for accountID, msqlN := range serverConfig.Notify.GetMySQL() {
|
|
if !msqlN.Enable {
|
|
continue
|
|
}
|
|
|
|
if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeMySQL, newMySQLNotify); err != nil {
|
|
if _, ok := err.(net.Error); ok {
|
|
err = &net.OpError{
|
|
Op: "Connecting to " + queueARN,
|
|
Net: "tcp",
|
|
Err: err,
|
|
}
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Load Kafka targets, initialize their respective loggers.
|
|
for accountID, kafkaN := range serverConfig.Notify.GetKafka() {
|
|
if !kafkaN.Enable {
|
|
continue
|
|
}
|
|
|
|
if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeKafka, newKafkaNotify); err != nil {
|
|
if _, ok := err.(net.Error); ok {
|
|
err = &net.OpError{
|
|
Op: "Connecting to " + queueARN,
|
|
Net: "tcp",
|
|
Err: err,
|
|
}
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Successfully initialized queue targets.
|
|
return queueTargets, nil
|
|
}
|
|
|
|
// Global instance of event notification queue.
|
|
var globalEventNotifier *eventNotifier
|
|
|
|
// Initialize event notifier.
|
|
func initEventNotifier(objAPI ObjectLayer) error {
|
|
if objAPI == nil {
|
|
return errInvalidArgument
|
|
}
|
|
|
|
// Read all saved bucket notifications.
|
|
nConfigs, lConfigs, err := loadAllBucketNotifications(objAPI)
|
|
if err != nil {
|
|
errorIf(err, "Error loading bucket notifications - %v", err)
|
|
return err
|
|
}
|
|
|
|
// Initializes all queue targets.
|
|
queueTargets, err := loadAllQueueTargets()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Initialize internal listener targets
|
|
listenTargets := make(map[string]*listenerLogger)
|
|
for _, listeners := range lConfigs {
|
|
for _, listener := range listeners {
|
|
ln, err := newListenerLogger(
|
|
listener.TopicConfig.TopicARN,
|
|
listener.TargetServer,
|
|
)
|
|
if err != nil {
|
|
errorIf(err, "Unable to initialize listener target logger.")
|
|
//TODO: improve error
|
|
return fmt.Errorf("Error initializing listner target logger - %v", err)
|
|
}
|
|
listenTargets[listener.TopicConfig.TopicARN] = ln
|
|
}
|
|
}
|
|
|
|
// Initialize event notifier queue.
|
|
globalEventNotifier = &eventNotifier{
|
|
external: externalNotifier{
|
|
notificationConfigs: nConfigs,
|
|
targets: queueTargets,
|
|
rwMutex: &sync.RWMutex{},
|
|
},
|
|
internal: internalNotifier{
|
|
rwMutex: &sync.RWMutex{},
|
|
targets: listenTargets,
|
|
listenerConfigs: lConfigs,
|
|
connectedListeners: make(map[string]*listenChan),
|
|
},
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|