From 5fdd7689030cf083222e42fab0f6843c5dc3ea3a Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Thu, 29 Sep 2016 11:16:19 +0530 Subject: [PATCH] Make addition of TopicConfig to globalEventNotifier go-routine safe (#2806) --- cmd/bucket-handlers.go | 46 +++++++------- cmd/bucket-notification-handlers.go | 51 +++++---------- cmd/event-notifier.go | 28 +++++---- cmd/event-notifier_test.go | 71 ++++++++++++++++++++- cmd/object-handlers.go | 96 +++++++++++++---------------- cmd/web-handlers.go | 20 +++--- 6 files changed, 174 insertions(+), 138 deletions(-) diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index d63975e91..a8ad3e016 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -331,20 +331,18 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, // Write success response. writeSuccessResponse(w, encodedSuccessResponse) - if globalEventNotifier.IsBucketNotificationSet(bucket) { - // Notify deleted event for objects. - for _, dobj := range deletedObjects { - eventNotify(eventData{ - Type: ObjectRemovedDelete, - Bucket: bucket, - ObjInfo: ObjectInfo{ - Name: dobj.ObjectName, - }, - ReqParams: map[string]string{ - "sourceIPAddress": r.RemoteAddr, - }, - }) - } + // Notify deleted event for objects. + for _, dobj := range deletedObjects { + eventNotify(eventData{ + Type: ObjectRemovedDelete, + Bucket: bucket, + ObjInfo: ObjectInfo{ + Name: dobj.ObjectName, + }, + ReqParams: map[string]string{ + "sourceIPAddress": r.RemoteAddr, + }, + }) } } @@ -453,17 +451,15 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h // Write successful response. writeSuccessNoContent(w) - if globalEventNotifier.IsBucketNotificationSet(bucket) { - // Notify object created event. - eventNotify(eventData{ - Type: ObjectCreatedPost, - Bucket: bucket, - ObjInfo: objInfo, - ReqParams: map[string]string{ - "sourceIPAddress": r.RemoteAddr, - }, - }) - } + // Notify object created event. + eventNotify(eventData{ + Type: ObjectCreatedPost, + Bucket: bucket, + ObjInfo: objInfo, + ReqParams: map[string]string{ + "sourceIPAddress": r.RemoteAddr, + }, + }) } // HeadBucketHandler - HEAD Bucket diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go index 848e4e980..7b09bd0a3 100644 --- a/cmd/bucket-notification-handlers.go +++ b/cmd/bucket-notification-handlers.go @@ -270,47 +270,24 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit }) } - // Fetch for existing notification configs and update topic configs. - nConfig := globalEventNotifier.GetBucketNotificationConfig(bucket) - if nConfig == nil { - // No notification configs found, initialize. - nConfig = ¬ificationConfig{ - TopicConfigs: []topicConfig{{ - TopicARN: accountARN, - serviceConfig: serviceConfig{ - Events: events, - Filter: struct { - Key keyFilter `xml:"S3Key,omitempty"` - }{ - Key: keyFilter{ - FilterRules: filterRules, - }, - }, - ID: "sns-" + accountID, + // Make topic configuration corresponding to this ListenBucketNotification request. + topicCfg := &topicConfig{ + TopicARN: accountARN, + serviceConfig: serviceConfig{ + Events: events, + Filter: struct { + Key keyFilter `xml:"S3Key,omitempty"` + }{ + Key: keyFilter{ + FilterRules: filterRules, }, - }}, - } - } else { - // Previously set notification configs found append to - // existing topic configs. - nConfig.TopicConfigs = append(nConfig.TopicConfigs, topicConfig{ - TopicARN: accountARN, - serviceConfig: serviceConfig{ - Events: events, - Filter: struct { - Key keyFilter `xml:"S3Key,omitempty"` - }{ - Key: keyFilter{ - FilterRules: filterRules, - }, - }, - ID: "sns-" + accountID, }, - }) + ID: "sns-" + accountID, + }, } - // Save bucket notification config. - if err = globalEventNotifier.SetBucketNotificationConfig(bucket, nConfig); err != nil { + // Add topic config to bucket notification config. + if err = globalEventNotifier.AddTopicConfig(bucket, topicCfg); err != nil { writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) return } diff --git a/cmd/event-notifier.go b/cmd/event-notifier.go index 08617b2b1..4ed1e5cc0 100644 --- a/cmd/event-notifier.go +++ b/cmd/event-notifier.go @@ -137,17 +137,6 @@ func (en *eventNotifier) RemoveSNSTarget(snsARN string, listenerCh chan []Notifi } } -// Returns true if bucket notification is set for the bucket, false otherwise. -func (en *eventNotifier) IsBucketNotificationSet(bucket string) bool { - if en == nil { - return false - } - en.rwMutex.RLock() - defer en.rwMutex.RUnlock() - _, ok := en.notificationConfigs[bucket] - return ok -} - // Fetch bucket notification config for an input bucket. func (en eventNotifier) GetBucketNotificationConfig(bucket string) *notificationConfig { en.rwMutex.RLock() @@ -167,6 +156,23 @@ func (en *eventNotifier) SetBucketNotificationConfig(bucket string, notification return nil } +func (en *eventNotifier) AddTopicConfig(bucket string, topicCfg *topicConfig) error { + en.rwMutex.Lock() + defer en.rwMutex.Unlock() + if topicCfg == nil { + return errInvalidArgument + } + notificationCfg := en.notificationConfigs[bucket] + if notificationCfg == nil { + en.notificationConfigs[bucket] = ¬ificationConfig{ + TopicConfigs: []topicConfig{*topicCfg}, + } + return nil + } + notificationCfg.TopicConfigs = append(notificationCfg.TopicConfigs, *topicCfg) + return nil +} + // eventNotify notifies an event to relevant targets based on their // bucket notification configs. func eventNotify(event eventData) { diff --git a/cmd/event-notifier_test.go b/cmd/event-notifier_test.go index e2f571e70..e9918e4bf 100644 --- a/cmd/event-notifier_test.go +++ b/cmd/event-notifier_test.go @@ -17,6 +17,7 @@ package cmd import ( + "fmt" "reflect" "testing" "time" @@ -64,11 +65,11 @@ func testEventNotify(obj ObjectLayer, instanceType string, t TestErrHandler) { t.Errorf("Expected error to be nil, got %s", err) } - if !globalEventNotifier.IsBucketNotificationSet(bucketName) { + nConfig := globalEventNotifier.GetBucketNotificationConfig(bucketName) + if nConfig == nil { t.Errorf("Notification expected to be set, but notification not set.") } - nConfig := globalEventNotifier.GetBucketNotificationConfig(bucketName) if !reflect.DeepEqual(nConfig, ¬ificationConfig{}) { t.Errorf("Mismatching notification configs.") } @@ -381,3 +382,69 @@ func TestListenBucketNotification(t *testing.T) { break } } + +func testAddTopicConfig(obj ObjectLayer, instanceType string, t TestErrHandler) { + root, cErr := newTestConfig("us-east-1") + if cErr != nil { + t.Fatalf("[%s] Failed to initialize test config: %v", instanceType, cErr) + } + defer removeAll(root) + + if err := initEventNotifier(obj); err != nil { + t.Fatalf("[%s] : Failed to initialize event notifier: %v", instanceType, err) + } + + // Make a bucket to store topicConfigs. + randBucket := getRandomBucketName() + if err := obj.MakeBucket(randBucket); err != nil { + t.Fatalf("[%s] : Failed to make bucket %s", instanceType, randBucket) + } + + // Add a topicConfig to an empty notificationConfig. + accountID := fmt.Sprintf("%d", time.Now().UTC().UnixNano()) + accountARN := "arn:minio:sns:" + serverConfig.GetRegion() + accountID + ":listen" + var filterRules []filterRule + filterRules = append(filterRules, filterRule{ + Name: "prefix", + Value: "minio", + }) + filterRules = append(filterRules, filterRule{ + Name: "suffix", + Value: "*.jpg", + }) + + // Make topic configuration corresponding to this ListenBucketNotification request. + sampleTopicCfg := &topicConfig{ + TopicARN: accountARN, + serviceConfig: serviceConfig{ + Filter: struct { + Key keyFilter `xml:"S3Key,omitempty"` + }{ + Key: keyFilter{ + FilterRules: filterRules, + }, + }, + ID: "sns-" + accountID, + }, + } + testCases := []struct { + topicCfg *topicConfig + expectedErr error + }{ + {sampleTopicCfg, nil}, + {nil, errInvalidArgument}, + {sampleTopicCfg, nil}, + } + + for i, test := range testCases { + err := globalEventNotifier.AddTopicConfig(randBucket, test.topicCfg) + if err != test.expectedErr { + t.Errorf("Test %d: %s failed with error %v, expected to fail with %v", + i+1, instanceType, err, test.expectedErr) + } + } +} + +func TestAddTopicConfig(t *testing.T) { + ExecObjectLayerTest(t, testAddTopicConfig) +} diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 4ecbed311..4f2a7f79c 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -378,17 +378,15 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re // write success response. writeSuccessResponse(w, encodedSuccessResponse) - if globalEventNotifier.IsBucketNotificationSet(bucket) { - // Notify object created event. - eventNotify(eventData{ - Type: ObjectCreatedCopy, - Bucket: bucket, - ObjInfo: objInfo, - ReqParams: map[string]string{ - "sourceIPAddress": r.RemoteAddr, - }, - }) - } + // Notify object created event. + eventNotify(eventData{ + Type: ObjectCreatedCopy, + Bucket: bucket, + ObjInfo: objInfo, + ReqParams: map[string]string{ + "sourceIPAddress": r.RemoteAddr, + }, + }) } // PutObjectHandler - PUT Object @@ -482,17 +480,15 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req w.Header().Set("ETag", "\""+objInfo.MD5Sum+"\"") writeSuccessResponse(w, nil) - if globalEventNotifier.IsBucketNotificationSet(bucket) { - // Notify object created event. - eventNotify(eventData{ - Type: ObjectCreatedPut, - Bucket: bucket, - ObjInfo: objInfo, - ReqParams: map[string]string{ - "sourceIPAddress": r.RemoteAddr, - }, - }) - } + // Notify object created event. + eventNotify(eventData{ + Type: ObjectCreatedPut, + Bucket: bucket, + ObjInfo: objInfo, + ReqParams: map[string]string{ + "sourceIPAddress": r.RemoteAddr, + }, + }) } /// Multipart objectAPIHandlers @@ -834,24 +830,22 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite w.Write(encodedSuccessResponse) w.(http.Flusher).Flush() - if globalEventNotifier.IsBucketNotificationSet(bucket) { - // Fetch object info for notifications. - objInfo, err := objectAPI.GetObjectInfo(bucket, object) - if err != nil { - errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object)) - return - } - - // Notify object created event. - eventNotify(eventData{ - Type: ObjectCreatedCompleteMultipartUpload, - Bucket: bucket, - ObjInfo: objInfo, - ReqParams: map[string]string{ - "sourceIPAddress": r.RemoteAddr, - }, - }) + // Fetch object info for notifications. + objInfo, err := objectAPI.GetObjectInfo(bucket, object) + if err != nil { + errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object)) + return } + + // Notify object created event. + eventNotify(eventData{ + Type: ObjectCreatedCompleteMultipartUpload, + Bucket: bucket, + ObjInfo: objInfo, + ReqParams: map[string]string{ + "sourceIPAddress": r.RemoteAddr, + }, + }) } /// Delete objectAPIHandlers @@ -895,17 +889,15 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. } writeSuccessNoContent(w) - if globalEventNotifier.IsBucketNotificationSet(bucket) { - // Notify object deleted event. - eventNotify(eventData{ - Type: ObjectRemovedDelete, - Bucket: bucket, - ObjInfo: ObjectInfo{ - Name: object, - }, - ReqParams: map[string]string{ - "sourceIPAddress": r.RemoteAddr, - }, - }) - } + // Notify object deleted event. + eventNotify(eventData{ + Type: ObjectRemovedDelete, + Bucket: bucket, + ObjInfo: ObjectInfo{ + Name: object, + }, + ReqParams: map[string]string{ + "sourceIPAddress": r.RemoteAddr, + }, + }) } diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 917153013..3a8887de7 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -428,17 +428,15 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { return } - if globalEventNotifier.IsBucketNotificationSet(bucket) { - // Notify object created event. - eventNotify(eventData{ - Type: ObjectCreatedPut, - Bucket: bucket, - ObjInfo: objInfo, - ReqParams: map[string]string{ - "sourceIPAddress": r.RemoteAddr, - }, - }) - } + // Notify object created event. + eventNotify(eventData{ + Type: ObjectCreatedPut, + Bucket: bucket, + ObjInfo: objInfo, + ReqParams: map[string]string{ + "sourceIPAddress": r.RemoteAddr, + }, + }) } // Download - file download handler.