From e86dfcf41ec3b0bb46436db706fe8e17d5cb38d0 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 15 Aug 2016 20:56:43 -0700 Subject: [PATCH] api: Change listen bucket notification to be TopicConfiguration. (#2447) --- bucket-notification-datatypes.go | 8 ++-- bucket-notification-handlers.go | 34 ++++--------- bucket-notification-utils.go | 80 ++++++++++++++++++------------- bucket-notification-utils_test.go | 74 ++++++++++++++-------------- event-notifier.go | 42 ++++++++-------- notifiers.go | 10 ++-- server_test.go | 8 ++-- 7 files changed, 128 insertions(+), 128 deletions(-) diff --git a/bucket-notification-datatypes.go b/bucket-notification-datatypes.go index c8cc3e706..d2a6b92a8 100644 --- a/bucket-notification-datatypes.go +++ b/bucket-notification-datatypes.go @@ -155,15 +155,15 @@ type NotificationEvent struct { S3 eventMeta `json:"s3"` } -// Represents the minio lambda type and account id's. -type arnLambda struct { +// Represents the minio topic type and account id's. +type arnTopic struct { Type string AccountID string } // Stringer for constructing AWS ARN compatible string. -func (m arnLambda) String() string { - return minioLambda + serverConfig.GetRegion() + ":" + m.AccountID + ":" + m.Type +func (m arnTopic) String() string { + return minioTopic + serverConfig.GetRegion() + ":" + m.AccountID + ":" + m.Type } // Represents the minio sqs type and account id's. diff --git a/bucket-notification-handlers.go b/bucket-notification-handlers.go index 9e17fbede..503668307 100644 --- a/bucket-notification-handlers.go +++ b/bucket-notification-handlers.go @@ -23,7 +23,6 @@ import ( "io" "net/http" "path" - "strings" "time" "github.com/gorilla/mux" @@ -203,22 +202,6 @@ func sendBucketNotification(w http.ResponseWriter, arnListenerCh <-chan []Notifi } } -// Returns true if the queueARN is for an Minio queue. -func isMinL(lambdaARN arnLambda) bool { - return strings.HasSuffix(lambdaARN.Type, lambdaTypeMinio) -} - -// isMinioARNConfigured - verifies if one lambda ARN is valid and is enabled. -func isMinioARNConfigured(lambdaARN string, lambdaConfigs []lambdaConfig) bool { - for _, lambdaConfig := range lambdaConfigs { - // Validate if lambda ARN is already enabled. - if lambdaARN == lambdaConfig.LambdaARN { - return true - } - } - return false -} - // ListenBucketNotificationHandler - list bucket notifications. func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { // Validate request authorization. @@ -230,8 +213,8 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit bucket := vars["bucket"] // Get notification ARN. - lambdaARN := r.URL.Query().Get("notificationARN") - if lambdaARN == "" { + topicARN := r.URL.Query().Get("notificationARN") + if topicARN == "" { writeErrorResponse(w, r, ErrARNNotification, r.URL.Path) return } @@ -250,8 +233,9 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit return } - // Notifications set, but do not have MINIO queue enabled, return. - if !isMinioARNConfigured(lambdaARN, notificationCfg.LambdaConfigs) { + // Set SNS notifications only if special "listen" sns is set in bucket + // notification configs. + if !isMinioSNSConfigured(topicARN, notificationCfg.TopicConfigs) { writeErrorResponse(w, r, ErrARNNotification, r.URL.Path) return } @@ -264,10 +248,10 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit // Close the listener channel. defer close(nEventCh) - // Set lambda target. - eventN.SetLambdaTarget(lambdaARN, nEventCh) - // Remove lambda listener after the writer has closed or the client disconnected. - defer eventN.RemoveLambdaTarget(lambdaARN, nEventCh) + // Set sns target. + eventN.SetSNSTarget(topicARN, nEventCh) + // Remove sns listener after the writer has closed or the client disconnected. + defer eventN.RemoveSNSTarget(topicARN, nEventCh) // Start sending bucket notifications. sendBucketNotification(w, nEventCh) diff --git a/bucket-notification-utils.go b/bucket-notification-utils.go index 24356ad3e..e1301933d 100644 --- a/bucket-notification-utils.go +++ b/bucket-notification-utils.go @@ -111,17 +111,33 @@ func checkQueueARN(queueARN string) APIErrorCode { return ErrNone } -// checkLambdaARN - check if the lambda arn is valid. -func checkLambdaARN(lambdaARN string) APIErrorCode { - if !strings.HasPrefix(lambdaARN, minioLambda) { +// checkTopicARN - check if the topic arn is valid. +func checkTopicARN(topicARN string) APIErrorCode { + if !strings.HasPrefix(topicARN, minioTopic) { return ErrARNNotification } - if !strings.HasPrefix(lambdaARN, minioLambda+serverConfig.GetRegion()+":") { + if !strings.HasPrefix(topicARN, minioTopic+serverConfig.GetRegion()+":") { return ErrRegionNotification } return ErrNone } +// Returns true if the topicARN is for an Minio sns listen type. +func isMinioSNS(topicARN arnTopic) bool { + return strings.HasSuffix(topicARN.Type, snsTypeMinio) +} + +// isMinioSNSConfigured - verifies if one topic ARN is valid and is enabled. +func isMinioSNSConfigured(topicARN string, topicConfigs []topicConfig) bool { + for _, topicConfig := range topicConfigs { + // Validate if topic ARN is already enabled. + if topicARN == topicConfig.TopicARN { + return true + } + } + return false +} + // Validate if we recognize the queue type. func isValidQueue(sqsARN arnSQS) bool { amqpQ := isAMQPQueue(sqsARN) // Is amqp queue?. @@ -130,9 +146,9 @@ func isValidQueue(sqsARN arnSQS) bool { return amqpQ || elasticQ || redisQ } -// Validate if we recognize the lambda type. -func isValidLambda(lambdaARN arnLambda) bool { - return isMinL(lambdaARN) // Is minio lambda?. +// Validate if we recognize the topic type. +func isValidTopic(topicARN arnTopic) bool { + return isMinioSNS(topicARN) // Is minio topic?. } // Validates account id for input queue ARN. @@ -188,26 +204,26 @@ func checkQueueConfig(qConfig queueConfig) APIErrorCode { } // Check - validates queue configuration and returns error if any. -func checkLambdaConfig(lConfig lambdaConfig) APIErrorCode { +func checkTopicConfig(tConfig topicConfig) APIErrorCode { // Check queue arn is valid. - if s3Error := checkLambdaARN(lConfig.LambdaARN); s3Error != ErrNone { + if s3Error := checkTopicARN(tConfig.TopicARN); s3Error != ErrNone { return s3Error } // Unmarshals QueueARN into structured object. - lambdaARN := unmarshalLambdaARN(lConfig.LambdaARN) - // Validate if lambdaARN requested any of the known supported queues. - if !isValidLambda(lambdaARN) { + topicARN := unmarshalTopicARN(tConfig.TopicARN) + // Validate if topicARN requested any of the known supported queues. + if !isValidTopic(topicARN) { return ErrARNNotification } // Check if valid events are set in queue config. - if s3Error := checkEvents(lConfig.Events); s3Error != ErrNone { + if s3Error := checkEvents(tConfig.Events); s3Error != ErrNone { return s3Error } // Check if valid filters are set in queue config. - if s3Error := checkFilterRules(lConfig.Filter.Key.FilterRules); s3Error != ErrNone { + if s3Error := checkFilterRules(tConfig.Filter.Key.FilterRules); s3Error != ErrNone { return s3Error } @@ -228,12 +244,12 @@ func validateQueueConfigs(queueConfigs []queueConfig) APIErrorCode { return ErrNone } -// Validates all incoming lambda configs, checkLambdaConfig validates if the +// Validates all incoming topic configs, checkTopicConfig validates if the // input fields for each queues is not malformed and has valid configuration // information. If validation fails bucket notifications are not enabled. -func validateLambdaConfigs(lambdaConfigs []lambdaConfig) APIErrorCode { - for _, lConfig := range lambdaConfigs { - if s3Error := checkLambdaConfig(lConfig); s3Error != ErrNone { +func validateTopicConfigs(topicConfigs []topicConfig) APIErrorCode { + for _, tConfig := range topicConfigs { + if s3Error := checkTopicConfig(tConfig); s3Error != ErrNone { return s3Error } } @@ -248,7 +264,7 @@ func validateNotificationConfig(nConfig notificationConfig) APIErrorCode { if s3Error := validateQueueConfigs(nConfig.QueueConfigs); s3Error != ErrNone { return s3Error } - if s3Error := validateLambdaConfigs(nConfig.LambdaConfigs); s3Error != ErrNone { + if s3Error := validateTopicConfigs(nConfig.TopicConfigs); s3Error != ErrNone { return s3Error } @@ -256,21 +272,21 @@ func validateNotificationConfig(nConfig notificationConfig) APIErrorCode { return ErrNone } -// Unmarshals input value of AWS ARN format into minioLambda object. -// Returned value represents minio lambda type, currently supported are -// - minio -func unmarshalLambdaARN(lambdaARN string) arnLambda { - lambda := arnLambda{} - if !strings.HasPrefix(lambdaARN, minioLambda+serverConfig.GetRegion()+":") { - return lambda +// Unmarshals input value of AWS ARN format into minioTopic object. +// Returned value represents minio topic type, currently supported are +// - listen +func unmarshalTopicARN(topicARN string) arnTopic { + topic := arnTopic{} + if !strings.HasPrefix(topicARN, minioTopic+serverConfig.GetRegion()+":") { + return topic } - lambdaType := strings.TrimPrefix(lambdaARN, minioLambda+serverConfig.GetRegion()+":") + topicType := strings.TrimPrefix(topicARN, minioTopic+serverConfig.GetRegion()+":") switch { - case strings.HasSuffix(lambdaType, lambdaTypeMinio): - lambda.Type = lambdaTypeMinio - } // Add more lambda here. - lambda.AccountID = strings.TrimSuffix(lambdaType, ":"+lambda.Type) - return lambda + case strings.HasSuffix(topicType, snsTypeMinio): + topic.Type = snsTypeMinio + } // Add more topic here. + topic.AccountID = strings.TrimSuffix(topicType, ":"+topic.Type) + return topic } // Unmarshals input value of AWS ARN format into minioSqs object. diff --git a/bucket-notification-utils_test.go b/bucket-notification-utils_test.go index d3e229a47..43e1a49ad 100644 --- a/bucket-notification-utils_test.go +++ b/bucket-notification-utils_test.go @@ -97,8 +97,8 @@ func TestValidEvents(t *testing.T) { } } -// Tests lambda arn validation. -func TestLambdaARN(t *testing.T) { +// Tests topic arn validation. +func TestTopicARN(t *testing.T) { rootPath, err := newTestConfig("us-east-1") if err != nil { t.Fatalf("unable initialize config file, %s", err) @@ -106,33 +106,33 @@ func TestLambdaARN(t *testing.T) { defer removeAll(rootPath) testCases := []struct { - lambdaARN string - errCode APIErrorCode + topicARN string + errCode APIErrorCode }{ - // Valid minio lambda with '1' account id. + // Valid minio topic with '1' account id. { - lambdaARN: "arn:minio:lambda:us-east-1:1:minio", - errCode: ErrNone, + topicARN: "arn:minio:sns:us-east-1:1:minio", + errCode: ErrNone, }, - // Valid minio lambda with '10' account id. + // Valid minio topic with '10' account id. { - lambdaARN: "arn:minio:lambda:us-east-1:10:minio", - errCode: ErrNone, + topicARN: "arn:minio:sns:us-east-1:10:minio", + errCode: ErrNone, }, // Invalid empty queue arn. { - lambdaARN: "", - errCode: ErrARNNotification, + topicARN: "", + errCode: ErrARNNotification, }, // Invalid region 'us-west-1' in queue arn. { - lambdaARN: "arn:minio:lambda:us-west-1:1:redis", - errCode: ErrRegionNotification, + topicARN: "arn:minio:sns:us-west-1:1:redis", + errCode: ErrRegionNotification, }, } for i, testCase := range testCases { - errCode := checkLambdaARN(testCase.lambdaARN) + errCode := checkTopicARN(testCase.topicARN) if testCase.errCode != errCode { t.Errorf("Test %d: Expected \"%d\", got \"%d\"", i+1, testCase.errCode, errCode) } @@ -186,8 +186,8 @@ func TestQueueARN(t *testing.T) { } } -// Test unmarshal queue arn. -func TestUnmarshalLambdaARN(t *testing.T) { +// Test unmarshal topic arn. +func TestUnmarshalTopicARN(t *testing.T) { rootPath, err := newTestConfig("us-east-1") if err != nil { t.Fatalf("unable initialize config file, %s", err) @@ -195,40 +195,40 @@ func TestUnmarshalLambdaARN(t *testing.T) { defer removeAll(rootPath) testCases := []struct { - lambdaARN string - Type string + topicARN string + Type string }{ - // Valid minio lambda arn. + // Valid minio topic arn. { - lambdaARN: "arn:minio:lambda:us-east-1:1:lambda", - Type: "lambda", + topicARN: "arn:minio:sns:us-east-1:1:listen", + Type: "listen", }, - // Invalid empty queue arn. + // Invalid empty topic arn. { - lambdaARN: "", - Type: "", + topicARN: "", + Type: "", }, - // Invalid region 'us-west-1' in queue arn. + // Invalid region 'us-west-1' in topic arn. { - lambdaARN: "arn:minio:lambda:us-west-1:1:lambda", - Type: "", + topicARN: "arn:minio:sns:us-west-1:1:listen", + Type: "", }, - // Partial queue arn. + // Partial topic arn. { - lambdaARN: "arn:minio:lambda:", - Type: "", + topicARN: "arn:minio:sns:", + Type: "", }, - // Invalid queue service value. + // Invalid topic service value. { - lambdaARN: "arn:minio:lambda:us-east-1:1:*", - Type: "", + topicARN: "arn:minio:sns:us-east-1:1:*", + Type: "", }, } for i, testCase := range testCases { - lambda := unmarshalLambdaARN(testCase.lambdaARN) - if testCase.Type != lambda.Type { - t.Errorf("Test %d: Expected \"%s\", got \"%s\"", i+1, testCase.Type, lambda.Type) + topic := unmarshalTopicARN(testCase.topicARN) + if testCase.Type != topic.Type { + t.Errorf("Test %d: Expected \"%s\", got \"%s\"", i+1, testCase.Type, topic.Type) } } } diff --git a/event-notifier.go b/event-notifier.go index 27c18a45a..27a2e46d3 100644 --- a/event-notifier.go +++ b/event-notifier.go @@ -35,7 +35,7 @@ type eventNotifier struct { // Collection of 'bucket' and notification config. notificationConfigs map[string]*notificationConfig - lambdaTargets map[string][]chan []NotificationEvent + snsTargets map[string][]chan []NotificationEvent queueTargets map[string]*logrus.Logger } @@ -101,37 +101,37 @@ func (en eventNotifier) GetQueueTarget(queueARN string) *logrus.Logger { return en.queueTargets[queueARN] } -func (en eventNotifier) GetLambdaTarget(lambdaARN string) []chan []NotificationEvent { +func (en eventNotifier) GetSNSTarget(snsARN string) []chan []NotificationEvent { en.rwMutex.RLock() defer en.rwMutex.RUnlock() - return en.lambdaTargets[lambdaARN] + return en.snsTargets[snsARN] } -// Set a new lambda target for an input lambda ARN. -func (en *eventNotifier) SetLambdaTarget(lambdaARN string, listenerCh chan []NotificationEvent) error { +// Set a new sns target for an input sns ARN. +func (en *eventNotifier) SetSNSTarget(snsARN string, listenerCh chan []NotificationEvent) error { en.rwMutex.Lock() defer en.rwMutex.Unlock() if listenerCh == nil { return errors.New("invalid argument") } - en.lambdaTargets[lambdaARN] = append(en.lambdaTargets[lambdaARN], listenerCh) + en.snsTargets[snsARN] = append(en.snsTargets[snsARN], listenerCh) return nil } -// Remove lambda target for an input lambda ARN. -func (en *eventNotifier) RemoveLambdaTarget(lambdaARN string, listenerCh chan []NotificationEvent) { +// Remove sns target for an input sns ARN. +func (en *eventNotifier) RemoveSNSTarget(snsARN string, listenerCh chan []NotificationEvent) { en.rwMutex.Lock() defer en.rwMutex.Unlock() - lambdaTarget, ok := en.lambdaTargets[lambdaARN] + snsTarget, ok := en.snsTargets[snsARN] if ok { - for i, savedListenerCh := range lambdaTarget { + for i, savedListenerCh := range snsTarget { if listenerCh == savedListenerCh { - lambdaTarget = append(lambdaTarget[:i], lambdaTarget[i+1:]...) - if len(lambdaTarget) == 0 { - delete(en.lambdaTargets, lambdaARN) + snsTarget = append(snsTarget[:i], snsTarget[i+1:]...) + if len(snsTarget) == 0 { + delete(en.snsTargets, snsARN) break } - en.lambdaTargets[lambdaARN] = lambdaTarget + en.snsTargets[snsARN] = snsTarget } } } @@ -180,7 +180,7 @@ func eventNotify(event eventData) { nConfig := eventN.GetBucketNotificationConfig(event.Bucket) // No bucket notifications enabled, drop the event notification. - if len(nConfig.QueueConfigs) == 0 && len(nConfig.LambdaConfigs) == 0 { + if len(nConfig.QueueConfigs) == 0 && len(nConfig.TopicConfigs) == 0 && len(nConfig.LambdaConfigs) == 0 { return } @@ -206,12 +206,12 @@ func eventNotify(event eventData) { } } } - // Validate if the event and object match the lambda configs. - for _, lambdaConfig := range nConfig.LambdaConfigs { - ruleMatch := filterRuleMatch(objectName, lambdaConfig.Filter.Key.FilterRules) - eventMatch := eventMatch(eventType, lambdaConfig.Events) + // Validate if the event and object match the sns configs. + for _, topicConfig := range nConfig.TopicConfigs { + ruleMatch := filterRuleMatch(objectName, topicConfig.Filter.Key.FilterRules) + eventMatch := eventMatch(eventType, topicConfig.Events) if eventMatch && ruleMatch { - targetListeners := eventN.GetLambdaTarget(lambdaConfig.LambdaARN) + targetListeners := eventN.GetSNSTarget(topicConfig.TopicARN) for _, listener := range targetListeners { listener <- notificationEvent } @@ -377,7 +377,7 @@ func initEventNotifier(objAPI ObjectLayer) error { rwMutex: &sync.RWMutex{}, notificationConfigs: configs, queueTargets: queueTargets, - lambdaTargets: make(map[string][]chan []NotificationEvent), + snsTargets: make(map[string][]chan []NotificationEvent), } return nil diff --git a/notifiers.go b/notifiers.go index 4e0ca4ea8..91a804dde 100644 --- a/notifiers.go +++ b/notifiers.go @@ -36,13 +36,13 @@ const ( queueTypeRedis = "redis" ) -// Lambda type. +// Topic type. const ( - // Minio lambda ARN prefix. - minioLambda = "arn:minio:lambda:" + // Minio topic ARN prefix. + minioTopic = "arn:minio:sns:" - // Static string indicating lambda type 'lambda'. - lambdaTypeMinio = "lambda" + // Static string indicating sns type 'listen'. + snsTypeMinio = "listen" ) var errNotifyNotEnabled = errors.New("requested notifier not enabled") diff --git a/server_test.go b/server_test.go index e4a4ffa22..76a6a61ff 100644 --- a/server_test.go +++ b/server_test.go @@ -79,7 +79,7 @@ func (s *TestSuiteCommon) TestAuth(c *C) { // verifies it by fetching the notification back. func (s *TestSuiteCommon) TestBucketNotification(c *C) { // Sample bucket notification - bucketNotificationBuf := `s3:ObjectCreated:Putprefiximages/1arn:minio:lambda:us-east-1:444455556666:lambda` + bucketNotificationBuf := `s3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-east-1:444455556666:listen` // generate a random bucket Name. bucketName := getRandomBucketName() @@ -121,7 +121,7 @@ func (s *TestSuiteCommon) TestBucketNotification(c *C) { // Verify if downloaded policy matches with previousy uploaded. c.Assert(bytes.Equal([]byte(bucketNotificationBuf), bucketNotificationReadBuf), Equals, true) - invalidBucketNotificationBuf := `s3:ObjectCreated:Putprefiximages/1arn:minio:lambda:us-east-1:444455556666:minio` + invalidBucketNotificationBuf := `s3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-east-1:444455556666:minio` request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName), int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey) @@ -134,7 +134,7 @@ func (s *TestSuiteCommon) TestBucketNotification(c *C) { verifyError(c, response, "InvalidArgument", "A specified destination ARN does not exist or is not well-formed. Verify the destination ARN.", http.StatusBadRequest) - invalidBucketNotificationBuf = `s3:ObjectCreated:Putprefiximages/1arn:minio:lambda:us-west-1:444455556666:lambda` + invalidBucketNotificationBuf = `s3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-west-1:444455556666:listen` request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName), int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey) c.Assert(err, IsNil) @@ -146,7 +146,7 @@ func (s *TestSuiteCommon) TestBucketNotification(c *C) { verifyError(c, response, "InvalidArgument", "A specified destination is in a different region than the bucket. You must use a destination that resides in the same region as the bucket.", http.StatusBadRequest) - invalidBucketNotificationBuf = `s3:ObjectCreated:Invalidprefiximages/1arn:minio:lambda:us-east-1:444455556666:lambda` + invalidBucketNotificationBuf = `s3:ObjectCreated:Invalidprefiximages/1arn:minio:sns:us-east-1:444455556666:listen` request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName), int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey) c.Assert(err, IsNil)