/* * Minio Cloud Storage, (C) 2016 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package main import ( "bytes" "encoding/xml" "errors" "fmt" "net/url" "path" "sync" "time" "github.com/Sirupsen/logrus" ) // Global event notification queue. This is the queue that would be used to send all notifications. type eventNotifier struct { rwMutex *sync.RWMutex // Collection of 'bucket' and notification config. notificationConfigs map[string]*notificationConfig lambdaTargets map[string][]chan []NotificationEvent queueTargets map[string]*logrus.Logger } // Represents data to be sent with notification event. type eventData struct { Type EventName Bucket string ObjInfo ObjectInfo ReqParams map[string]string } // New notification event constructs a new notification event message from // input request metadata which completed successfully. func newNotificationEvent(event eventData) NotificationEvent { /// Construct a new object created event. region := serverConfig.GetRegion() tnow := time.Now().UTC() sequencer := fmt.Sprintf("%X", tnow.UnixNano()) // Following blocks fills in all the necessary details of s3 event message structure. // http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html nEvent := NotificationEvent{ EventVersion: "2.0", EventSource: "aws:s3", AwsRegion: region, EventTime: tnow.Format(timeFormatAMZ), EventName: event.Type.String(), UserIdentity: defaultIdentity(), RequestParameters: event.ReqParams, ResponseElements: map[string]string{}, S3: eventMeta{ SchemaVersion: "1.0", ConfigurationID: "Config", Bucket: bucketMeta{ Name: event.Bucket, OwnerIdentity: defaultIdentity(), ARN: "arn:aws:s3:::" + event.Bucket, }, }, } escapedObj := url.QueryEscape(event.ObjInfo.Name) // For delete object event type, we do not need to set ETag and Size. if event.Type == ObjectRemovedDelete { nEvent.S3.Object = objectMeta{ Key: escapedObj, Sequencer: sequencer, } return nEvent } // For all other events we should set ETag and Size. nEvent.S3.Object = objectMeta{ Key: escapedObj, ETag: event.ObjInfo.MD5Sum, Size: event.ObjInfo.Size, Sequencer: sequencer, } // Success. return nEvent } // Fetch the saved queue target. func (en eventNotifier) GetQueueTarget(queueARN string) *logrus.Logger { return en.queueTargets[queueARN] } func (en eventNotifier) GetLambdaTarget(lambdaARN string) []chan []NotificationEvent { en.rwMutex.RLock() defer en.rwMutex.RUnlock() return en.lambdaTargets[lambdaARN] } // Set a new lambda target for an input lambda ARN. func (en *eventNotifier) SetLambdaTarget(lambdaARN string, listenerCh chan []NotificationEvent) error { en.rwMutex.Lock() defer en.rwMutex.Unlock() if listenerCh == nil { return errors.New("invalid argument") } en.lambdaTargets[lambdaARN] = append(en.lambdaTargets[lambdaARN], listenerCh) return nil } // Remove lambda target for an input lambda ARN. func (en *eventNotifier) RemoveLambdaTarget(lambdaARN string, listenerCh chan []NotificationEvent) { en.rwMutex.Lock() defer en.rwMutex.Unlock() lambdaTarget, ok := en.lambdaTargets[lambdaARN] if ok { for i, savedListenerCh := range lambdaTarget { if listenerCh == savedListenerCh { lambdaTarget = append(lambdaTarget[:i], lambdaTarget[i+1:]...) if len(lambdaTarget) == 0 { delete(en.lambdaTargets, lambdaARN) break } en.lambdaTargets[lambdaARN] = lambdaTarget } } } } // Returns true if bucket notification is set for the bucket, false otherwise. func (en eventNotifier) IsBucketNotificationSet(bucket string) bool { en.rwMutex.RLock() defer en.rwMutex.RUnlock() _, ok := en.notificationConfigs[bucket] return ok } // Fetch bucket notification config for an input bucket. func (en eventNotifier) GetBucketNotificationConfig(bucket string) *notificationConfig { en.rwMutex.RLock() defer en.rwMutex.RUnlock() return en.notificationConfigs[bucket] } // Set a new notification config for a bucket, this operation will overwrite any previous // notification configs for the bucket. func (en *eventNotifier) SetBucketNotificationConfig(bucket string, notificationCfg *notificationConfig) error { en.rwMutex.Lock() defer en.rwMutex.Unlock() if notificationCfg == nil { return errors.New("invalid argument") } en.notificationConfigs[bucket] = notificationCfg return nil } // eventNotify notifies an event to relevant targets based on their // bucket notification configs. func eventNotify(event eventData) { // Notifies a new event. // List of events reported through this function are // - s3:ObjectCreated:Put // - s3:ObjectCreated:Post // - s3:ObjectCreated:Copy // - s3:ObjectCreated:CompleteMultipartUpload // - s3:ObjectRemoved:Delete nConfig := eventN.GetBucketNotificationConfig(event.Bucket) // No bucket notifications enabled, drop the event notification. if len(nConfig.QueueConfigs) == 0 && len(nConfig.LambdaConfigs) == 0 { return } // Event type. eventType := event.Type.String() // Object name. objectName := event.ObjInfo.Name // Save the notification event to be sent. notificationEvent := []NotificationEvent{newNotificationEvent(event)} // Validate if the event and object match the queue configs. for _, qConfig := range nConfig.QueueConfigs { eventMatch := eventMatch(eventType, qConfig.Events) ruleMatch := filterRuleMatch(objectName, qConfig.Filter.Key.FilterRules) if eventMatch && ruleMatch { targetLog := eventN.GetQueueTarget(qConfig.QueueARN) if targetLog != nil { targetLog.WithFields(logrus.Fields{ "Records": notificationEvent, }).Info() } } } // Validate if the event and object match the lambda configs. for _, lambdaConfig := range nConfig.LambdaConfigs { ruleMatch := filterRuleMatch(objectName, lambdaConfig.Filter.Key.FilterRules) eventMatch := eventMatch(eventType, lambdaConfig.Events) if eventMatch && ruleMatch { targetListeners := eventN.GetLambdaTarget(lambdaConfig.LambdaARN) for _, listener := range targetListeners { listener <- notificationEvent } } } } // loads notifcation config if any for a given bucket, returns back structured notification config. func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationConfig, error) { // Construct the notification config path. notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, notificationConfigPath) if err != nil { // 'notification.xml' not found return 'errNoSuchNotifications'. // This is default when no bucket notifications are found on the bucket. switch err.(type) { case ObjectNotFound: return nil, errNoSuchNotifications } // Returns error for other errors. return nil, err } var buffer bytes.Buffer err = objAPI.GetObject(minioMetaBucket, notificationConfigPath, 0, objInfo.Size, &buffer) if err != nil { // 'notification.xml' not found return 'errNoSuchNotifications'. // This is default when no bucket notifications are found on the bucket. switch err.(type) { case ObjectNotFound: return nil, errNoSuchNotifications } // Returns error for other errors. return nil, err } // Unmarshal notification bytes. notificationConfigBytes := buffer.Bytes() notificationCfg := ¬ificationConfig{} if err = xml.Unmarshal(notificationConfigBytes, ¬ificationCfg); err != nil { return nil, err } // Successfully marshalled notification configuration. // Return success. return notificationCfg, nil } // loads all bucket notifications if present. func loadAllBucketNotifications(objAPI ObjectLayer) (map[string]*notificationConfig, error) { // List buckets to proceed loading all notification configuration. buckets, err := objAPI.ListBuckets() if err != nil { return nil, err } configs := make(map[string]*notificationConfig) // Loads all bucket notifications. for _, bucket := range buckets { var nCfg *notificationConfig nCfg, err = loadNotificationConfig(bucket.Name, objAPI) if err != nil { if err == errNoSuchNotifications { continue } return nil, err } configs[bucket.Name] = nCfg } // Success. return configs, nil } // Loads all queue targets, initializes each queueARNs depending on their config. // Each instance of queueARN registers its own logrus to communicate with the // queue service. QueueARN once initialized is not initialized again for the // same queueARN, instead previous connection is used. func loadAllQueueTargets() (map[string]*logrus.Logger, error) { queueTargets := make(map[string]*logrus.Logger) // Load all amqp targets, initialize their respective loggers. for accountID, amqpN := range serverConfig.GetAMQP() { if !amqpN.Enable { continue } // Construct the queue ARN for AMQP. queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeAMQP // Queue target if already initialized we move to the next ARN. _, ok := queueTargets[queueARN] if ok { continue } // Using accountID we can now initialize a new AMQP logrus instance. amqpLog, err := newAMQPNotify(accountID) if err != nil { return nil, err } queueTargets[queueARN] = amqpLog } // Load redis targets, initialize their respective loggers. for accountID, redisN := range serverConfig.GetRedis() { if !redisN.Enable { continue } // Construct the queue ARN for Redis. queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeRedis // Queue target if already initialized we move to the next ARN. _, ok := queueTargets[queueARN] if ok { continue } // Using accountID we can now initialize a new Redis logrus instance. redisLog, err := newRedisNotify(accountID) if err != nil { return nil, err } queueTargets[queueARN] = redisLog } // Load elastic targets, initialize their respective loggers. for accountID, elasticN := range serverConfig.GetElasticSearch() { if !elasticN.Enable { continue } // Construct the queue ARN for Elastic. queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeElastic _, ok := queueTargets[queueARN] if ok { continue } // Using accountID we can now initialize a new ElasticSearch logrus instance. elasticLog, err := newElasticNotify(accountID) if err != nil { return nil, err } queueTargets[queueARN] = elasticLog } // Successfully initialized queue targets. return queueTargets, nil } // Global instance of event notification queue. var eventN *eventNotifier // Initialize event notifier. func initEventNotifier(objAPI ObjectLayer) error { if objAPI == nil { return errInvalidArgument } // Read all saved bucket notifications. configs, err := loadAllBucketNotifications(objAPI) if err != nil { return err } // Initializes all queue targets. queueTargets, err := loadAllQueueTargets() if err != nil { return err } // Inititalize event notifier queue. eventN = &eventNotifier{ rwMutex: &sync.RWMutex{}, notificationConfigs: configs, queueTargets: queueTargets, lambdaTargets: make(map[string][]chan []NotificationEvent), } return nil }