api: Change listen bucket notification to be TopicConfiguration. (#2447)

master
Harshavardhana 8 years ago committed by Anand Babu (AB) Periasamy
parent 3b9dbd748b
commit e86dfcf41e
  1. 8
      bucket-notification-datatypes.go
  2. 34
      bucket-notification-handlers.go
  3. 80
      bucket-notification-utils.go
  4. 54
      bucket-notification-utils_test.go
  5. 42
      event-notifier.go
  6. 10
      notifiers.go
  7. 8
      server_test.go

@ -155,15 +155,15 @@ type NotificationEvent struct {
S3 eventMeta `json:"s3"`
}
// Represents the minio lambda type and account id's.
type arnLambda struct {
// Represents the minio topic type and account id's.
type arnTopic struct {
Type string
AccountID string
}
// Stringer for constructing AWS ARN compatible string.
func (m arnLambda) String() string {
return minioLambda + serverConfig.GetRegion() + ":" + m.AccountID + ":" + m.Type
func (m arnTopic) String() string {
return minioTopic + serverConfig.GetRegion() + ":" + m.AccountID + ":" + m.Type
}
// Represents the minio sqs type and account id's.

@ -23,7 +23,6 @@ import (
"io"
"net/http"
"path"
"strings"
"time"
"github.com/gorilla/mux"
@ -203,22 +202,6 @@ func sendBucketNotification(w http.ResponseWriter, arnListenerCh <-chan []Notifi
}
}
// Returns true if the queueARN is for an Minio queue.
func isMinL(lambdaARN arnLambda) bool {
return strings.HasSuffix(lambdaARN.Type, lambdaTypeMinio)
}
// isMinioARNConfigured - verifies if one lambda ARN is valid and is enabled.
func isMinioARNConfigured(lambdaARN string, lambdaConfigs []lambdaConfig) bool {
for _, lambdaConfig := range lambdaConfigs {
// Validate if lambda ARN is already enabled.
if lambdaARN == lambdaConfig.LambdaARN {
return true
}
}
return false
}
// ListenBucketNotificationHandler - list bucket notifications.
func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
// Validate request authorization.
@ -230,8 +213,8 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
bucket := vars["bucket"]
// Get notification ARN.
lambdaARN := r.URL.Query().Get("notificationARN")
if lambdaARN == "" {
topicARN := r.URL.Query().Get("notificationARN")
if topicARN == "" {
writeErrorResponse(w, r, ErrARNNotification, r.URL.Path)
return
}
@ -250,8 +233,9 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
return
}
// Notifications set, but do not have MINIO queue enabled, return.
if !isMinioARNConfigured(lambdaARN, notificationCfg.LambdaConfigs) {
// Set SNS notifications only if special "listen" sns is set in bucket
// notification configs.
if !isMinioSNSConfigured(topicARN, notificationCfg.TopicConfigs) {
writeErrorResponse(w, r, ErrARNNotification, r.URL.Path)
return
}
@ -264,10 +248,10 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
// Close the listener channel.
defer close(nEventCh)
// Set lambda target.
eventN.SetLambdaTarget(lambdaARN, nEventCh)
// Remove lambda listener after the writer has closed or the client disconnected.
defer eventN.RemoveLambdaTarget(lambdaARN, nEventCh)
// Set sns target.
eventN.SetSNSTarget(topicARN, nEventCh)
// Remove sns listener after the writer has closed or the client disconnected.
defer eventN.RemoveSNSTarget(topicARN, nEventCh)
// Start sending bucket notifications.
sendBucketNotification(w, nEventCh)

@ -111,17 +111,33 @@ func checkQueueARN(queueARN string) APIErrorCode {
return ErrNone
}
// checkLambdaARN - check if the lambda arn is valid.
func checkLambdaARN(lambdaARN string) APIErrorCode {
if !strings.HasPrefix(lambdaARN, minioLambda) {
// checkTopicARN - check if the topic arn is valid.
func checkTopicARN(topicARN string) APIErrorCode {
if !strings.HasPrefix(topicARN, minioTopic) {
return ErrARNNotification
}
if !strings.HasPrefix(lambdaARN, minioLambda+serverConfig.GetRegion()+":") {
if !strings.HasPrefix(topicARN, minioTopic+serverConfig.GetRegion()+":") {
return ErrRegionNotification
}
return ErrNone
}
// Returns true if the topicARN is for an Minio sns listen type.
func isMinioSNS(topicARN arnTopic) bool {
return strings.HasSuffix(topicARN.Type, snsTypeMinio)
}
// isMinioSNSConfigured - verifies if one topic ARN is valid and is enabled.
func isMinioSNSConfigured(topicARN string, topicConfigs []topicConfig) bool {
for _, topicConfig := range topicConfigs {
// Validate if topic ARN is already enabled.
if topicARN == topicConfig.TopicARN {
return true
}
}
return false
}
// Validate if we recognize the queue type.
func isValidQueue(sqsARN arnSQS) bool {
amqpQ := isAMQPQueue(sqsARN) // Is amqp queue?.
@ -130,9 +146,9 @@ func isValidQueue(sqsARN arnSQS) bool {
return amqpQ || elasticQ || redisQ
}
// Validate if we recognize the lambda type.
func isValidLambda(lambdaARN arnLambda) bool {
return isMinL(lambdaARN) // Is minio lambda?.
// Validate if we recognize the topic type.
func isValidTopic(topicARN arnTopic) bool {
return isMinioSNS(topicARN) // Is minio topic?.
}
// Validates account id for input queue ARN.
@ -188,26 +204,26 @@ func checkQueueConfig(qConfig queueConfig) APIErrorCode {
}
// Check - validates queue configuration and returns error if any.
func checkLambdaConfig(lConfig lambdaConfig) APIErrorCode {
func checkTopicConfig(tConfig topicConfig) APIErrorCode {
// Check queue arn is valid.
if s3Error := checkLambdaARN(lConfig.LambdaARN); s3Error != ErrNone {
if s3Error := checkTopicARN(tConfig.TopicARN); s3Error != ErrNone {
return s3Error
}
// Unmarshals QueueARN into structured object.
lambdaARN := unmarshalLambdaARN(lConfig.LambdaARN)
// Validate if lambdaARN requested any of the known supported queues.
if !isValidLambda(lambdaARN) {
topicARN := unmarshalTopicARN(tConfig.TopicARN)
// Validate if topicARN requested any of the known supported queues.
if !isValidTopic(topicARN) {
return ErrARNNotification
}
// Check if valid events are set in queue config.
if s3Error := checkEvents(lConfig.Events); s3Error != ErrNone {
if s3Error := checkEvents(tConfig.Events); s3Error != ErrNone {
return s3Error
}
// Check if valid filters are set in queue config.
if s3Error := checkFilterRules(lConfig.Filter.Key.FilterRules); s3Error != ErrNone {
if s3Error := checkFilterRules(tConfig.Filter.Key.FilterRules); s3Error != ErrNone {
return s3Error
}
@ -228,12 +244,12 @@ func validateQueueConfigs(queueConfigs []queueConfig) APIErrorCode {
return ErrNone
}
// Validates all incoming lambda configs, checkLambdaConfig validates if the
// Validates all incoming topic configs, checkTopicConfig validates if the
// input fields for each queues is not malformed and has valid configuration
// information. If validation fails bucket notifications are not enabled.
func validateLambdaConfigs(lambdaConfigs []lambdaConfig) APIErrorCode {
for _, lConfig := range lambdaConfigs {
if s3Error := checkLambdaConfig(lConfig); s3Error != ErrNone {
func validateTopicConfigs(topicConfigs []topicConfig) APIErrorCode {
for _, tConfig := range topicConfigs {
if s3Error := checkTopicConfig(tConfig); s3Error != ErrNone {
return s3Error
}
}
@ -248,7 +264,7 @@ func validateNotificationConfig(nConfig notificationConfig) APIErrorCode {
if s3Error := validateQueueConfigs(nConfig.QueueConfigs); s3Error != ErrNone {
return s3Error
}
if s3Error := validateLambdaConfigs(nConfig.LambdaConfigs); s3Error != ErrNone {
if s3Error := validateTopicConfigs(nConfig.TopicConfigs); s3Error != ErrNone {
return s3Error
}
@ -256,21 +272,21 @@ func validateNotificationConfig(nConfig notificationConfig) APIErrorCode {
return ErrNone
}
// Unmarshals input value of AWS ARN format into minioLambda object.
// Returned value represents minio lambda type, currently supported are
// - minio
func unmarshalLambdaARN(lambdaARN string) arnLambda {
lambda := arnLambda{}
if !strings.HasPrefix(lambdaARN, minioLambda+serverConfig.GetRegion()+":") {
return lambda
// Unmarshals input value of AWS ARN format into minioTopic object.
// Returned value represents minio topic type, currently supported are
// - listen
func unmarshalTopicARN(topicARN string) arnTopic {
topic := arnTopic{}
if !strings.HasPrefix(topicARN, minioTopic+serverConfig.GetRegion()+":") {
return topic
}
lambdaType := strings.TrimPrefix(lambdaARN, minioLambda+serverConfig.GetRegion()+":")
topicType := strings.TrimPrefix(topicARN, minioTopic+serverConfig.GetRegion()+":")
switch {
case strings.HasSuffix(lambdaType, lambdaTypeMinio):
lambda.Type = lambdaTypeMinio
} // Add more lambda here.
lambda.AccountID = strings.TrimSuffix(lambdaType, ":"+lambda.Type)
return lambda
case strings.HasSuffix(topicType, snsTypeMinio):
topic.Type = snsTypeMinio
} // Add more topic here.
topic.AccountID = strings.TrimSuffix(topicType, ":"+topic.Type)
return topic
}
// Unmarshals input value of AWS ARN format into minioSqs object.

@ -97,8 +97,8 @@ func TestValidEvents(t *testing.T) {
}
}
// Tests lambda arn validation.
func TestLambdaARN(t *testing.T) {
// Tests topic arn validation.
func TestTopicARN(t *testing.T) {
rootPath, err := newTestConfig("us-east-1")
if err != nil {
t.Fatalf("unable initialize config file, %s", err)
@ -106,33 +106,33 @@ func TestLambdaARN(t *testing.T) {
defer removeAll(rootPath)
testCases := []struct {
lambdaARN string
topicARN string
errCode APIErrorCode
}{
// Valid minio lambda with '1' account id.
// Valid minio topic with '1' account id.
{
lambdaARN: "arn:minio:lambda:us-east-1:1:minio",
topicARN: "arn:minio:sns:us-east-1:1:minio",
errCode: ErrNone,
},
// Valid minio lambda with '10' account id.
// Valid minio topic with '10' account id.
{
lambdaARN: "arn:minio:lambda:us-east-1:10:minio",
topicARN: "arn:minio:sns:us-east-1:10:minio",
errCode: ErrNone,
},
// Invalid empty queue arn.
{
lambdaARN: "",
topicARN: "",
errCode: ErrARNNotification,
},
// Invalid region 'us-west-1' in queue arn.
{
lambdaARN: "arn:minio:lambda:us-west-1:1:redis",
topicARN: "arn:minio:sns:us-west-1:1:redis",
errCode: ErrRegionNotification,
},
}
for i, testCase := range testCases {
errCode := checkLambdaARN(testCase.lambdaARN)
errCode := checkTopicARN(testCase.topicARN)
if testCase.errCode != errCode {
t.Errorf("Test %d: Expected \"%d\", got \"%d\"", i+1, testCase.errCode, errCode)
}
@ -186,8 +186,8 @@ func TestQueueARN(t *testing.T) {
}
}
// Test unmarshal queue arn.
func TestUnmarshalLambdaARN(t *testing.T) {
// Test unmarshal topic arn.
func TestUnmarshalTopicARN(t *testing.T) {
rootPath, err := newTestConfig("us-east-1")
if err != nil {
t.Fatalf("unable initialize config file, %s", err)
@ -195,40 +195,40 @@ func TestUnmarshalLambdaARN(t *testing.T) {
defer removeAll(rootPath)
testCases := []struct {
lambdaARN string
topicARN string
Type string
}{
// Valid minio lambda arn.
// Valid minio topic arn.
{
lambdaARN: "arn:minio:lambda:us-east-1:1:lambda",
Type: "lambda",
topicARN: "arn:minio:sns:us-east-1:1:listen",
Type: "listen",
},
// Invalid empty queue arn.
// Invalid empty topic arn.
{
lambdaARN: "",
topicARN: "",
Type: "",
},
// Invalid region 'us-west-1' in queue arn.
// Invalid region 'us-west-1' in topic arn.
{
lambdaARN: "arn:minio:lambda:us-west-1:1:lambda",
topicARN: "arn:minio:sns:us-west-1:1:listen",
Type: "",
},
// Partial queue arn.
// Partial topic arn.
{
lambdaARN: "arn:minio:lambda:",
topicARN: "arn:minio:sns:",
Type: "",
},
// Invalid queue service value.
// Invalid topic service value.
{
lambdaARN: "arn:minio:lambda:us-east-1:1:*",
topicARN: "arn:minio:sns:us-east-1:1:*",
Type: "",
},
}
for i, testCase := range testCases {
lambda := unmarshalLambdaARN(testCase.lambdaARN)
if testCase.Type != lambda.Type {
t.Errorf("Test %d: Expected \"%s\", got \"%s\"", i+1, testCase.Type, lambda.Type)
topic := unmarshalTopicARN(testCase.topicARN)
if testCase.Type != topic.Type {
t.Errorf("Test %d: Expected \"%s\", got \"%s\"", i+1, testCase.Type, topic.Type)
}
}
}

@ -35,7 +35,7 @@ type eventNotifier struct {
// Collection of 'bucket' and notification config.
notificationConfigs map[string]*notificationConfig
lambdaTargets map[string][]chan []NotificationEvent
snsTargets map[string][]chan []NotificationEvent
queueTargets map[string]*logrus.Logger
}
@ -101,37 +101,37 @@ func (en eventNotifier) GetQueueTarget(queueARN string) *logrus.Logger {
return en.queueTargets[queueARN]
}
func (en eventNotifier) GetLambdaTarget(lambdaARN string) []chan []NotificationEvent {
func (en eventNotifier) GetSNSTarget(snsARN string) []chan []NotificationEvent {
en.rwMutex.RLock()
defer en.rwMutex.RUnlock()
return en.lambdaTargets[lambdaARN]
return en.snsTargets[snsARN]
}
// Set a new lambda target for an input lambda ARN.
func (en *eventNotifier) SetLambdaTarget(lambdaARN string, listenerCh chan []NotificationEvent) error {
// Set a new sns target for an input sns ARN.
func (en *eventNotifier) SetSNSTarget(snsARN string, listenerCh chan []NotificationEvent) error {
en.rwMutex.Lock()
defer en.rwMutex.Unlock()
if listenerCh == nil {
return errors.New("invalid argument")
}
en.lambdaTargets[lambdaARN] = append(en.lambdaTargets[lambdaARN], listenerCh)
en.snsTargets[snsARN] = append(en.snsTargets[snsARN], listenerCh)
return nil
}
// Remove lambda target for an input lambda ARN.
func (en *eventNotifier) RemoveLambdaTarget(lambdaARN string, listenerCh chan []NotificationEvent) {
// Remove sns target for an input sns ARN.
func (en *eventNotifier) RemoveSNSTarget(snsARN string, listenerCh chan []NotificationEvent) {
en.rwMutex.Lock()
defer en.rwMutex.Unlock()
lambdaTarget, ok := en.lambdaTargets[lambdaARN]
snsTarget, ok := en.snsTargets[snsARN]
if ok {
for i, savedListenerCh := range lambdaTarget {
for i, savedListenerCh := range snsTarget {
if listenerCh == savedListenerCh {
lambdaTarget = append(lambdaTarget[:i], lambdaTarget[i+1:]...)
if len(lambdaTarget) == 0 {
delete(en.lambdaTargets, lambdaARN)
snsTarget = append(snsTarget[:i], snsTarget[i+1:]...)
if len(snsTarget) == 0 {
delete(en.snsTargets, snsARN)
break
}
en.lambdaTargets[lambdaARN] = lambdaTarget
en.snsTargets[snsARN] = snsTarget
}
}
}
@ -180,7 +180,7 @@ func eventNotify(event eventData) {
nConfig := eventN.GetBucketNotificationConfig(event.Bucket)
// No bucket notifications enabled, drop the event notification.
if len(nConfig.QueueConfigs) == 0 && len(nConfig.LambdaConfigs) == 0 {
if len(nConfig.QueueConfigs) == 0 && len(nConfig.TopicConfigs) == 0 && len(nConfig.LambdaConfigs) == 0 {
return
}
@ -206,12 +206,12 @@ func eventNotify(event eventData) {
}
}
}
// Validate if the event and object match the lambda configs.
for _, lambdaConfig := range nConfig.LambdaConfigs {
ruleMatch := filterRuleMatch(objectName, lambdaConfig.Filter.Key.FilterRules)
eventMatch := eventMatch(eventType, lambdaConfig.Events)
// Validate if the event and object match the sns configs.
for _, topicConfig := range nConfig.TopicConfigs {
ruleMatch := filterRuleMatch(objectName, topicConfig.Filter.Key.FilterRules)
eventMatch := eventMatch(eventType, topicConfig.Events)
if eventMatch && ruleMatch {
targetListeners := eventN.GetLambdaTarget(lambdaConfig.LambdaARN)
targetListeners := eventN.GetSNSTarget(topicConfig.TopicARN)
for _, listener := range targetListeners {
listener <- notificationEvent
}
@ -377,7 +377,7 @@ func initEventNotifier(objAPI ObjectLayer) error {
rwMutex: &sync.RWMutex{},
notificationConfigs: configs,
queueTargets: queueTargets,
lambdaTargets: make(map[string][]chan []NotificationEvent),
snsTargets: make(map[string][]chan []NotificationEvent),
}
return nil

@ -36,13 +36,13 @@ const (
queueTypeRedis = "redis"
)
// Lambda type.
// Topic type.
const (
// Minio lambda ARN prefix.
minioLambda = "arn:minio:lambda:"
// Minio topic ARN prefix.
minioTopic = "arn:minio:sns:"
// Static string indicating lambda type 'lambda'.
lambdaTypeMinio = "lambda"
// Static string indicating sns type 'listen'.
snsTypeMinio = "listen"
)
var errNotifyNotEnabled = errors.New("requested notifier not enabled")

@ -79,7 +79,7 @@ func (s *TestSuiteCommon) TestAuth(c *C) {
// verifies it by fetching the notification back.
func (s *TestSuiteCommon) TestBucketNotification(c *C) {
// Sample bucket notification
bucketNotificationBuf := `<NotificationConfiguration><CloudFunctionConfiguration><Event>s3:ObjectCreated:Put</Event><Filter><S3Key><FilterRule><Name>prefix</Name><Value>images/</Value></FilterRule></S3Key></Filter><Id>1</Id><CloudFunction>arn:minio:lambda:us-east-1:444455556666:lambda</CloudFunction></CloudFunctionConfiguration></NotificationConfiguration>`
bucketNotificationBuf := `<NotificationConfiguration><TopicConfiguration><Event>s3:ObjectCreated:Put</Event><Filter><S3Key><FilterRule><Name>prefix</Name><Value>images/</Value></FilterRule></S3Key></Filter><Id>1</Id><Topic>arn:minio:sns:us-east-1:444455556666:listen</Topic></TopicConfiguration></NotificationConfiguration>`
// generate a random bucket Name.
bucketName := getRandomBucketName()
@ -121,7 +121,7 @@ func (s *TestSuiteCommon) TestBucketNotification(c *C) {
// Verify if downloaded policy matches with previousy uploaded.
c.Assert(bytes.Equal([]byte(bucketNotificationBuf), bucketNotificationReadBuf), Equals, true)
invalidBucketNotificationBuf := `<NotificationConfiguration><CloudFunctionConfiguration><Event>s3:ObjectCreated:Put</Event><Filter><S3Key><FilterRule><Name>prefix</Name><Value>images/</Value></FilterRule></S3Key></Filter><Id>1</Id><CloudFunction>arn:minio:lambda:us-east-1:444455556666:minio</CloudFunction></CloudFunctionConfiguration></NotificationConfiguration>`
invalidBucketNotificationBuf := `<NotificationConfiguration><TopicConfiguration><Event>s3:ObjectCreated:Put</Event><Filter><S3Key><FilterRule><Name>prefix</Name><Value>images/</Value></FilterRule></S3Key></Filter><Id>1</Id><Topic>arn:minio:sns:us-east-1:444455556666:minio</Topic></TopicConfiguration></NotificationConfiguration>`
request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName),
int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey)
@ -134,7 +134,7 @@ func (s *TestSuiteCommon) TestBucketNotification(c *C) {
verifyError(c, response, "InvalidArgument", "A specified destination ARN does not exist or is not well-formed. Verify the destination ARN.", http.StatusBadRequest)
invalidBucketNotificationBuf = `<NotificationConfiguration><CloudFunctionConfiguration><Event>s3:ObjectCreated:Put</Event><Filter><S3Key><FilterRule><Name>prefix</Name><Value>images/</Value></FilterRule></S3Key></Filter><Id>1</Id><CloudFunction>arn:minio:lambda:us-west-1:444455556666:lambda</CloudFunction></CloudFunctionConfiguration></NotificationConfiguration>`
invalidBucketNotificationBuf = `<NotificationConfiguration><TopicConfiguration><Event>s3:ObjectCreated:Put</Event><Filter><S3Key><FilterRule><Name>prefix</Name><Value>images/</Value></FilterRule></S3Key></Filter><Id>1</Id><Topic>arn:minio:sns:us-west-1:444455556666:listen</Topic></TopicConfiguration></NotificationConfiguration>`
request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName),
int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey)
c.Assert(err, IsNil)
@ -146,7 +146,7 @@ func (s *TestSuiteCommon) TestBucketNotification(c *C) {
verifyError(c, response, "InvalidArgument", "A specified destination is in a different region than the bucket. You must use a destination that resides in the same region as the bucket.", http.StatusBadRequest)
invalidBucketNotificationBuf = `<NotificationConfiguration><CloudFunctionConfiguration><Event>s3:ObjectCreated:Invalid</Event><Filter><S3Key><FilterRule><Name>prefix</Name><Value>images/</Value></FilterRule></S3Key></Filter><Id>1</Id><CloudFunction>arn:minio:lambda:us-east-1:444455556666:lambda</CloudFunction></CloudFunctionConfiguration></NotificationConfiguration>`
invalidBucketNotificationBuf = `<NotificationConfiguration><TopicConfiguration><Event>s3:ObjectCreated:Invalid</Event><Filter><S3Key><FilterRule><Name>prefix</Name><Value>images/</Value></FilterRule></S3Key></Filter><Id>1</Id><Topic>arn:minio:sns:us-east-1:444455556666:listen</Topic></TopicConfiguration></NotificationConfiguration>`
request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName),
int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey)
c.Assert(err, IsNil)

Loading…
Cancel
Save