/*
 * Minio Cloud Storage, (C) 2016, 2017 Minio, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cmd

import (
	"bytes"
	"encoding/json"
	"encoding/xml"
	"fmt"
	"net"
	"net/url"
	"path"
	"sync"

	"github.com/Sirupsen/logrus"
)

const (
	// minioEventSource is the value placed in the EventSource field of
	// every notification event constructed by this file.
	minioEventSource = "minio:s3"
)

// externalNotifier holds the state needed to deliver events to external
// services: the per-bucket notification configuration and the log
// objects used as delivery targets.
type externalNotifier struct {
	// Per-bucket notification config. This is updated via
	// PutBucketNotification API.
	notificationConfigs map[string]*notificationConfig

	// An external target keeps a connection to an external
	// service to which events are to be sent. It is a mapping
	// from an ARN to a log object
	targets map[string]*logrus.Logger

	// rwMutex guards notificationConfigs and targets.
	rwMutex *sync.RWMutex
}

// internalNotifier holds the run-time state needed to deliver events to
// clients that are currently listening for bucket notifications.
type internalNotifier struct {
	// per-bucket listener configuration. This is updated
	// when listeners connect or disconnect.
	listenerConfigs map[string][]listenerConfig

	// An internal target is a peer Minio server, that is
	// connected to a listening client. Here, targets is a map of
	// listener ARN to log object.
	targets map[string]*listenerLogger

	// Connected listeners is a map of listener ARNs to channels
	// on which the ListenBucket API handler go routine is waiting
	// for events to send to a client.
	connectedListeners map[string]chan []NotificationEvent

	// rwMutex guards listenerConfigs, targets and connectedListeners.
	rwMutex *sync.RWMutex
}

// Global event notification configuration. This structure has state
// about configured external notifications, and run-time configuration
// for listener notifications.
type eventNotifier struct { // `external` here refers to notification configuration to // send events to supported external systems external externalNotifier // `internal` refers to notification configuration for live // listening clients. Events for a client are send from all // servers, internally to a particular server that is // connected to the client. internal internalNotifier } // Represents data to be sent with notification event. type eventData struct { Type EventName Bucket string ObjInfo ObjectInfo ReqParams map[string]string Host string Port string UserAgent string } // New notification event constructs a new notification event message from // input request metadata which completed successfully. func newNotificationEvent(event eventData) NotificationEvent { // Fetch the region. region := serverConfig.GetRegion() // Fetch the credentials. creds := serverConfig.GetCredential() // Time when Minio finished processing the request. eventTime := UTCNow() // API endpoint is captured here to be returned back // to the client for it to differentiate from which // server the request came from. var apiEndpoint string if len(globalAPIEndpoints) >= 1 { apiEndpoint = globalAPIEndpoints[0] } // Fetch a hexadecimal representation of event time in nano seconds. uniqueID := mustGetRequestID(eventTime) /// Construct a new object created event. // Following blocks fills in all the necessary details of s3 // event message structure. // http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html nEvent := NotificationEvent{ EventVersion: eventVersion, EventSource: minioEventSource, AwsRegion: region, EventTime: eventTime.Format(timeFormatAMZ), EventName: event.Type.String(), UserIdentity: identity{creds.AccessKey}, RequestParameters: event.ReqParams, ResponseElements: map[string]string{ responseRequestIDKey: uniqueID, // Following is a custom response element to indicate // event origin server endpoint. 
responseOriginEndpointKey: apiEndpoint, }, S3: eventMeta{ SchemaVersion: eventSchemaVersion, ConfigurationID: eventConfigID, Bucket: bucketMeta{ Name: event.Bucket, OwnerIdentity: identity{creds.AccessKey}, ARN: bucketARNPrefix + event.Bucket, }, }, Source: sourceInfo{ Host: event.Host, Port: event.Port, UserAgent: event.UserAgent, }, } // Escape the object name. For example "red flower.jpg" becomes "red+flower.jpg". escapedObj := url.QueryEscape(event.ObjInfo.Name) // For delete object event type, we do not need to set ETag and Size. if event.Type == ObjectRemovedDelete { nEvent.S3.Object = objectMeta{ Key: escapedObj, Sequencer: uniqueID, } return nEvent } // For all other events we should set ETag and Size. nEvent.S3.Object = objectMeta{ Key: escapedObj, ETag: event.ObjInfo.MD5Sum, Size: event.ObjInfo.Size, Sequencer: uniqueID, } // Success. return nEvent } // Fetch all external targets. This returns a copy of the current map of // external notification targets. func (en eventNotifier) GetAllExternalTargets() map[string]*logrus.Logger { en.external.rwMutex.RLock() defer en.external.rwMutex.RUnlock() targetsCopy := make(map[string]*logrus.Logger) for k, v := range en.external.targets { targetsCopy[k] = v } return targetsCopy } // Fetch the external target. func (en eventNotifier) GetExternalTarget(queueARN string) *logrus.Logger { en.external.rwMutex.RLock() defer en.external.rwMutex.RUnlock() return en.external.targets[queueARN] } func (en eventNotifier) GetInternalTarget(arn string) *listenerLogger { en.internal.rwMutex.RLock() defer en.internal.rwMutex.RUnlock() return en.internal.targets[arn] } // Set a new sns target for an input sns ARN. func (en *eventNotifier) AddListenerChan(snsARN string, listenerCh chan []NotificationEvent) error { if listenerCh == nil { return errInvalidArgument } en.internal.rwMutex.Lock() defer en.internal.rwMutex.Unlock() en.internal.connectedListeners[snsARN] = listenerCh return nil } // Remove sns target for an input sns ARN. 
func (en *eventNotifier) RemoveListenerChan(snsARN string) { en.internal.rwMutex.Lock() defer en.internal.rwMutex.Unlock() if en.internal.connectedListeners != nil { delete(en.internal.connectedListeners, snsARN) } } func (en *eventNotifier) SendListenerEvent(arn string, event []NotificationEvent) error { en.internal.rwMutex.Lock() defer en.internal.rwMutex.Unlock() ch, ok := en.internal.connectedListeners[arn] if ok { ch <- event } // If the channel is not present we ignore the event. return nil } // Fetch bucket notification config for an input bucket. func (en eventNotifier) GetBucketNotificationConfig(bucket string) *notificationConfig { en.external.rwMutex.RLock() defer en.external.rwMutex.RUnlock() return en.external.notificationConfigs[bucket] } func (en *eventNotifier) SetBucketNotificationConfig(bucket string, ncfg *notificationConfig) { en.external.rwMutex.Lock() if ncfg == nil { delete(en.external.notificationConfigs, bucket) } else { en.external.notificationConfigs[bucket] = ncfg } en.external.rwMutex.Unlock() } func (en *eventNotifier) GetBucketListenerConfig(bucket string) []listenerConfig { en.internal.rwMutex.RLock() defer en.internal.rwMutex.RUnlock() return en.internal.listenerConfigs[bucket] } func (en *eventNotifier) SetBucketListenerConfig(bucket string, lcfg []listenerConfig) error { en.internal.rwMutex.Lock() defer en.internal.rwMutex.Unlock() if len(lcfg) == 0 { delete(en.internal.listenerConfigs, bucket) } else { en.internal.listenerConfigs[bucket] = lcfg } for _, elcArr := range en.internal.listenerConfigs { for _, elcElem := range elcArr { currArn := elcElem.TopicConfig.TopicARN logger, err := newListenerLogger(currArn, elcElem.TargetServer) if err != nil { return err } en.internal.targets[currArn] = logger } } return nil } func eventNotifyForBucketNotifications(eventType, objectName, bucketName string, nEvent []NotificationEvent) { nConfig := globalEventNotifier.GetBucketNotificationConfig(bucketName) if nConfig == nil { return } // 
Validate if the event and object match the queue configs. for _, qConfig := range nConfig.QueueConfigs { eventMatch := eventMatch(eventType, qConfig.Events) ruleMatch := filterRuleMatch(objectName, qConfig.Filter.Key.FilterRules) if eventMatch && ruleMatch { targetLog := globalEventNotifier.GetExternalTarget(qConfig.QueueARN) if targetLog != nil { targetLog.WithFields(logrus.Fields{ "Key": path.Join(bucketName, objectName), "EventType": eventType, "Records": nEvent, }).Info() } } } } func eventNotifyForBucketListeners(eventType, objectName, bucketName string, nEvent []NotificationEvent) { lCfgs := globalEventNotifier.GetBucketListenerConfig(bucketName) if lCfgs == nil { return } // Validate if the event and object match listener configs for _, lcfg := range lCfgs { ruleMatch := filterRuleMatch(objectName, lcfg.TopicConfig.Filter.Key.FilterRules) eventMatch := eventMatch(eventType, lcfg.TopicConfig.Events) if eventMatch && ruleMatch { targetLog := globalEventNotifier.GetInternalTarget( lcfg.TopicConfig.TopicARN) if targetLog != nil && targetLog.log != nil { targetLog.log.WithFields(logrus.Fields{ "Key": path.Join(bucketName, objectName), "EventType": eventType, "Records": nEvent, }).Info() } } } } // eventNotify notifies an event to relevant targets based on their // bucket configuration (notifications and listeners). func eventNotify(event eventData) { if globalEventNotifier == nil { return } // Notifies a new event. // List of events reported through this function are // - s3:ObjectCreated:Put // - s3:ObjectCreated:Post // - s3:ObjectCreated:Copy // - s3:ObjectCreated:CompleteMultipartUpload // - s3:ObjectRemoved:Delete // Event type. eventType := event.Type.String() // Object name. objectName := event.ObjInfo.Name // Save the notification event to be sent. notificationEvent := []NotificationEvent{newNotificationEvent(event)} // Notify external targets. 
eventNotifyForBucketNotifications(eventType, objectName, event.Bucket, notificationEvent) // Notify internal targets. eventNotifyForBucketListeners(eventType, objectName, event.Bucket, notificationEvent) } // loads notification config if any for a given bucket, returns // structured notification config. func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationConfig, error) { // Construct the notification config path. ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) // Acquire a write lock on notification config before modifying. objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath) objLock.RLock() defer objLock.RUnlock() var buffer bytes.Buffer err := objAPI.GetObject(minioMetaBucket, ncPath, 0, -1, &buffer) // Read everything. if err != nil { // 'notification.xml' not found return // 'errNoSuchNotifications'. This is default when no // bucket notifications are found on the bucket. if isErrObjectNotFound(err) || isErrIncompleteBody(err) { return nil, errNoSuchNotifications } errorIf(err, "Unable to load bucket-notification for bucket %s", bucket) // Returns error for other errors. return nil, err } // Unmarshal notification bytes. notificationConfigBytes := buffer.Bytes() notificationCfg := ¬ificationConfig{} if err = xml.Unmarshal(notificationConfigBytes, ¬ificationCfg); err != nil { return nil, err } // Return success. return notificationCfg, nil } // loads notification config if any for a given bucket, returns // structured notification config. func loadListenerConfig(bucket string, objAPI ObjectLayer) ([]listenerConfig, error) { // in single node mode, there are no peers, so in this case // there is no configuration to load, as any previously // connected listen clients have been disconnected if !globalIsDistXL { return nil, nil } // Construct the notification config path. lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig) // Acquire a write lock on notification config before modifying. 
objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath) objLock.RLock() defer objLock.RUnlock() var buffer bytes.Buffer err := objAPI.GetObject(minioMetaBucket, lcPath, 0, -1, &buffer) if err != nil { // 'notification.xml' not found return // 'errNoSuchNotifications'. This is default when no // bucket listeners are found on the bucket. if isErrObjectNotFound(err) { return nil, errNoSuchNotifications } errorIf(err, "Unable to load bucket-listeners for bucket %s", bucket) // Returns error for other errors. return nil, err } // Unmarshal notification bytes. var lCfg []listenerConfig lConfigBytes := buffer.Bytes() if err = json.Unmarshal(lConfigBytes, &lCfg); err != nil { errorIf(err, "Unable to unmarshal listener config from JSON.") return nil, err } // Return success. return lCfg, nil } func persistNotificationConfig(bucket string, ncfg *notificationConfig, obj ObjectLayer) error { // marshal to xml buf, err := xml.Marshal(ncfg) if err != nil { errorIf(err, "Unable to marshal notification configuration into XML") return err } // build path ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) // Acquire a write lock on notification config before modifying. objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath) objLock.Lock() defer objLock.Unlock() // write object to path sha256Sum := getSHA256Hash(buf) _, err = obj.PutObject(minioMetaBucket, ncPath, int64(len(buf)), bytes.NewReader(buf), nil, sha256Sum) if err != nil { errorIf(err, "Unable to write bucket notification configuration.") return err } return nil } // Persists validated listener config to object layer. func persistListenerConfig(bucket string, lcfg []listenerConfig, obj ObjectLayer) error { buf, err := json.Marshal(lcfg) if err != nil { errorIf(err, "Unable to marshal listener config to JSON.") return err } // build path lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig) // Acquire a write lock on notification config before modifying. 
objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath) objLock.Lock() defer objLock.Unlock() // write object to path sha256Sum := getSHA256Hash(buf) _, err = obj.PutObject(minioMetaBucket, lcPath, int64(len(buf)), bytes.NewReader(buf), nil, sha256Sum) if err != nil { errorIf(err, "Unable to write bucket listener configuration to object layer.") } return err } // Removes notification.xml for a given bucket, only used during DeleteBucket. func removeNotificationConfig(bucket string, objAPI ObjectLayer) error { // Verify bucket is valid. if !IsValidBucketName(bucket) { return BucketNameInvalid{Bucket: bucket} } ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) // Acquire a write lock on notification config before modifying. objLock := globalNSMutex.NewNSLock(minioMetaBucket, ncPath) objLock.Lock() err := objAPI.DeleteObject(minioMetaBucket, ncPath) objLock.Unlock() return err } // Remove listener configuration from storage layer. Used when a bucket is deleted. func removeListenerConfig(bucket string, objAPI ObjectLayer) error { // make the path lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig) // Acquire a write lock on notification config before modifying. objLock := globalNSMutex.NewNSLock(minioMetaBucket, lcPath) objLock.Lock() err := objAPI.DeleteObject(minioMetaBucket, lcPath) objLock.Unlock() return err } // Loads both notification and listener config. func loadNotificationAndListenerConfig(bucketName string, objAPI ObjectLayer) (nCfg *notificationConfig, lCfg []listenerConfig, err error) { // Loads notification config if any. nCfg, err = loadNotificationConfig(bucketName, objAPI) if err != nil && !isErrIgnored(err, errDiskNotFound, errNoSuchNotifications) { return nil, nil, err } // Loads listener config if any. 
lCfg, err = loadListenerConfig(bucketName, objAPI) if err != nil && !isErrIgnored(err, errDiskNotFound, errNoSuchNotifications) { return nil, nil, err } return nCfg, lCfg, nil } // loads all bucket notifications if present. func loadAllBucketNotifications(objAPI ObjectLayer) (map[string]*notificationConfig, map[string][]listenerConfig, error) { // List buckets to proceed loading all notification configuration. buckets, err := objAPI.ListBuckets() if err != nil { return nil, nil, err } nConfigs := make(map[string]*notificationConfig) lConfigs := make(map[string][]listenerConfig) // Loads all bucket notifications. for _, bucket := range buckets { // Load persistent notification and listener configurations // a given bucket name. nConfigs[bucket.Name], lConfigs[bucket.Name], err = loadNotificationAndListenerConfig(bucket.Name, objAPI) if err != nil { return nil, nil, err } } // Success. return nConfigs, lConfigs, nil } // addQueueTarget - calls newTargetFunc function and adds its returned value to queueTargets func addQueueTarget(queueTargets map[string]*logrus.Logger, accountID, queueType string, newTargetFunc func(string) (*logrus.Logger, error)) (string, error) { // Construct the queue ARN for AMQP. queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueType // Queue target if already initialized we move to the next ARN. if _, ok := queueTargets[queueARN]; ok { return queueARN, nil } // Using accountID we can now initialize a new AMQP logrus instance. logger, err := newTargetFunc(accountID) if err == nil { queueTargets[queueARN] = logger } return queueARN, err } // Loads all queue targets, initializes each queueARNs depending on their config. // Each instance of queueARN registers its own logrus to communicate with the // queue service. QueueARN once initialized is not initialized again for the // same queueARN, instead previous connection is used. 
func loadAllQueueTargets() (map[string]*logrus.Logger, error) { queueTargets := make(map[string]*logrus.Logger) // Load all amqp targets, initialize their respective loggers. for accountID, amqpN := range serverConfig.Notify.GetAMQP() { if !amqpN.Enable { continue } if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeAMQP, newAMQPNotify); err != nil { if _, ok := err.(net.Error); ok { err = &net.OpError{ Op: "Connecting to " + queueARN, Net: "tcp", Err: err, } } return nil, err } } // Load all nats targets, initialize their respective loggers. for accountID, natsN := range serverConfig.Notify.GetNATS() { if !natsN.Enable { continue } if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeNATS, newNATSNotify); err != nil { if _, ok := err.(net.Error); ok { err = &net.OpError{ Op: "Connecting to " + queueARN, Net: "tcp", Err: err, } } return nil, err } } // Load redis targets, initialize their respective loggers. for accountID, redisN := range serverConfig.Notify.GetRedis() { if !redisN.Enable { continue } if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeRedis, newRedisNotify); err != nil { if _, ok := err.(net.Error); ok { err = &net.OpError{ Op: "Connecting to " + queueARN, Net: "tcp", Err: err, } } return nil, err } } // Load Webhook targets, initialize their respective loggers. for accountID, webhookN := range serverConfig.Notify.GetWebhook() { if !webhookN.Enable { continue } if _, err := addQueueTarget(queueTargets, accountID, queueTypeWebhook, newWebhookNotify); err != nil { return nil, err } } // Load elastic targets, initialize their respective loggers. 
for accountID, elasticN := range serverConfig.Notify.GetElasticSearch() { if !elasticN.Enable { continue } if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeElastic, newElasticNotify); err != nil { if _, ok := err.(net.Error); ok { err = &net.OpError{ Op: "Connecting to " + queueARN, Net: "tcp", Err: err, } } return nil, err } } // Load PostgreSQL targets, initialize their respective loggers. for accountID, pgN := range serverConfig.Notify.GetPostgreSQL() { if !pgN.Enable { continue } if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypePostgreSQL, newPostgreSQLNotify); err != nil { if _, ok := err.(net.Error); ok { err = &net.OpError{ Op: "Connecting to " + queueARN, Net: "tcp", Err: err, } } return nil, err } } // Load MySQL targets, initialize their respective loggers. for accountID, msqlN := range serverConfig.Notify.GetMySQL() { if !msqlN.Enable { continue } if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeMySQL, newMySQLNotify); err != nil { if _, ok := err.(net.Error); ok { err = &net.OpError{ Op: "Connecting to " + queueARN, Net: "tcp", Err: err, } } return nil, err } } // Load Kafka targets, initialize their respective loggers. for accountID, kafkaN := range serverConfig.Notify.GetKafka() { if !kafkaN.Enable { continue } if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeKafka, newKafkaNotify); err != nil { if _, ok := err.(net.Error); ok { err = &net.OpError{ Op: "Connecting to " + queueARN, Net: "tcp", Err: err, } } return nil, err } } // Successfully initialized queue targets. return queueTargets, nil } // Global instance of event notification queue. var globalEventNotifier *eventNotifier // Initialize event notifier. func initEventNotifier(objAPI ObjectLayer) error { if objAPI == nil { return errInvalidArgument } // Read all saved bucket notifications. 
nConfigs, lConfigs, err := loadAllBucketNotifications(objAPI) if err != nil { errorIf(err, "Error loading bucket notifications - %v", err) return err } // Initializes all queue targets. queueTargets, err := loadAllQueueTargets() if err != nil { return err } // Initialize internal listener targets listenTargets := make(map[string]*listenerLogger) for _, listeners := range lConfigs { for _, listener := range listeners { ln, err := newListenerLogger( listener.TopicConfig.TopicARN, listener.TargetServer, ) if err != nil { errorIf(err, "Unable to initialize listener target logger.") //TODO: improve error return fmt.Errorf("Error initializing listner target logger - %v", err) } listenTargets[listener.TopicConfig.TopicARN] = ln } } // Initialize event notifier queue. globalEventNotifier = &eventNotifier{ external: externalNotifier{ notificationConfigs: nConfigs, targets: queueTargets, rwMutex: &sync.RWMutex{}, }, internal: internalNotifier{ rwMutex: &sync.RWMutex{}, targets: listenTargets, listenerConfigs: lConfigs, connectedListeners: make(map[string]chan []NotificationEvent), }, } return nil }