diff --git a/api-errors.go b/api-errors.go index 10f596b86..71289be10 100644 --- a/api-errors.go +++ b/api-errors.go @@ -110,6 +110,10 @@ const ( ErrARNNotification ErrRegionNotification ErrOverlappingFilterNotification + ErrFilterNameInvalid + ErrFilterNamePrefix + ErrFilterNameSuffix + ErrFilterPrefixValueInvalid // S3 extended errors. ErrContentSHA256Mismatch @@ -438,6 +442,26 @@ var errorCodeResponse = map[APIErrorCode]APIError{ Description: "An object key name filtering rule defined with overlapping prefixes, overlapping suffixes, or overlapping combinations of prefixes and suffixes for the same event types.", HTTPStatusCode: http.StatusBadRequest, }, + ErrFilterNameInvalid: { + Code: "InvalidArgument", + Description: "filter rule name must be either prefix or suffix", + HTTPStatusCode: http.StatusBadRequest, + }, + ErrFilterNamePrefix: { + Code: "InvalidArgument", + Description: "Cannot specify more than one prefix rule in a filter.", + HTTPStatusCode: http.StatusBadRequest, + }, + ErrFilterNameSuffix: { + Code: "InvalidArgument", + Description: "Cannot specify more than one suffix rule in a filter.", + HTTPStatusCode: http.StatusBadRequest, + }, + ErrFilterPrefixValueInvalid: { + Code: "InvalidArgument", + Description: "prefix rule value cannot exceed 1024 characters", + HTTPStatusCode: http.StatusBadRequest, + }, /// S3 extensions. ErrContentSHA256Mismatch: { diff --git a/bucket-notification-datatypes.go b/bucket-notification-datatypes.go index 36cc0f06a..60efd2522 100644 --- a/bucket-notification-datatypes.go +++ b/bucket-notification-datatypes.go @@ -18,41 +18,43 @@ package main import "encoding/xml" +// Represents the criteria for the filter rule. type filterRule struct { - Name string `xml:"FilterRuleName"` - Value string + Name string `xml:"Name"` + Value string `xml:"Value"` } +// Collection of filter rules per service config. type keyFilter struct { - FilterRules []filterRule `xml:"FilterRule"` -} - -type notificationConfigFilter struct { - Key keyFilter `xml:"S3Key"` + FilterRules []filterRule `xml:"FilterRule,omitempty"` } // Queue SQS configuration. type queueConfig struct { - Events []string `xml:"Event"` - Filter notificationConfigFilter + Events []string `xml:"Event"` + Filter struct { + Key keyFilter `xml:"S3Key,omitempty"` + } ID string `xml:"Id"` QueueArn string `xml:"Queue"` } -// Topic SNS configuration, this is a compliance field -// not used by minio yet. +// Topic SNS configuration, this is a compliance field not used by minio yet. type topicConfig struct { - Events []string `xml:"Event"` - Filter notificationConfigFilter + Events []string `xml:"Event"` + Filter struct { + Key keyFilter `xml:"S3Key"` + } ID string `xml:"Id"` TopicArn string `xml:"Topic"` } -// Lambda function configuration, this is a compliance field -// not used by minio yet. +// Lambda function configuration, this is a compliance field not used by minio yet. type lambdaFuncConfig struct { - Events []string `xml:"Event"` - Filter notificationConfigFilter + Events []string `xml:"Event"` + Filter struct { + Key keyFilter `xml:"S3Key"` + } ID string `xml:"Id"` LambdaFunctionArn string `xml:"CloudFunction"` } @@ -110,13 +112,15 @@ func defaultIdentity() identity { return identity{"minio"} } -type s3BucketReference struct { +// Notification event bucket metadata. +type bucketMeta struct { Name string `json:"name"` OwnerIdentity identity `json:"ownerIdentity"` ARN string `json:"arn"` } -type s3ObjectReference struct { +// Notification event object metadata. +type objectMeta struct { Key string `json:"key"` Size int64 `json:"size,omitempty"` ETag string `json:"eTag,omitempty"` @@ -124,11 +128,12 @@ type s3ObjectReference struct { Sequencer string `json:"sequencer"` } -type s3Reference struct { - SchemaVersion string `json:"s3SchemaVersion"` - ConfigurationID string `json:"configurationId"` - Bucket s3BucketReference `json:"bucket"` - Object s3ObjectReference `json:"object"` +// Notification event server specific metadata. +type eventMeta struct { + SchemaVersion string `json:"s3SchemaVersion"` + ConfigurationID string `json:"configurationId"` + Bucket bucketMeta `json:"bucket"` + Object objectMeta `json:"object"` } // NotificationEvent represents an Amazon an S3 bucket notification event. @@ -141,7 +146,7 @@ type NotificationEvent struct { UserIdentity identity `json:"userIdentity"` RequestParameters map[string]string `json:"requestParameters"` ResponseElements map[string]string `json:"responseElements"` - S3 s3Reference `json:"s3"` + S3 eventMeta `json:"s3"` } // Represents the minio sqs type and inputs. diff --git a/bucket-notification-utils.go b/bucket-notification-utils.go index e57597ebf..5ae1738d6 100644 --- a/bucket-notification-utils.go +++ b/bucket-notification-utils.go @@ -51,6 +51,55 @@ func checkEvents(events []string) APIErrorCode { return ErrNone } +// Valid if filterName is 'prefix'. +func isValidFilterNamePrefix(filterName string) bool { + return "prefix" == filterName +} + +// Valid if filterName is 'suffix'. +func isValidFilterNameSuffix(filterName string) bool { + return "suffix" == filterName +} + +// Is this a valid filterName? - returns true if valid. +func isValidFilterName(filterName string) bool { + return isValidFilterNamePrefix(filterName) || isValidFilterNameSuffix(filterName) +} + +// checkFilterRules - checks given list of filter rules if all of them are valid. +func checkFilterRules(filterRules []filterRule) APIErrorCode { + ruleSetMap := make(map[string]string) + // Validate all filter rules. + for _, filterRule := range filterRules { + // Unknown filter rule name found, returns an appropriate error. + if !isValidFilterName(filterRule.Name) { + return ErrFilterNameInvalid + } + + // Filter names should not be set twice per notification service + // configuration, if found return an appropriate error. + if _, ok := ruleSetMap[filterRule.Name]; ok { + if isValidFilterNamePrefix(filterRule.Name) { + return ErrFilterNamePrefix + } else if isValidFilterNameSuffix(filterRule.Name) { + return ErrFilterNameSuffix + } else { + return ErrFilterNameInvalid + } + } + + // Maximum prefix length can be up to 1,024 characters, validate. + if !IsValidObjectPrefix(filterRule.Value) { + return ErrFilterPrefixValueInvalid + } + + // Set the new rule name to keep track of duplicates. + ruleSetMap[filterRule.Name] = filterRule.Value + } + // Success all prefixes validated. + return ErrNone +} + // checkQueueArn - check if the queue arn is valid. func checkQueueArn(queueArn string) APIErrorCode { if !strings.HasPrefix(queueArn, minioSqs) { @@ -62,6 +111,14 @@ func checkQueueArn(queueArn string) APIErrorCode { return ErrNone } +// Validate if we recognize the queue type. +func isValidQueue(sqsArn arnMinioSqs) bool { + amqpQ := isAMQPQueue(sqsArn) // Is amqp queue?. + elasticQ := isElasticQueue(sqsArn) // Is elastic queue?. + redisQ := isRedisQueue(sqsArn) // Is redis queue?. + return amqpQ || elasticQ || redisQ +} + // Check - validates queue configuration and returns error if any. func checkQueueConfig(qConfig queueConfig) APIErrorCode { // Check queue arn is valid. @@ -72,7 +129,7 @@ func checkQueueConfig(qConfig queueConfig) APIErrorCode { // Unmarshals QueueArn into structured object. sqsArn := unmarshalSqsArn(qConfig.QueueArn) // Validate if sqsArn requested any of the known supported queues. - if !isAMQPQueue(sqsArn) || !isElasticQueue(sqsArn) || !isRedisQueue(sqsArn) { + if !isValidQueue(sqsArn) { return ErrARNNotification } @@ -81,6 +138,11 @@ func checkQueueConfig(qConfig queueConfig) APIErrorCode { return s3Error } + // Check if valid filters are set in queue config. + if s3Error := checkFilterRules(qConfig.Filter.Key.FilterRules); s3Error != ErrNone { + return s3Error + } + // Success. return ErrNone } @@ -113,6 +175,7 @@ func validateNotificationConfig(nConfig notificationConfig) APIErrorCode { // Returned value represents minio sqs types, currently supported are // - amqp // - elasticsearch +// - redis func unmarshalSqsArn(queueArn string) (mSqs arnMinioSqs) { sqsType := strings.TrimPrefix(queueArn, minioSqs+serverConfig.GetRegion()+":") mSqs = arnMinioSqs{} diff --git a/bucket-notification-utils_test.go b/bucket-notification-utils_test.go new file mode 100644 index 000000000..4e7aad742 --- /dev/null +++ b/bucket-notification-utils_test.go @@ -0,0 +1,17 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main diff --git a/logger-amqp.go b/logger-amqp.go index 3de0d2ac3..25bacde7e 100644 --- a/logger-amqp.go +++ b/logger-amqp.go @@ -17,8 +17,6 @@ package main import ( - "errors" - "github.com/Sirupsen/logrus" "github.com/streadway/amqp" ) @@ -46,6 +44,9 @@ type amqpConn struct { } func dialAMQP(amqpL amqpLogger) (amqpConn, error) { + if !amqpL.Enable { + return amqpConn{}, errLoggerNotEnabled + } conn, err := amqp.Dial(amqpL.URL) if err != nil { return amqpConn{}, err @@ -53,13 +54,8 @@ func dialAMQP(amqpL amqpLogger) (amqpConn, error) { return amqpConn{Connection: conn, params: amqpL}, nil } -var errLoggerNotEnabled = errors.New("logger type not enabled") - func enableAMQPLogger() error { amqpL := serverConfig.GetAMQPLogger() - if !amqpL.Enable { - return errLoggerNotEnabled - } // Connect to amqp server. amqpC, err := dialAMQP(amqpL) diff --git a/logger-elasticsearch.go b/logger-elasticsearch.go index f077227fe..6059f865f 100644 --- a/logger-elasticsearch.go +++ b/logger-elasticsearch.go @@ -37,8 +37,11 @@ type elasticClient struct { } // Connects to elastic search instance at URL. -func dialElastic(url string) (*elastic.Client, error) { - client, err := elastic.NewClient(elastic.SetURL(url), elastic.SetSniff(false)) +func dialElastic(esLogger elasticSearchLogger) (*elastic.Client, error) { + if !esLogger.Enable { + return nil, errLoggerNotEnabled + } + client, err := elastic.NewClient(elastic.SetURL(esLogger.URL), elastic.SetSniff(false)) if err != nil { return nil, err } @@ -48,10 +51,9 @@ func dialElastic(url string) (*elastic.Client, error) { // Enables elasticsearch logger. func enableElasticLogger() error { esLogger := serverConfig.GetElasticSearchLogger() - if !esLogger.Enable { - return errLoggerNotEnabled - } - client, err := dialElastic(esLogger.URL) + + // Dial to elastic search. + client, err := dialElastic(esLogger) if err != nil { return err } diff --git a/logger-redis.go b/logger-redis.go index ed019e334..71114b46e 100644 --- a/logger-redis.go +++ b/logger-redis.go @@ -38,7 +38,13 @@ type redisConn struct { } // Dial a new connection to redis instance at addr, optionally with a password if any. -func dialRedis(addr, password string) (*redis.Pool, error) { +func dialRedis(rLogger redisLogger) (*redis.Pool, error) { + // Return error if redis not enabled. + if !rLogger.Enable { + return nil, errLoggerNotEnabled + } + addr := rLogger.Addr + password := rLogger.Password rPool := &redis.Pool{ MaxIdle: 3, IdleTimeout: 240 * time.Second, @@ -77,12 +83,9 @@ func dialRedis(addr, password string) (*redis.Pool, error) { func enableRedisLogger() error { rLogger := serverConfig.GetRedisLogger() - if !rLogger.Enable { - return errLoggerNotEnabled - } // Dial redis. - rPool, err := dialRedis(rLogger.Addr, rLogger.Password) + rPool, err := dialRedis(rLogger) if err != nil { return err } diff --git a/logger.go b/logger.go index 9b38e6a11..a9dea0f30 100644 --- a/logger.go +++ b/logger.go @@ -19,6 +19,7 @@ package main import ( "bufio" "bytes" + "errors" "os" "runtime" "runtime/debug" @@ -52,6 +53,8 @@ type logger struct { // Add new loggers here. } +var errLoggerNotEnabled = errors.New("requested logger type is not enabled") + // sysInfo returns useful system statistics. func sysInfo() map[string]string { host, err := os.Hostname() diff --git a/main.go b/main.go index f60445516..2a58157ad 100644 --- a/main.go +++ b/main.go @@ -80,6 +80,7 @@ func enableLoggers() { // Adding new bucket notification related loggers. enableAMQPLogger() enableElasticLogger() + enableRedisLogger() // Add your logger here. } diff --git a/queues.go b/queues.go index ba63b2875..0aada9a24 100644 --- a/queues.go +++ b/queues.go @@ -19,6 +19,7 @@ package main import ( "fmt" "net/url" + "strings" "time" "github.com/Sirupsen/logrus" @@ -36,54 +37,48 @@ const ( // Returns true if queueArn is for an AMQP queue. func isAMQPQueue(sqsArn arnMinioSqs) bool { - if sqsArn.sqsType == queueTypeAMQP { - amqpL := serverConfig.GetAMQPLogger() - if !amqpL.Enable { - return false - } - // Connect to amqp server to validate. - amqpC, err := dialAMQP(amqpL) - if err != nil { - errorIf(err, "Unable to connect to amqp service.", amqpL) - return false - } - defer amqpC.Close() + if sqsArn.sqsType != queueTypeAMQP { + return false } + amqpL := serverConfig.GetAMQPLogger() + // Connect to amqp server to validate. + amqpC, err := dialAMQP(amqpL) + if err != nil { + errorIf(err, "Unable to connect to amqp service. %#v", amqpL) + return false + } + defer amqpC.Close() return true } // Returns true if queueArn is for an Redis queue. func isRedisQueue(sqsArn arnMinioSqs) bool { - if sqsArn.sqsType == queueTypeRedis { - rLogger := serverConfig.GetRedisLogger() - if !rLogger.Enable { - return false - } - // Connect to redis server to validate. - rPool, err := dialRedis(rLogger.Addr, rLogger.Password) - if err != nil { - errorIf(err, "Unable to connect to redis service.", rLogger) - return false - } - defer rPool.Close() + if sqsArn.sqsType != queueTypeRedis { + return false + } + rLogger := serverConfig.GetRedisLogger() + // Connect to redis server to validate. + rPool, err := dialRedis(rLogger) + if err != nil { + errorIf(err, "Unable to connect to redis service. %#v", rLogger) + return false } + defer rPool.Close() return true } // Returns true if queueArn is for an ElasticSearch queue. func isElasticQueue(sqsArn arnMinioSqs) bool { - if sqsArn.sqsType == queueTypeElastic { - esLogger := serverConfig.GetElasticSearchLogger() - if !esLogger.Enable { - return false - } - elasticC, err := dialElastic(esLogger.URL) - if err != nil { - errorIf(err, "Unable to connect to elasticsearch service.", esLogger.URL) - return false - } - defer elasticC.Stop() + if sqsArn.sqsType != queueTypeElastic { + return false + } + esLogger := serverConfig.GetElasticSearchLogger() + elasticC, err := dialElastic(esLogger) + if err != nil { + errorIf(err, "Unable to connect to elasticsearch service %#v", esLogger) + return false } + defer elasticC.Stop() return true } @@ -98,6 +93,19 @@ func eventMatch(eventType EventName, events []string) (ok bool) { return ok } +// Filter rule match, matches an object against the filter rules. +func filterRuleMatch(object string, frs []filterRule) bool { + var prefixMatch, suffixMatch = true, true + for _, fr := range frs { + if isValidFilterNamePrefix(fr.Name) { + prefixMatch = strings.HasPrefix(object, fr.Value) + } else if isValidFilterNameSuffix(fr.Name) { + suffixMatch = strings.HasSuffix(object, fr.Value) + } + } + return prefixMatch && suffixMatch +} + // NotifyObjectCreatedEvent - notifies a new 's3:ObjectCreated' event. // List of events reported through this function are // - s3:ObjectCreated:Put @@ -121,15 +129,15 @@ func notifyObjectCreatedEvent(nConfig notificationConfig, eventType EventName, b UserIdentity: defaultIdentity(), RequestParameters: map[string]string{}, ResponseElements: map[string]string{}, - S3: s3Reference{ + S3: eventMeta{ SchemaVersion: "1.0", ConfigurationID: "Config", - Bucket: s3BucketReference{ + Bucket: bucketMeta{ Name: bucket, OwnerIdentity: defaultIdentity(), ARN: "arn:aws:s3:::" + bucket, }, - Object: s3ObjectReference{ + Object: objectMeta{ Key: url.QueryEscape(object), ETag: etag, Size: size, @@ -140,7 +148,8 @@ func notifyObjectCreatedEvent(nConfig notificationConfig, eventType EventName, b } // Notify to all the configured queues. for _, qConfig := range nConfig.QueueConfigurations { - if eventMatch(eventType, qConfig.Events) { + ruleMatch := filterRuleMatch(object, qConfig.Filter.Key.FilterRules) + if eventMatch(eventType, qConfig.Events) && ruleMatch { log.WithFields(logrus.Fields{ "Records": events, }).Info() @@ -167,15 +176,15 @@ func notifyObjectDeletedEvent(nConfig notificationConfig, bucket string, object UserIdentity: defaultIdentity(), RequestParameters: map[string]string{}, ResponseElements: map[string]string{}, - S3: s3Reference{ + S3: eventMeta{ SchemaVersion: "1.0", ConfigurationID: "Config", - Bucket: s3BucketReference{ + Bucket: bucketMeta{ Name: bucket, OwnerIdentity: defaultIdentity(), ARN: "arn:aws:s3:::" + bucket, }, - Object: s3ObjectReference{ + Object: objectMeta{ Key: url.QueryEscape(object), Sequencer: sequencer, }, @@ -184,7 +193,8 @@ func notifyObjectDeletedEvent(nConfig notificationConfig, bucket string, object } // Notify to all the configured queues. for _, qConfig := range nConfig.QueueConfigurations { - if eventMatch(ObjectRemovedDelete, qConfig.Events) { + ruleMatch := filterRuleMatch(object, qConfig.Filter.Key.FilterRules) + if eventMatch(ObjectRemovedDelete, qConfig.Events) && ruleMatch { log.WithFields(logrus.Fields{ "Records": events, }).Info()