diff --git a/bucket-notification-datatypes.go b/bucket-notification-datatypes.go
index c8cc3e706..d2a6b92a8 100644
--- a/bucket-notification-datatypes.go
+++ b/bucket-notification-datatypes.go
@@ -155,15 +155,15 @@ type NotificationEvent struct {
S3 eventMeta `json:"s3"`
}
-// Represents the minio lambda type and account id's.
-type arnLambda struct {
+// Represents the minio topic type and account id's.
+type arnTopic struct {
Type string
AccountID string
}
// Stringer for constructing AWS ARN compatible string.
-func (m arnLambda) String() string {
- return minioLambda + serverConfig.GetRegion() + ":" + m.AccountID + ":" + m.Type
+func (m arnTopic) String() string {
+ return minioTopic + serverConfig.GetRegion() + ":" + m.AccountID + ":" + m.Type
}
// Represents the minio sqs type and account id's.
diff --git a/bucket-notification-handlers.go b/bucket-notification-handlers.go
index 9e17fbede..503668307 100644
--- a/bucket-notification-handlers.go
+++ b/bucket-notification-handlers.go
@@ -23,7 +23,6 @@ import (
"io"
"net/http"
"path"
- "strings"
"time"
"github.com/gorilla/mux"
@@ -203,22 +202,6 @@ func sendBucketNotification(w http.ResponseWriter, arnListenerCh <-chan []Notifi
}
}
-// Returns true if the queueARN is for an Minio queue.
-func isMinL(lambdaARN arnLambda) bool {
- return strings.HasSuffix(lambdaARN.Type, lambdaTypeMinio)
-}
-
-// isMinioARNConfigured - verifies if one lambda ARN is valid and is enabled.
-func isMinioARNConfigured(lambdaARN string, lambdaConfigs []lambdaConfig) bool {
- for _, lambdaConfig := range lambdaConfigs {
- // Validate if lambda ARN is already enabled.
- if lambdaARN == lambdaConfig.LambdaARN {
- return true
- }
- }
- return false
-}
-
// ListenBucketNotificationHandler - list bucket notifications.
func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
// Validate request authorization.
@@ -230,8 +213,8 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
bucket := vars["bucket"]
// Get notification ARN.
- lambdaARN := r.URL.Query().Get("notificationARN")
- if lambdaARN == "" {
+ topicARN := r.URL.Query().Get("notificationARN")
+ if topicARN == "" {
writeErrorResponse(w, r, ErrARNNotification, r.URL.Path)
return
}
@@ -250,8 +233,9 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
return
}
- // Notifications set, but do not have MINIO queue enabled, return.
- if !isMinioARNConfigured(lambdaARN, notificationCfg.LambdaConfigs) {
+ // Set SNS notifications only if special "listen" sns is set in bucket
+ // notification configs.
+ if !isMinioSNSConfigured(topicARN, notificationCfg.TopicConfigs) {
writeErrorResponse(w, r, ErrARNNotification, r.URL.Path)
return
}
@@ -264,10 +248,10 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
// Close the listener channel.
defer close(nEventCh)
- // Set lambda target.
- eventN.SetLambdaTarget(lambdaARN, nEventCh)
- // Remove lambda listener after the writer has closed or the client disconnected.
- defer eventN.RemoveLambdaTarget(lambdaARN, nEventCh)
+ // Set sns target.
+ eventN.SetSNSTarget(topicARN, nEventCh)
+ // Remove sns listener after the writer has closed or the client disconnected.
+ defer eventN.RemoveSNSTarget(topicARN, nEventCh)
// Start sending bucket notifications.
sendBucketNotification(w, nEventCh)
diff --git a/bucket-notification-utils.go b/bucket-notification-utils.go
index 24356ad3e..e1301933d 100644
--- a/bucket-notification-utils.go
+++ b/bucket-notification-utils.go
@@ -111,17 +111,33 @@ func checkQueueARN(queueARN string) APIErrorCode {
return ErrNone
}
-// checkLambdaARN - check if the lambda arn is valid.
-func checkLambdaARN(lambdaARN string) APIErrorCode {
- if !strings.HasPrefix(lambdaARN, minioLambda) {
+// checkTopicARN - check if the topic arn is valid.
+func checkTopicARN(topicARN string) APIErrorCode {
+ if !strings.HasPrefix(topicARN, minioTopic) {
return ErrARNNotification
}
- if !strings.HasPrefix(lambdaARN, minioLambda+serverConfig.GetRegion()+":") {
+ if !strings.HasPrefix(topicARN, minioTopic+serverConfig.GetRegion()+":") {
return ErrRegionNotification
}
return ErrNone
}
+// Returns true if the topicARN is for an Minio sns listen type.
+func isMinioSNS(topicARN arnTopic) bool {
+ return strings.HasSuffix(topicARN.Type, snsTypeMinio)
+}
+
+// isMinioSNSConfigured - verifies if one topic ARN is valid and is enabled.
+func isMinioSNSConfigured(topicARN string, topicConfigs []topicConfig) bool {
+ for _, topicConfig := range topicConfigs {
+ // Validate if topic ARN is already enabled.
+ if topicARN == topicConfig.TopicARN {
+ return true
+ }
+ }
+ return false
+}
+
// Validate if we recognize the queue type.
func isValidQueue(sqsARN arnSQS) bool {
amqpQ := isAMQPQueue(sqsARN) // Is amqp queue?.
@@ -130,9 +146,9 @@ func isValidQueue(sqsARN arnSQS) bool {
return amqpQ || elasticQ || redisQ
}
-// Validate if we recognize the lambda type.
-func isValidLambda(lambdaARN arnLambda) bool {
- return isMinL(lambdaARN) // Is minio lambda?.
+// Validate if we recognize the topic type.
+func isValidTopic(topicARN arnTopic) bool {
+ return isMinioSNS(topicARN) // Is minio topic?.
}
// Validates account id for input queue ARN.
@@ -188,26 +204,26 @@ func checkQueueConfig(qConfig queueConfig) APIErrorCode {
}
// Check - validates queue configuration and returns error if any.
-func checkLambdaConfig(lConfig lambdaConfig) APIErrorCode {
+func checkTopicConfig(tConfig topicConfig) APIErrorCode {
// Check queue arn is valid.
- if s3Error := checkLambdaARN(lConfig.LambdaARN); s3Error != ErrNone {
+ if s3Error := checkTopicARN(tConfig.TopicARN); s3Error != ErrNone {
return s3Error
}
// Unmarshals QueueARN into structured object.
- lambdaARN := unmarshalLambdaARN(lConfig.LambdaARN)
- // Validate if lambdaARN requested any of the known supported queues.
- if !isValidLambda(lambdaARN) {
+ topicARN := unmarshalTopicARN(tConfig.TopicARN)
+ // Validate if topicARN requested any of the known supported queues.
+ if !isValidTopic(topicARN) {
return ErrARNNotification
}
// Check if valid events are set in queue config.
- if s3Error := checkEvents(lConfig.Events); s3Error != ErrNone {
+ if s3Error := checkEvents(tConfig.Events); s3Error != ErrNone {
return s3Error
}
// Check if valid filters are set in queue config.
- if s3Error := checkFilterRules(lConfig.Filter.Key.FilterRules); s3Error != ErrNone {
+ if s3Error := checkFilterRules(tConfig.Filter.Key.FilterRules); s3Error != ErrNone {
return s3Error
}
@@ -228,12 +244,12 @@ func validateQueueConfigs(queueConfigs []queueConfig) APIErrorCode {
return ErrNone
}
-// Validates all incoming lambda configs, checkLambdaConfig validates if the
+// Validates all incoming topic configs, checkTopicConfig validates if the
// input fields for each queues is not malformed and has valid configuration
// information. If validation fails bucket notifications are not enabled.
-func validateLambdaConfigs(lambdaConfigs []lambdaConfig) APIErrorCode {
- for _, lConfig := range lambdaConfigs {
- if s3Error := checkLambdaConfig(lConfig); s3Error != ErrNone {
+func validateTopicConfigs(topicConfigs []topicConfig) APIErrorCode {
+ for _, tConfig := range topicConfigs {
+ if s3Error := checkTopicConfig(tConfig); s3Error != ErrNone {
return s3Error
}
}
@@ -248,7 +264,7 @@ func validateNotificationConfig(nConfig notificationConfig) APIErrorCode {
if s3Error := validateQueueConfigs(nConfig.QueueConfigs); s3Error != ErrNone {
return s3Error
}
- if s3Error := validateLambdaConfigs(nConfig.LambdaConfigs); s3Error != ErrNone {
+ if s3Error := validateTopicConfigs(nConfig.TopicConfigs); s3Error != ErrNone {
return s3Error
}
@@ -256,21 +272,21 @@ func validateNotificationConfig(nConfig notificationConfig) APIErrorCode {
return ErrNone
}
-// Unmarshals input value of AWS ARN format into minioLambda object.
-// Returned value represents minio lambda type, currently supported are
-// - minio
-func unmarshalLambdaARN(lambdaARN string) arnLambda {
- lambda := arnLambda{}
- if !strings.HasPrefix(lambdaARN, minioLambda+serverConfig.GetRegion()+":") {
- return lambda
+// Unmarshals input value of AWS ARN format into minioTopic object.
+// Returned value represents minio topic type, currently supported are
+// - listen
+func unmarshalTopicARN(topicARN string) arnTopic {
+ topic := arnTopic{}
+ if !strings.HasPrefix(topicARN, minioTopic+serverConfig.GetRegion()+":") {
+ return topic
}
- lambdaType := strings.TrimPrefix(lambdaARN, minioLambda+serverConfig.GetRegion()+":")
+ topicType := strings.TrimPrefix(topicARN, minioTopic+serverConfig.GetRegion()+":")
switch {
- case strings.HasSuffix(lambdaType, lambdaTypeMinio):
- lambda.Type = lambdaTypeMinio
- } // Add more lambda here.
- lambda.AccountID = strings.TrimSuffix(lambdaType, ":"+lambda.Type)
- return lambda
+ case strings.HasSuffix(topicType, snsTypeMinio):
+ topic.Type = snsTypeMinio
+ } // Add more topic here.
+ topic.AccountID = strings.TrimSuffix(topicType, ":"+topic.Type)
+ return topic
}
// Unmarshals input value of AWS ARN format into minioSqs object.
diff --git a/bucket-notification-utils_test.go b/bucket-notification-utils_test.go
index d3e229a47..43e1a49ad 100644
--- a/bucket-notification-utils_test.go
+++ b/bucket-notification-utils_test.go
@@ -97,8 +97,8 @@ func TestValidEvents(t *testing.T) {
}
}
-// Tests lambda arn validation.
-func TestLambdaARN(t *testing.T) {
+// Tests topic arn validation.
+func TestTopicARN(t *testing.T) {
rootPath, err := newTestConfig("us-east-1")
if err != nil {
t.Fatalf("unable initialize config file, %s", err)
@@ -106,33 +106,33 @@ func TestLambdaARN(t *testing.T) {
defer removeAll(rootPath)
testCases := []struct {
- lambdaARN string
- errCode APIErrorCode
+ topicARN string
+ errCode APIErrorCode
}{
- // Valid minio lambda with '1' account id.
+ // Valid minio topic with '1' account id.
{
- lambdaARN: "arn:minio:lambda:us-east-1:1:minio",
- errCode: ErrNone,
+ topicARN: "arn:minio:sns:us-east-1:1:minio",
+ errCode: ErrNone,
},
- // Valid minio lambda with '10' account id.
+ // Valid minio topic with '10' account id.
{
- lambdaARN: "arn:minio:lambda:us-east-1:10:minio",
- errCode: ErrNone,
+ topicARN: "arn:minio:sns:us-east-1:10:minio",
+ errCode: ErrNone,
},
// Invalid empty queue arn.
{
- lambdaARN: "",
- errCode: ErrARNNotification,
+ topicARN: "",
+ errCode: ErrARNNotification,
},
// Invalid region 'us-west-1' in queue arn.
{
- lambdaARN: "arn:minio:lambda:us-west-1:1:redis",
- errCode: ErrRegionNotification,
+ topicARN: "arn:minio:sns:us-west-1:1:redis",
+ errCode: ErrRegionNotification,
},
}
for i, testCase := range testCases {
- errCode := checkLambdaARN(testCase.lambdaARN)
+ errCode := checkTopicARN(testCase.topicARN)
if testCase.errCode != errCode {
t.Errorf("Test %d: Expected \"%d\", got \"%d\"", i+1, testCase.errCode, errCode)
}
@@ -186,8 +186,8 @@ func TestQueueARN(t *testing.T) {
}
}
-// Test unmarshal queue arn.
-func TestUnmarshalLambdaARN(t *testing.T) {
+// Test unmarshal topic arn.
+func TestUnmarshalTopicARN(t *testing.T) {
rootPath, err := newTestConfig("us-east-1")
if err != nil {
t.Fatalf("unable initialize config file, %s", err)
@@ -195,40 +195,40 @@ func TestUnmarshalLambdaARN(t *testing.T) {
defer removeAll(rootPath)
testCases := []struct {
- lambdaARN string
- Type string
+ topicARN string
+ Type string
}{
- // Valid minio lambda arn.
+ // Valid minio topic arn.
{
- lambdaARN: "arn:minio:lambda:us-east-1:1:lambda",
- Type: "lambda",
+ topicARN: "arn:minio:sns:us-east-1:1:listen",
+ Type: "listen",
},
- // Invalid empty queue arn.
+ // Invalid empty topic arn.
{
- lambdaARN: "",
- Type: "",
+ topicARN: "",
+ Type: "",
},
- // Invalid region 'us-west-1' in queue arn.
+ // Invalid region 'us-west-1' in topic arn.
{
- lambdaARN: "arn:minio:lambda:us-west-1:1:lambda",
- Type: "",
+ topicARN: "arn:minio:sns:us-west-1:1:listen",
+ Type: "",
},
- // Partial queue arn.
+ // Partial topic arn.
{
- lambdaARN: "arn:minio:lambda:",
- Type: "",
+ topicARN: "arn:minio:sns:",
+ Type: "",
},
- // Invalid queue service value.
+ // Invalid topic service value.
{
- lambdaARN: "arn:minio:lambda:us-east-1:1:*",
- Type: "",
+ topicARN: "arn:minio:sns:us-east-1:1:*",
+ Type: "",
},
}
for i, testCase := range testCases {
- lambda := unmarshalLambdaARN(testCase.lambdaARN)
- if testCase.Type != lambda.Type {
- t.Errorf("Test %d: Expected \"%s\", got \"%s\"", i+1, testCase.Type, lambda.Type)
+ topic := unmarshalTopicARN(testCase.topicARN)
+ if testCase.Type != topic.Type {
+ t.Errorf("Test %d: Expected \"%s\", got \"%s\"", i+1, testCase.Type, topic.Type)
}
}
}
diff --git a/event-notifier.go b/event-notifier.go
index 27c18a45a..27a2e46d3 100644
--- a/event-notifier.go
+++ b/event-notifier.go
@@ -35,7 +35,7 @@ type eventNotifier struct {
// Collection of 'bucket' and notification config.
notificationConfigs map[string]*notificationConfig
- lambdaTargets map[string][]chan []NotificationEvent
+ snsTargets map[string][]chan []NotificationEvent
queueTargets map[string]*logrus.Logger
}
@@ -101,37 +101,37 @@ func (en eventNotifier) GetQueueTarget(queueARN string) *logrus.Logger {
return en.queueTargets[queueARN]
}
-func (en eventNotifier) GetLambdaTarget(lambdaARN string) []chan []NotificationEvent {
+func (en eventNotifier) GetSNSTarget(snsARN string) []chan []NotificationEvent {
en.rwMutex.RLock()
defer en.rwMutex.RUnlock()
- return en.lambdaTargets[lambdaARN]
+ return en.snsTargets[snsARN]
}
-// Set a new lambda target for an input lambda ARN.
-func (en *eventNotifier) SetLambdaTarget(lambdaARN string, listenerCh chan []NotificationEvent) error {
+// Set a new sns target for an input sns ARN.
+func (en *eventNotifier) SetSNSTarget(snsARN string, listenerCh chan []NotificationEvent) error {
en.rwMutex.Lock()
defer en.rwMutex.Unlock()
if listenerCh == nil {
return errors.New("invalid argument")
}
- en.lambdaTargets[lambdaARN] = append(en.lambdaTargets[lambdaARN], listenerCh)
+ en.snsTargets[snsARN] = append(en.snsTargets[snsARN], listenerCh)
return nil
}
-// Remove lambda target for an input lambda ARN.
-func (en *eventNotifier) RemoveLambdaTarget(lambdaARN string, listenerCh chan []NotificationEvent) {
+// Remove sns target for an input sns ARN.
+func (en *eventNotifier) RemoveSNSTarget(snsARN string, listenerCh chan []NotificationEvent) {
en.rwMutex.Lock()
defer en.rwMutex.Unlock()
- lambdaTarget, ok := en.lambdaTargets[lambdaARN]
+ snsTarget, ok := en.snsTargets[snsARN]
if ok {
- for i, savedListenerCh := range lambdaTarget {
+ for i, savedListenerCh := range snsTarget {
if listenerCh == savedListenerCh {
- lambdaTarget = append(lambdaTarget[:i], lambdaTarget[i+1:]...)
- if len(lambdaTarget) == 0 {
- delete(en.lambdaTargets, lambdaARN)
+ snsTarget = append(snsTarget[:i], snsTarget[i+1:]...)
+ if len(snsTarget) == 0 {
+ delete(en.snsTargets, snsARN)
break
}
- en.lambdaTargets[lambdaARN] = lambdaTarget
+ en.snsTargets[snsARN] = snsTarget
}
}
}
@@ -180,7 +180,7 @@ func eventNotify(event eventData) {
nConfig := eventN.GetBucketNotificationConfig(event.Bucket)
// No bucket notifications enabled, drop the event notification.
- if len(nConfig.QueueConfigs) == 0 && len(nConfig.LambdaConfigs) == 0 {
+ if len(nConfig.QueueConfigs) == 0 && len(nConfig.TopicConfigs) == 0 && len(nConfig.LambdaConfigs) == 0 {
return
}
@@ -206,12 +206,12 @@ func eventNotify(event eventData) {
}
}
}
- // Validate if the event and object match the lambda configs.
- for _, lambdaConfig := range nConfig.LambdaConfigs {
- ruleMatch := filterRuleMatch(objectName, lambdaConfig.Filter.Key.FilterRules)
- eventMatch := eventMatch(eventType, lambdaConfig.Events)
+ // Validate if the event and object match the sns configs.
+ for _, topicConfig := range nConfig.TopicConfigs {
+ ruleMatch := filterRuleMatch(objectName, topicConfig.Filter.Key.FilterRules)
+ eventMatch := eventMatch(eventType, topicConfig.Events)
if eventMatch && ruleMatch {
- targetListeners := eventN.GetLambdaTarget(lambdaConfig.LambdaARN)
+ targetListeners := eventN.GetSNSTarget(topicConfig.TopicARN)
for _, listener := range targetListeners {
listener <- notificationEvent
}
@@ -377,7 +377,7 @@ func initEventNotifier(objAPI ObjectLayer) error {
rwMutex: &sync.RWMutex{},
notificationConfigs: configs,
queueTargets: queueTargets,
- lambdaTargets: make(map[string][]chan []NotificationEvent),
+ snsTargets: make(map[string][]chan []NotificationEvent),
}
return nil
diff --git a/notifiers.go b/notifiers.go
index 4e0ca4ea8..91a804dde 100644
--- a/notifiers.go
+++ b/notifiers.go
@@ -36,13 +36,13 @@ const (
queueTypeRedis = "redis"
)
-// Lambda type.
+// Topic type.
const (
- // Minio lambda ARN prefix.
- minioLambda = "arn:minio:lambda:"
+ // Minio topic ARN prefix.
+ minioTopic = "arn:minio:sns:"
- // Static string indicating lambda type 'lambda'.
- lambdaTypeMinio = "lambda"
+ // Static string indicating sns type 'listen'.
+ snsTypeMinio = "listen"
)
var errNotifyNotEnabled = errors.New("requested notifier not enabled")
diff --git a/server_test.go b/server_test.go
index e4a4ffa22..76a6a61ff 100644
--- a/server_test.go
+++ b/server_test.go
@@ -79,7 +79,7 @@ func (s *TestSuiteCommon) TestAuth(c *C) {
// verifies it by fetching the notification back.
func (s *TestSuiteCommon) TestBucketNotification(c *C) {
// Sample bucket notification
- bucketNotificationBuf := `s3:ObjectCreated:Putprefiximages/1arn:minio:lambda:us-east-1:444455556666:lambda`
+ bucketNotificationBuf := `s3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-east-1:444455556666:listen`
// generate a random bucket Name.
bucketName := getRandomBucketName()
@@ -121,7 +121,7 @@ func (s *TestSuiteCommon) TestBucketNotification(c *C) {
// Verify if downloaded policy matches with previousy uploaded.
c.Assert(bytes.Equal([]byte(bucketNotificationBuf), bucketNotificationReadBuf), Equals, true)
- invalidBucketNotificationBuf := `s3:ObjectCreated:Putprefiximages/1arn:minio:lambda:us-east-1:444455556666:minio`
+ invalidBucketNotificationBuf := `s3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-east-1:444455556666:minio`
request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName),
int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey)
@@ -134,7 +134,7 @@ func (s *TestSuiteCommon) TestBucketNotification(c *C) {
verifyError(c, response, "InvalidArgument", "A specified destination ARN does not exist or is not well-formed. Verify the destination ARN.", http.StatusBadRequest)
- invalidBucketNotificationBuf = `s3:ObjectCreated:Putprefiximages/1arn:minio:lambda:us-west-1:444455556666:lambda`
+ invalidBucketNotificationBuf = `s3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-west-1:444455556666:listen`
request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName),
int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey)
c.Assert(err, IsNil)
@@ -146,7 +146,7 @@ func (s *TestSuiteCommon) TestBucketNotification(c *C) {
verifyError(c, response, "InvalidArgument", "A specified destination is in a different region than the bucket. You must use a destination that resides in the same region as the bucket.", http.StatusBadRequest)
- invalidBucketNotificationBuf = `s3:ObjectCreated:Invalidprefiximages/1arn:minio:lambda:us-east-1:444455556666:lambda`
+ invalidBucketNotificationBuf = `s3:ObjectCreated:Invalidprefiximages/1arn:minio:sns:us-east-1:444455556666:listen`
request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName),
int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey)
c.Assert(err, IsNil)