diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index ac967db15..e39984242 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -290,7 +290,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, // Write success response. writeSuccessResponse(w, encodedSuccessResponse) - if eventN.IsBucketNotificationSet(bucket) { + if globalEventNotifier.IsBucketNotificationSet(bucket) { // Notify deleted event for objects. for _, dobj := range deletedObjects { eventNotify(eventData{ @@ -403,7 +403,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h // Write successful response. writeSuccessNoContent(w) - if eventN.IsBucketNotificationSet(bucket) { + if globalEventNotifier.IsBucketNotificationSet(bucket) { // Fetch object info for notifications. objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object) if err != nil { diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index f1cbaf989..73b276491 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -141,7 +141,7 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter, } // Set bucket notification config. - eventN.SetBucketNotificationConfig(bucket, ¬ificationCfg) + globalEventNotifier.SetBucketNotificationConfig(bucket, ¬ificationCfg) // Success. writeSuccessResponse(w, nil) @@ -227,7 +227,7 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit return } - notificationCfg := eventN.GetBucketNotificationConfig(bucket) + notificationCfg := globalEventNotifier.GetBucketNotificationConfig(bucket) if notificationCfg == nil { writeErrorResponse(w, r, ErrARNNotification, r.URL.Path) return @@ -249,9 +249,9 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit defer close(nEventCh) // Set sns target. - eventN.SetSNSTarget(topicARN, nEventCh) + globalEventNotifier.SetSNSTarget(topicARN, nEventCh) // Remove sns listener after the writer has closed or the client disconnected. - defer eventN.RemoveSNSTarget(topicARN, nEventCh) + defer globalEventNotifier.RemoveSNSTarget(topicARN, nEventCh) // Start sending bucket notifications. sendBucketNotification(w, nEventCh) diff --git a/cmd/config-v7_test.go b/cmd/config-v7_test.go index 46ce85823..4eecfde05 100644 --- a/cmd/config-v7_test.go +++ b/cmd/config-v7_test.go @@ -1,3 +1,19 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cmd import ( diff --git a/cmd/event-notifier.go b/cmd/event-notifier.go index 3794cdb69..cef9e4438 100644 --- a/cmd/event-notifier.go +++ b/cmd/event-notifier.go @@ -19,7 +19,6 @@ package cmd import ( "bytes" "encoding/xml" - "errors" "fmt" "net/url" "path" @@ -112,7 +111,7 @@ func (en *eventNotifier) SetSNSTarget(snsARN string, listenerCh chan []Notificat en.rwMutex.Lock() defer en.rwMutex.Unlock() if listenerCh == nil { - return errors.New("invalid argument") + return errInvalidArgument } en.snsTargets[snsARN] = append(en.snsTargets[snsARN], listenerCh) return nil @@ -161,7 +160,7 @@ func (en *eventNotifier) SetBucketNotificationConfig(bucket string, notification en.rwMutex.Lock() defer en.rwMutex.Unlock() if notificationCfg == nil { - return errors.New("invalid argument") + return errInvalidArgument } en.notificationConfigs[bucket] = notificationCfg return nil @@ -178,8 +177,11 @@ func eventNotify(event eventData) { // - s3:ObjectCreated:CompleteMultipartUpload // - s3:ObjectRemoved:Delete - nConfig := eventN.GetBucketNotificationConfig(event.Bucket) + nConfig := globalEventNotifier.GetBucketNotificationConfig(event.Bucket) // No bucket notifications enabled, drop the event notification. + if nConfig == nil { + return + } if len(nConfig.QueueConfigs) == 0 && len(nConfig.TopicConfigs) == 0 && len(nConfig.LambdaConfigs) == 0 { return } @@ -198,7 +200,7 @@ func eventNotify(event eventData) { eventMatch := eventMatch(eventType, qConfig.Events) ruleMatch := filterRuleMatch(objectName, qConfig.Filter.Key.FilterRules) if eventMatch && ruleMatch { - targetLog := eventN.GetQueueTarget(qConfig.QueueARN) + targetLog := globalEventNotifier.GetQueueTarget(qConfig.QueueARN) if targetLog != nil { targetLog.WithFields(logrus.Fields{ "Records": notificationEvent, @@ -211,7 +213,7 @@ func eventNotify(event eventData) { ruleMatch := filterRuleMatch(objectName, topicConfig.Filter.Key.FilterRules) eventMatch := eventMatch(eventType, topicConfig.Events) if eventMatch && ruleMatch { - targetListeners := eventN.GetSNSTarget(topicConfig.TopicARN) + targetListeners := globalEventNotifier.GetSNSTarget(topicConfig.TopicARN) for _, listener := range targetListeners { listener <- notificationEvent } @@ -352,7 +354,7 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) { } // Global instance of event notification queue. -var eventN *eventNotifier +var globalEventNotifier *eventNotifier // Initialize event notifier. func initEventNotifier(objAPI ObjectLayer) error { @@ -373,7 +375,7 @@ func initEventNotifier(objAPI ObjectLayer) error { } // Inititalize event notifier queue. - eventN = &eventNotifier{ + globalEventNotifier = &eventNotifier{ rwMutex: &sync.RWMutex{}, notificationConfigs: configs, queueTargets: queueTargets, diff --git a/cmd/event-notifier_test.go b/cmd/event-notifier_test.go index d7b4b40b9..c43f8569a 100644 --- a/cmd/event-notifier_test.go +++ b/cmd/event-notifier_test.go @@ -16,7 +16,73 @@ package cmd -import "testing" +import ( + "reflect" + "testing" +) + +// Tests event notify. +func TestEventNotify(t *testing.T) { + ExecObjectLayerTest(t, testEventNotify) +} + +func testEventNotify(obj ObjectLayer, instanceType string, t TestErrHandler) { + bucketName := getRandomBucketName() + + // initialize the server and obtain the credentials and root. + // credentials are necessary to sign the HTTP request. + rootPath, err := newTestConfig("us-east-1") + if err != nil { + t.Fatalf("Init Test config failed") + } + // remove the root folder after the test ends. + defer removeAll(rootPath) + + initEventNotifier(obj) + + // Notify object created event. + eventNotify(eventData{ + Type: ObjectCreatedPost, + Bucket: bucketName, + ObjInfo: ObjectInfo{ + Bucket: bucketName, + Name: "object1", + }, + ReqParams: map[string]string{ + "sourceIPAddress": "localhost:1337", + }, + }) + + if err := globalEventNotifier.SetBucketNotificationConfig(bucketName, nil); err != errInvalidArgument { + t.Errorf("Expected error %s, got %s", errInvalidArgument, err) + } + + if err := globalEventNotifier.SetBucketNotificationConfig(bucketName, ¬ificationConfig{}); err != nil { + t.Errorf("Expected error to be nil, got %s", err) + } + + if !globalEventNotifier.IsBucketNotificationSet(bucketName) { + t.Errorf("Notification expected to be set, but notification not set.") + } + + nConfig := globalEventNotifier.GetBucketNotificationConfig(bucketName) + if !reflect.DeepEqual(nConfig, ¬ificationConfig{}) { + t.Errorf("Mismatching notification configs.") + } + + // Notify object created event. + eventNotify(eventData{ + Type: ObjectCreatedPost, + Bucket: bucketName, + ObjInfo: ObjectInfo{ + Bucket: bucketName, + Name: "object1", + }, + ReqParams: map[string]string{ + "sourceIPAddress": "localhost:1337", + }, + }) +} // Tests various forms of inititalization of event notifier. func TestInitEventNotifier(t *testing.T) { diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index af08ce70c..90d0702f4 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -357,7 +357,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re // write success response. writeSuccessResponse(w, encodedSuccessResponse) - if eventN.IsBucketNotificationSet(bucket) { + if globalEventNotifier.IsBucketNotificationSet(bucket) { // Notify object created event. eventNotify(eventData{ Type: ObjectCreatedCopy, @@ -456,7 +456,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } writeSuccessResponse(w, nil) - if eventN.IsBucketNotificationSet(bucket) { + if globalEventNotifier.IsBucketNotificationSet(bucket) { // Fetch object info for notifications. objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object) if err != nil { @@ -797,7 +797,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite w.Write(encodedSuccessResponse) w.(http.Flusher).Flush() - if eventN.IsBucketNotificationSet(bucket) { + if globalEventNotifier.IsBucketNotificationSet(bucket) { // Fetch object info for notifications. objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object) if err != nil { @@ -851,7 +851,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. } writeSuccessNoContent(w) - if eventN.IsBucketNotificationSet(bucket) { + if globalEventNotifier.IsBucketNotificationSet(bucket) { // Notify object deleted event. eventNotify(eventData{ Type: ObjectRemovedDelete, diff --git a/cmd/server-main.go b/cmd/server-main.go index 608043bec..a0ed0307a 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -256,15 +256,15 @@ func serverMain(c *cli.Context) { // Fetch endpoints which we are going to serve from. endPoints := finalizeEndpoints(tls, &apiServer.Server) - // Prints the formatted startup message. - printStartupMessage(endPoints) - // Register generic callbacks. globalShutdownCBs.AddGenericCB(func() errCode { // apiServer.Stop() return exitSuccess }) + // Prints the formatted startup message. + printStartupMessage(endPoints) + // Start server. // Configure TLS if certs are available. if tls { diff --git a/cmd/server-startup-msg.go b/cmd/server-startup-msg.go index 23c7d253c..d71158bef 100644 --- a/cmd/server-startup-msg.go +++ b/cmd/server-startup-msg.go @@ -60,6 +60,11 @@ func printServerCommonMsg(endPoints []string) { console.Println(colorBlue("AccessKey: ") + colorBold(fmt.Sprintf("%s ", cred.AccessKeyID))) console.Println(colorBlue("SecretKey: ") + colorBold(fmt.Sprintf("%s ", cred.SecretAccessKey))) console.Println(colorBlue("Region: ") + colorBold(fmt.Sprintf(getFormatStr(len(region), 3), region))) + arnMsg := colorBlue("SqsARNs: ") + for queueArn := range globalEventNotifier.queueTargets { + arnMsg += colorBold(fmt.Sprintf(getFormatStr(len(queueArn), 2), queueArn)) + } + console.Println(arnMsg) console.Println(colorBlue("\nBrowser Access:")) console.Println(fmt.Sprintf(getFormatStr(len(endPointStr), 3), endPointStr)) diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 024d2ce0a..add91dd9a 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -396,7 +396,7 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { return } - if eventN.IsBucketNotificationSet(bucket) { + if globalEventNotifier.IsBucketNotificationSet(bucket) { // Notify object created event. eventNotify(eventData{ Type: ObjectCreatedPut,