From 018c90dae7a0bbb8e096db11ae0fac059099751d Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 20 Sep 2016 16:36:18 -0700 Subject: [PATCH] events: ElasticSearch doesnt support objects with '/' in them. (#2747) Fix this by using a unique sha256 generated for each unique key. --- cmd/event-notifier.go | 2 +- cmd/notify-elasticsearch.go | 21 ++++++++++++++++----- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/cmd/event-notifier.go b/cmd/event-notifier.go index 286894f70..08617b2b1 100644 --- a/cmd/event-notifier.go +++ b/cmd/event-notifier.go @@ -204,7 +204,7 @@ func eventNotify(event eventData) { targetLog := globalEventNotifier.GetQueueTarget(qConfig.QueueARN) if targetLog != nil { targetLog.WithFields(logrus.Fields{ - "Key": objectName, + "Key": path.Join(event.Bucket, objectName), "EventType": eventType, "Records": notificationEvent, }).Info() diff --git a/cmd/notify-elasticsearch.go b/cmd/notify-elasticsearch.go index 92f7cb3db..454f3c390 100644 --- a/cmd/notify-elasticsearch.go +++ b/cmd/notify-elasticsearch.go @@ -17,10 +17,12 @@ package cmd import ( + "encoding/hex" "errors" "io/ioutil" "github.com/Sirupsen/logrus" + "github.com/minio/sha256-simd" "gopkg.in/olivere/elastic.v3" ) @@ -41,7 +43,11 @@ func dialElastic(esNotify elasticSearchNotify) (*elastic.Client, error) { if !esNotify.Enable { return nil, errNotifyNotEnabled } - client, err := elastic.NewClient(elastic.SetURL(esNotify.URL), elastic.SetSniff(false)) + client, err := elastic.NewClient( + elastic.SetURL(esNotify.URL), + elastic.SetSniff(false), + elastic.SetMaxRetries(10), + ) if err != nil { return nil, err } @@ -70,7 +76,7 @@ func newElasticNotify(accountID string) (*logrus.Logger, error) { return nil, err } if !createIndex.Acknowledged { - return nil, errors.New("index not created") + return nil, errors.New("Index not created.") } } @@ -106,19 +112,24 @@ func (q elasticClient) Fire(entry *logrus.Entry) error { return nil } + // Calculate a unique key id. Choosing sha256 here. + shaKey := sha256.Sum256([]byte(keyStr)) + keyStr = hex.EncodeToString(shaKey[:]) + // If event matches as delete, we purge the previous index. if eventMatch(entryStr, []string{"s3:ObjectRemoved:*"}) { - _, err := q.Client.DeleteIndex(keyStr).Do() + _, err := q.Client.Delete().Index(q.params.Index). + Type("event").Id(keyStr).Do() if err != nil { return err } return nil } // else we update elastic index or create a new one. - _, err := q.Client.Index().Index(keyStr). + _, err := q.Client.Index().Index(q.params.Index). Type("event"). BodyJson(map[string]interface{}{ "Records": entry.Data["Records"], - }).Do() + }).Id(keyStr).Do() return err }