api: Notify events only if bucket notifications are set. (#2293)

While the existing code worked, it went to an entire cycle
of constructing event structure and end up not sending it.

Avoid this in the first place, but returning quickly if
notifications are not set on the bucket.
master
Harshavardhana 8 years ago committed by Anand Babu (AB) Periasamy
parent 3054b74260
commit 77248bd6e8
  1. 18
      bucket-handlers.go
  2. 8
      bucket-notification-datatypes.go
  3. 4
      bucket-notification-handlers.go
  4. 42
      object-handlers.go
  5. 10
      queues.go

@ -25,6 +25,7 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
"path"
"strings" "strings"
mux "github.com/gorilla/mux" mux "github.com/gorilla/mux"
@ -391,14 +392,25 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
// Load notification config if any. // Load notification config if any.
nConfig, err := api.loadNotificationConfig(bucket) nConfig, err := api.loadNotificationConfig(bucket)
// Notifications not set, return.
if err == errNoSuchNotifications {
return
}
// For all other errors, return.
if err != nil { if err != nil {
errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket) errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket)
return return
} }
size := int64(0) // FIXME: support notify size. // Fetch object info for notifications.
// Notify event. objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object)
notifyObjectCreatedEvent(nConfig, ObjectCreatedPost, bucket, object, md5Sum, size) if err != nil {
errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object))
return
}
// Notify object created event.
notifyObjectCreatedEvent(nConfig, ObjectCreatedPost, bucket, objInfo)
} }
// HeadBucketHandler - HEAD Bucket // HeadBucketHandler - HEAD Bucket

@ -16,7 +16,10 @@
package main package main
import "encoding/xml" import (
"encoding/xml"
"errors"
)
// Represents the criteria for the filter rule. // Represents the criteria for the filter rule.
type filterRule struct { type filterRule struct {
@ -68,6 +71,9 @@ type notificationConfig struct {
LambdaConfigurations []lambdaFuncConfig `xml:"CloudFunctionConfiguration"` LambdaConfigurations []lambdaFuncConfig `xml:"CloudFunctionConfiguration"`
} }
// Internal error used to signal notifications not set.
var errNoSuchNotifications = errors.New("The specified bucket does not have bucket notifications")
// EventName is AWS S3 event type: // EventName is AWS S3 event type:
// http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html // http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html
type EventName int type EventName int

@ -39,7 +39,7 @@ func (api objectAPIHandlers) loadNotificationConfig(bucket string) (nConfig noti
if err != nil { if err != nil {
switch err.(type) { switch err.(type) {
case ObjectNotFound: case ObjectNotFound:
return notificationConfig{}, nil return notificationConfig{}, errNoSuchNotifications
} }
return notificationConfig{}, err return notificationConfig{}, err
} }
@ -48,7 +48,7 @@ func (api objectAPIHandlers) loadNotificationConfig(bucket string) (nConfig noti
if err != nil { if err != nil {
switch err.(type) { switch err.(type) {
case ObjectNotFound: case ObjectNotFound:
return notificationConfig{}, nil return notificationConfig{}, errNoSuchNotifications
} }
return notificationConfig{}, err return notificationConfig{}, err
} }

@ -23,6 +23,7 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
"path"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
@ -357,13 +358,18 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
// Load notification config if any. // Load notification config if any.
nConfig, err := api.loadNotificationConfig(bucket) nConfig, err := api.loadNotificationConfig(bucket)
// Notifications not set, return.
if err == errNoSuchNotifications {
return
}
// For all other errors, return.
if err != nil { if err != nil {
errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket) errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket)
return return
} }
// Notify object created event. // Notify object created event.
notifyObjectCreatedEvent(nConfig, ObjectCreatedCopy, bucket, object, objInfo.MD5Sum, objInfo.Size) notifyObjectCreatedEvent(nConfig, ObjectCreatedCopy, bucket, objInfo)
} }
// PutObjectHandler - PUT Object // PutObjectHandler - PUT Object
@ -436,13 +442,25 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
// Load notification config if any. // Load notification config if any.
nConfig, err := api.loadNotificationConfig(bucket) nConfig, err := api.loadNotificationConfig(bucket)
// Notifications not set, return.
if err == errNoSuchNotifications {
return
}
// For all other errors return.
if err != nil { if err != nil {
errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket) errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket)
return return
} }
// Fetch object info for notifications.
objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object)
if err != nil {
errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object))
return
}
// Notify object created event. // Notify object created event.
notifyObjectCreatedEvent(nConfig, ObjectCreatedPut, bucket, object, md5Sum, size) notifyObjectCreatedEvent(nConfig, ObjectCreatedPut, bucket, objInfo)
} }
/// Multipart objectAPIHandlers /// Multipart objectAPIHandlers
@ -761,14 +779,25 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
// Load notification config if any. // Load notification config if any.
nConfig, err := api.loadNotificationConfig(bucket) nConfig, err := api.loadNotificationConfig(bucket)
// Notifications not set, return.
if err == errNoSuchNotifications {
return
}
// For all other errors.
if err != nil { if err != nil {
errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket) errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket)
return return
} }
// Fetch object info for notifications.
objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object)
if err != nil {
errorIf(err, "Unable to fetch object info for \"%s\"", path.Join(bucket, object))
return
}
// Notify object created event. // Notify object created event.
size := int64(0) // FIXME: support event size. notifyObjectCreatedEvent(nConfig, ObjectCreatedCompleteMultipartUpload, bucket, objInfo)
notifyObjectCreatedEvent(nConfig, ObjectCreatedCompleteMultipartUpload, bucket, object, md5Sum, size)
} }
/// Delete objectAPIHandlers /// Delete objectAPIHandlers
@ -807,6 +836,11 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
// Load notification config if any. // Load notification config if any.
nConfig, err := api.loadNotificationConfig(bucket) nConfig, err := api.loadNotificationConfig(bucket)
// Notifications not set, return.
if err == errNoSuchNotifications {
return
}
// For all other errors, return.
if err != nil { if err != nil {
errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket) errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket)
return return

@ -112,7 +112,7 @@ func filterRuleMatch(object string, frs []filterRule) bool {
// - s3:ObjectCreated:Post // - s3:ObjectCreated:Post
// - s3:ObjectCreated:Copy // - s3:ObjectCreated:Copy
// - s3:ObjectCreated:CompleteMultipartUpload // - s3:ObjectCreated:CompleteMultipartUpload
func notifyObjectCreatedEvent(nConfig notificationConfig, eventType EventName, bucket string, object string, etag string, size int64) { func notifyObjectCreatedEvent(nConfig notificationConfig, eventType EventName, bucket string, objInfo ObjectInfo) {
/// Construct a new object created event. /// Construct a new object created event.
region := serverConfig.GetRegion() region := serverConfig.GetRegion()
tnow := time.Now().UTC() tnow := time.Now().UTC()
@ -138,9 +138,9 @@ func notifyObjectCreatedEvent(nConfig notificationConfig, eventType EventName, b
ARN: "arn:aws:s3:::" + bucket, ARN: "arn:aws:s3:::" + bucket,
}, },
Object: objectMeta{ Object: objectMeta{
Key: url.QueryEscape(object), Key: url.QueryEscape(objInfo.Name),
ETag: etag, ETag: objInfo.MD5Sum,
Size: size, Size: objInfo.Size,
Sequencer: sequencer, Sequencer: sequencer,
}, },
}, },
@ -148,7 +148,7 @@ func notifyObjectCreatedEvent(nConfig notificationConfig, eventType EventName, b
} }
// Notify to all the configured queues. // Notify to all the configured queues.
for _, qConfig := range nConfig.QueueConfigurations { for _, qConfig := range nConfig.QueueConfigurations {
ruleMatch := filterRuleMatch(object, qConfig.Filter.Key.FilterRules) ruleMatch := filterRuleMatch(objInfo.Name, qConfig.Filter.Key.FilterRules)
if eventMatch(eventType, qConfig.Events) && ruleMatch { if eventMatch(eventType, qConfig.Events) && ruleMatch {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"Records": events, "Records": events,

Loading…
Cancel
Save