Make addition of TopicConfig to globalEventNotifier go-routine safe (#2806)

master
Krishnan Parthasarathi 8 years ago committed by Harshavardhana
parent f72163f856
commit 5fdd768903
  1. 46
      cmd/bucket-handlers.go
  2. 51
      cmd/bucket-notification-handlers.go
  3. 28
      cmd/event-notifier.go
  4. 71
      cmd/event-notifier_test.go
  5. 96
      cmd/object-handlers.go
  6. 20
      cmd/web-handlers.go

@ -331,20 +331,18 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
// Write success response. // Write success response.
writeSuccessResponse(w, encodedSuccessResponse) writeSuccessResponse(w, encodedSuccessResponse)
if globalEventNotifier.IsBucketNotificationSet(bucket) { // Notify deleted event for objects.
// Notify deleted event for objects. for _, dobj := range deletedObjects {
for _, dobj := range deletedObjects { eventNotify(eventData{
eventNotify(eventData{ Type: ObjectRemovedDelete,
Type: ObjectRemovedDelete, Bucket: bucket,
Bucket: bucket, ObjInfo: ObjectInfo{
ObjInfo: ObjectInfo{ Name: dobj.ObjectName,
Name: dobj.ObjectName, },
}, ReqParams: map[string]string{
ReqParams: map[string]string{ "sourceIPAddress": r.RemoteAddr,
"sourceIPAddress": r.RemoteAddr, },
}, })
})
}
} }
} }
@ -453,17 +451,15 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
// Write successful response. // Write successful response.
writeSuccessNoContent(w) writeSuccessNoContent(w)
if globalEventNotifier.IsBucketNotificationSet(bucket) { // Notify object created event.
// Notify object created event. eventNotify(eventData{
eventNotify(eventData{ Type: ObjectCreatedPost,
Type: ObjectCreatedPost, Bucket: bucket,
Bucket: bucket, ObjInfo: objInfo,
ObjInfo: objInfo, ReqParams: map[string]string{
ReqParams: map[string]string{ "sourceIPAddress": r.RemoteAddr,
"sourceIPAddress": r.RemoteAddr, },
}, })
})
}
} }
// HeadBucketHandler - HEAD Bucket // HeadBucketHandler - HEAD Bucket

@ -270,47 +270,24 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
}) })
} }
// Fetch for existing notification configs and update topic configs. // Make topic configuration corresponding to this ListenBucketNotification request.
nConfig := globalEventNotifier.GetBucketNotificationConfig(bucket) topicCfg := &topicConfig{
if nConfig == nil { TopicARN: accountARN,
// No notification configs found, initialize. serviceConfig: serviceConfig{
nConfig = &notificationConfig{ Events: events,
TopicConfigs: []topicConfig{{ Filter: struct {
TopicARN: accountARN, Key keyFilter `xml:"S3Key,omitempty"`
serviceConfig: serviceConfig{ }{
Events: events, Key: keyFilter{
Filter: struct { FilterRules: filterRules,
Key keyFilter `xml:"S3Key,omitempty"`
}{
Key: keyFilter{
FilterRules: filterRules,
},
},
ID: "sns-" + accountID,
}, },
}},
}
} else {
// Previously set notification configs found append to
// existing topic configs.
nConfig.TopicConfigs = append(nConfig.TopicConfigs, topicConfig{
TopicARN: accountARN,
serviceConfig: serviceConfig{
Events: events,
Filter: struct {
Key keyFilter `xml:"S3Key,omitempty"`
}{
Key: keyFilter{
FilterRules: filterRules,
},
},
ID: "sns-" + accountID,
}, },
}) ID: "sns-" + accountID,
},
} }
// Save bucket notification config. // Add topic config to bucket notification config.
if err = globalEventNotifier.SetBucketNotificationConfig(bucket, nConfig); err != nil { if err = globalEventNotifier.AddTopicConfig(bucket, topicCfg); err != nil {
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
return return
} }

@ -137,17 +137,6 @@ func (en *eventNotifier) RemoveSNSTarget(snsARN string, listenerCh chan []Notifi
} }
} }
// Returns true if bucket notification is set for the bucket, false otherwise.
func (en *eventNotifier) IsBucketNotificationSet(bucket string) bool {
if en == nil {
return false
}
en.rwMutex.RLock()
defer en.rwMutex.RUnlock()
_, ok := en.notificationConfigs[bucket]
return ok
}
// Fetch bucket notification config for an input bucket. // Fetch bucket notification config for an input bucket.
func (en eventNotifier) GetBucketNotificationConfig(bucket string) *notificationConfig { func (en eventNotifier) GetBucketNotificationConfig(bucket string) *notificationConfig {
en.rwMutex.RLock() en.rwMutex.RLock()
@ -167,6 +156,23 @@ func (en *eventNotifier) SetBucketNotificationConfig(bucket string, notification
return nil return nil
} }
func (en *eventNotifier) AddTopicConfig(bucket string, topicCfg *topicConfig) error {
en.rwMutex.Lock()
defer en.rwMutex.Unlock()
if topicCfg == nil {
return errInvalidArgument
}
notificationCfg := en.notificationConfigs[bucket]
if notificationCfg == nil {
en.notificationConfigs[bucket] = &notificationConfig{
TopicConfigs: []topicConfig{*topicCfg},
}
return nil
}
notificationCfg.TopicConfigs = append(notificationCfg.TopicConfigs, *topicCfg)
return nil
}
// eventNotify notifies an event to relevant targets based on their // eventNotify notifies an event to relevant targets based on their
// bucket notification configs. // bucket notification configs.
func eventNotify(event eventData) { func eventNotify(event eventData) {

@ -17,6 +17,7 @@
package cmd package cmd
import ( import (
"fmt"
"reflect" "reflect"
"testing" "testing"
"time" "time"
@ -64,11 +65,11 @@ func testEventNotify(obj ObjectLayer, instanceType string, t TestErrHandler) {
t.Errorf("Expected error to be nil, got %s", err) t.Errorf("Expected error to be nil, got %s", err)
} }
if !globalEventNotifier.IsBucketNotificationSet(bucketName) { nConfig := globalEventNotifier.GetBucketNotificationConfig(bucketName)
if nConfig == nil {
t.Errorf("Notification expected to be set, but notification not set.") t.Errorf("Notification expected to be set, but notification not set.")
} }
nConfig := globalEventNotifier.GetBucketNotificationConfig(bucketName)
if !reflect.DeepEqual(nConfig, &notificationConfig{}) { if !reflect.DeepEqual(nConfig, &notificationConfig{}) {
t.Errorf("Mismatching notification configs.") t.Errorf("Mismatching notification configs.")
} }
@ -381,3 +382,69 @@ func TestListenBucketNotification(t *testing.T) {
break break
} }
} }
func testAddTopicConfig(obj ObjectLayer, instanceType string, t TestErrHandler) {
root, cErr := newTestConfig("us-east-1")
if cErr != nil {
t.Fatalf("[%s] Failed to initialize test config: %v", instanceType, cErr)
}
defer removeAll(root)
if err := initEventNotifier(obj); err != nil {
t.Fatalf("[%s] : Failed to initialize event notifier: %v", instanceType, err)
}
// Make a bucket to store topicConfigs.
randBucket := getRandomBucketName()
if err := obj.MakeBucket(randBucket); err != nil {
t.Fatalf("[%s] : Failed to make bucket %s", instanceType, randBucket)
}
// Add a topicConfig to an empty notificationConfig.
accountID := fmt.Sprintf("%d", time.Now().UTC().UnixNano())
accountARN := "arn:minio:sns:" + serverConfig.GetRegion() + accountID + ":listen"
var filterRules []filterRule
filterRules = append(filterRules, filterRule{
Name: "prefix",
Value: "minio",
})
filterRules = append(filterRules, filterRule{
Name: "suffix",
Value: "*.jpg",
})
// Make topic configuration corresponding to this ListenBucketNotification request.
sampleTopicCfg := &topicConfig{
TopicARN: accountARN,
serviceConfig: serviceConfig{
Filter: struct {
Key keyFilter `xml:"S3Key,omitempty"`
}{
Key: keyFilter{
FilterRules: filterRules,
},
},
ID: "sns-" + accountID,
},
}
testCases := []struct {
topicCfg *topicConfig
expectedErr error
}{
{sampleTopicCfg, nil},
{nil, errInvalidArgument},
{sampleTopicCfg, nil},
}
for i, test := range testCases {
err := globalEventNotifier.AddTopicConfig(randBucket, test.topicCfg)
if err != test.expectedErr {
t.Errorf("Test %d: %s failed with error %v, expected to fail with %v",
i+1, instanceType, err, test.expectedErr)
}
}
}
func TestAddTopicConfig(t *testing.T) {
ExecObjectLayerTest(t, testAddTopicConfig)
}

@ -378,17 +378,15 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
// write success response. // write success response.
writeSuccessResponse(w, encodedSuccessResponse) writeSuccessResponse(w, encodedSuccessResponse)
if globalEventNotifier.IsBucketNotificationSet(bucket) { // Notify object created event.
// Notify object created event. eventNotify(eventData{
eventNotify(eventData{ Type: ObjectCreatedCopy,
Type: ObjectCreatedCopy, Bucket: bucket,
Bucket: bucket, ObjInfo: objInfo,
ObjInfo: objInfo, ReqParams: map[string]string{
ReqParams: map[string]string{ "sourceIPAddress": r.RemoteAddr,
"sourceIPAddress": r.RemoteAddr, },
}, })
})
}
} }
// PutObjectHandler - PUT Object // PutObjectHandler - PUT Object
@ -482,17 +480,15 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
w.Header().Set("ETag", "\""+objInfo.MD5Sum+"\"") w.Header().Set("ETag", "\""+objInfo.MD5Sum+"\"")
writeSuccessResponse(w, nil) writeSuccessResponse(w, nil)
if globalEventNotifier.IsBucketNotificationSet(bucket) { // Notify object created event.
// Notify object created event. eventNotify(eventData{
eventNotify(eventData{ Type: ObjectCreatedPut,
Type: ObjectCreatedPut, Bucket: bucket,
Bucket: bucket, ObjInfo: objInfo,
ObjInfo: objInfo, ReqParams: map[string]string{
ReqParams: map[string]string{ "sourceIPAddress": r.RemoteAddr,
"sourceIPAddress": r.RemoteAddr, },
}, })
})
}
} }
/// Multipart objectAPIHandlers /// Multipart objectAPIHandlers
@ -834,24 +830,22 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
w.Write(encodedSuccessResponse) w.Write(encodedSuccessResponse)
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
if globalEventNotifier.IsBucketNotificationSet(bucket) { // Fetch object info for notifications.
// Fetch object info for notifications. objInfo, err := objectAPI.GetObjectInfo(bucket, object)
objInfo, err := objectAPI.GetObjectInfo(bucket, object) if err != nil {
if err != nil { errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object))
errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object)) return
return
}
// Notify object created event.
eventNotify(eventData{
Type: ObjectCreatedCompleteMultipartUpload,
Bucket: bucket,
ObjInfo: objInfo,
ReqParams: map[string]string{
"sourceIPAddress": r.RemoteAddr,
},
})
} }
// Notify object created event.
eventNotify(eventData{
Type: ObjectCreatedCompleteMultipartUpload,
Bucket: bucket,
ObjInfo: objInfo,
ReqParams: map[string]string{
"sourceIPAddress": r.RemoteAddr,
},
})
} }
/// Delete objectAPIHandlers /// Delete objectAPIHandlers
@ -895,17 +889,15 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
} }
writeSuccessNoContent(w) writeSuccessNoContent(w)
if globalEventNotifier.IsBucketNotificationSet(bucket) { // Notify object deleted event.
// Notify object deleted event. eventNotify(eventData{
eventNotify(eventData{ Type: ObjectRemovedDelete,
Type: ObjectRemovedDelete, Bucket: bucket,
Bucket: bucket, ObjInfo: ObjectInfo{
ObjInfo: ObjectInfo{ Name: object,
Name: object, },
}, ReqParams: map[string]string{
ReqParams: map[string]string{ "sourceIPAddress": r.RemoteAddr,
"sourceIPAddress": r.RemoteAddr, },
}, })
})
}
} }

@ -428,17 +428,15 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
return return
} }
if globalEventNotifier.IsBucketNotificationSet(bucket) { // Notify object created event.
// Notify object created event. eventNotify(eventData{
eventNotify(eventData{ Type: ObjectCreatedPut,
Type: ObjectCreatedPut, Bucket: bucket,
Bucket: bucket, ObjInfo: objInfo,
ObjInfo: objInfo, ReqParams: map[string]string{
ReqParams: map[string]string{ "sourceIPAddress": r.RemoteAddr,
"sourceIPAddress": r.RemoteAddr, },
}, })
})
}
} }
// Download - file download handler. // Download - file download handler.

Loading…
Cancel
Save