@ -19,6 +19,7 @@ package main
import (
"fmt"
"net/url"
"strings"
"time"
"github.com/Sirupsen/logrus"
@ -36,54 +37,48 @@ const (
// Returns true if queueArn is for an AMQP queue.
func isAMQPQueue ( sqsArn arnMinioSqs ) bool {
if sqsArn . sqsType == queueTypeAMQP {
amqpL := serverConfig . GetAMQPLogger ( )
if ! amqpL . Enable {
return false
}
// Connect to amqp server to validate.
amqpC , err := dialAMQP ( amqpL )
if err != nil {
errorIf ( err , "Unable to connect to amqp service." , amqpL )
return false
}
defer amqpC . Close ( )
if sqsArn . sqsType != queueTypeAMQP {
return false
}
amqpL := serverConfig . GetAMQPLogger ( )
// Connect to amqp server to validate.
amqpC , err := dialAMQP ( amqpL )
if err != nil {
errorIf ( err , "Unable to connect to amqp service. %#v" , amqpL )
return false
}
defer amqpC . Close ( )
return true
}
// Returns true if queueArn is for an Redis queue.
func isRedisQueue ( sqsArn arnMinioSqs ) bool {
if sqsArn . sqsType == queueTypeRedis {
rLogger := serverConfig . GetRedisLogger ( )
if ! rLogger . Enable {
return false
}
// Connect to redis server to validate.
rPool , err := dialRedis ( rLogger . Addr , rLogger . Password )
if err != nil {
errorIf ( err , "Unable to connect to redis service." , rLogger )
return false
}
defer rPool . Close ( )
if sqsArn . sqsType != queueTypeRedis {
return false
}
rLogger := serverConfig . GetRedisLogger ( )
// Connect to redis server to validate.
rPool , err := dialRedis ( rLogger )
if err != nil {
errorIf ( err , "Unable to connect to redis service. %#v" , rLogger )
return false
}
defer rPool . Close ( )
return true
}
// Returns true if queueArn is for an ElasticSearch queue.
func isElasticQueue ( sqsArn arnMinioSqs ) bool {
if sqsArn . sqsType == queueTypeElastic {
esLogger := serverConfig . GetElasticSearchLogger ( )
if ! esLogger . Enable {
return false
}
elasticC , err := dialElastic ( esLogger . URL )
if err != nil {
errorIf ( err , "Unable to connect to elasticsearch service." , esLogger . URL )
return false
}
defer elasticC . Stop ( )
if sqsArn . sqsType != queueTypeElastic {
return false
}
esLogger := serverConfig . GetElasticSearchLogger ( )
elasticC , err := dialElastic ( esLogger )
if err != nil {
errorIf ( err , "Unable to connect to elasticsearch service %#v" , esLogger )
return false
}
defer elasticC . Stop ( )
return true
}
@ -98,6 +93,19 @@ func eventMatch(eventType EventName, events []string) (ok bool) {
return ok
}
// Filter rule match, matches an object against the filter rules.
func filterRuleMatch ( object string , frs [ ] filterRule ) bool {
var prefixMatch , suffixMatch = true , true
for _ , fr := range frs {
if isValidFilterNamePrefix ( fr . Name ) {
prefixMatch = strings . HasPrefix ( object , fr . Value )
} else if isValidFilterNameSuffix ( fr . Name ) {
suffixMatch = strings . HasSuffix ( object , fr . Value )
}
}
return prefixMatch && suffixMatch
}
// NotifyObjectCreatedEvent - notifies a new 's3:ObjectCreated' event.
// List of events reported through this function are
// - s3:ObjectCreated:Put
@ -121,15 +129,15 @@ func notifyObjectCreatedEvent(nConfig notificationConfig, eventType EventName, b
UserIdentity : defaultIdentity ( ) ,
RequestParameters : map [ string ] string { } ,
ResponseElements : map [ string ] string { } ,
S3 : s3Reference {
S3 : eventMeta {
SchemaVersion : "1.0" ,
ConfigurationID : "Config" ,
Bucket : s3BucketReference {
Bucket : bucketMeta {
Name : bucket ,
OwnerIdentity : defaultIdentity ( ) ,
ARN : "arn:aws:s3:::" + bucket ,
} ,
Object : s3ObjectReference {
Object : objectMeta {
Key : url . QueryEscape ( object ) ,
ETag : etag ,
Size : size ,
@ -140,7 +148,8 @@ func notifyObjectCreatedEvent(nConfig notificationConfig, eventType EventName, b
}
// Notify to all the configured queues.
for _ , qConfig := range nConfig . QueueConfigurations {
if eventMatch ( eventType , qConfig . Events ) {
ruleMatch := filterRuleMatch ( object , qConfig . Filter . Key . FilterRules )
if eventMatch ( eventType , qConfig . Events ) && ruleMatch {
log . WithFields ( logrus . Fields {
"Records" : events ,
} ) . Info ( )
@ -167,15 +176,15 @@ func notifyObjectDeletedEvent(nConfig notificationConfig, bucket string, object
UserIdentity : defaultIdentity ( ) ,
RequestParameters : map [ string ] string { } ,
ResponseElements : map [ string ] string { } ,
S3 : s3Reference {
S3 : eventMeta {
SchemaVersion : "1.0" ,
ConfigurationID : "Config" ,
Bucket : s3BucketReference {
Bucket : bucketMeta {
Name : bucket ,
OwnerIdentity : defaultIdentity ( ) ,
ARN : "arn:aws:s3:::" + bucket ,
} ,
Object : s3ObjectReference {
Object : objectMeta {
Key : url . QueryEscape ( object ) ,
Sequencer : sequencer ,
} ,
@ -184,7 +193,8 @@ func notifyObjectDeletedEvent(nConfig notificationConfig, bucket string, object
}
// Notify to all the configured queues.
for _ , qConfig := range nConfig . QueueConfigurations {
if eventMatch ( ObjectRemovedDelete , qConfig . Events ) {
ruleMatch := filterRuleMatch ( object , qConfig . Filter . Key . FilterRules )
if eventMatch ( ObjectRemovedDelete , qConfig . Events ) && ruleMatch {
log . WithFields ( logrus . Fields {
"Records" : events ,
} ) . Info ( )