From bba562235bfefd9935276c582ee84649eae9a3da Mon Sep 17 00:00:00 2001 From: Praveen raj Mani Date: Fri, 12 Jul 2019 08:23:20 +0530 Subject: [PATCH] Enable persistent event store in elasticsearch (#7564) --- cmd/admin-handlers_test.go | 4 +- cmd/config-current.go | 4 +- cmd/config-current_test.go | 2 +- cmd/config-versions.go | 4 +- docs/bucket/notifications/README.md | 20 ++-- docs/config/config.sample.json | 4 +- go.mod | 1 + pkg/event/target/elasticsearch.go | 144 +++++++++++++++++++++++----- pkg/event/target/kafka.go | 2 +- pkg/event/target/mqtt.go | 4 +- pkg/event/target/queuestore.go | 16 +++- pkg/event/target/queuestore_test.go | 2 +- pkg/event/target/webhook.go | 2 +- 13 files changed, 164 insertions(+), 45 deletions(-) diff --git a/cmd/admin-handlers_test.go b/cmd/admin-handlers_test.go index a44d1df41..761c29cbb 100644 --- a/cmd/admin-handlers_test.go +++ b/cmd/admin-handlers_test.go @@ -91,7 +91,9 @@ var ( "enable": false, "format": "namespace", "url": "", - "index": "" + "index": "", + "queueDir": "", + "queueLimit": 0 } }, "kafka": { diff --git a/cmd/config-current.go b/cmd/config-current.go index aedc960f9..2fdb3f8b1 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -314,7 +314,7 @@ func (s *serverConfig) TestNotificationTargets() error { if !v.Enable { continue } - t, err := target.NewElasticsearchTarget(k, v) + t, err := target.NewElasticsearchTarget(k, v, GlobalServiceDoneCh) if err != nil { return fmt.Errorf("elasticsearch(%s): %s", k, err.Error()) } @@ -651,7 +651,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList { for id, args := range config.Notify.Elasticsearch { if args.Enable { - newTarget, err := target.NewElasticsearchTarget(id, args) + newTarget, err := target.NewElasticsearchTarget(id, args, GlobalServiceDoneCh) if err != nil { logger.LogIf(context.Background(), err) continue diff --git a/cmd/config-current_test.go b/cmd/config-current_test.go index ce504ce1f..ad2679a7c 100644 --- 
a/cmd/config-current_test.go +++ b/cmd/config-current_test.go @@ -224,7 +224,7 @@ func TestValidateConfig(t *testing.T) { {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "elasticsearch": { "1": { "enable": true, "format": "invalid", "url": "example.com", "index": "myindex" } }}}`, false}, // Test 24 - Test valid Format for ElasticSearch - {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "elasticsearch": { "1": { "enable": true, "format": "namespace", "url": "example.com", "index": "myindex" } }}}`, true}, + {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "elasticsearch": { "1": { "enable": true, "format": "namespace", "url": "example.com", "index": "myindex", "queueDir": "", "queueLimit": 0 } }}}`, true}, // Test 25 - Test Format for Redis {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "redis": { "1": { "enable": true, "format": "invalid", "address": "example.com:80", "password": "xxx", "key": "key1" } }}}`, false}, diff --git a/cmd/config-versions.go b/cmd/config-versions.go index 0cb1256a7..5cc8d6a1d 100644 --- a/cmd/config-versions.go +++ b/cmd/config-versions.go @@ -22,7 +22,7 @@ import ( "github.com/minio/minio/cmd/crypto" "github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/event/target" - "github.com/minio/minio/pkg/iam/policy" + iampolicy "github.com/minio/minio/pkg/iam/policy" "github.com/minio/minio/pkg/iam/validator" "github.com/minio/minio/pkg/quick" ) @@ -884,7 +884,7 @@ type serverConfigV32 struct { } `json:"policy"` } -// serverConfigV33 is just like version '32', removes clientID from NATS and MQTT, and adds queueDir, queueLimit in notification targets. 
+// serverConfigV33 is just like version '32', removes clientID from NATS and MQTT, and adds queueDir, queueLimit in all notification targets. type serverConfigV33 struct { quick.Config `json:"-"` // ignore interfaces diff --git a/docs/bucket/notifications/README.md b/docs/bucket/notifications/README.md index 1d44efab3..a6a82255c 100644 --- a/docs/bucket/notifications/README.md +++ b/docs/bucket/notifications/README.md @@ -286,12 +286,14 @@ MinIO requires a 5.x series version of Elasticsearch. This is the latest major r The MinIO server configuration file is stored on the backend in json format. The Elasticsearch configuration is located in the `elasticsearch` key under the `notify` top-level key. Create a configuration key-value pair here for your Elasticsearch instance. The key is a name for your Elasticsearch endpoint, and the value is a collection of key-value parameters described in the table below. -| Parameter | Type | Description | -| :-------- | :------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `enable` | _bool_ | (Required) Is this server endpoint configuration active/enabled? | -| `format` | _string_ | (Required) Either `namespace` or `access`. | -| `url` | _string_ | (Required) The Elasticsearch server's address, with optional authentication info. For example: `http://localhost:9200` or with authentication info `http://elastic:MagicWord@127.0.0.1:9200`. | -| `index` | _string_ | (Required) The name of an Elasticsearch index in which MinIO will store documents. | +| Parameter | Type | Description | +| :----------- | :------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `enable` | _bool_ | (Required) Is this server endpoint configuration active/enabled? 
| +| `format` | _string_ | (Required) Either `namespace` or `access`. | +| `url` | _string_ | (Required) The Elasticsearch server's address, with optional authentication info. For example: `http://localhost:9200` or with authentication info `http://elastic:MagicWord@127.0.0.1:9200`. | +| `index` | _string_ | (Required) The name of an Elasticsearch index in which MinIO will store documents. | +| `queueDir` | _string_ | Persistent store for events when Elasticsearch broker is offline | +| `queueLimit` | _int_ | Set the maximum event limit for the persistent store. The default limit is 10000 | An example of Elasticsearch configuration is as follows: @@ -301,11 +303,15 @@ An example of Elasticsearch configuration is as follows: "enable": true, "format": "namespace", "url": "http://127.0.0.1:9200", - "index": "minio_events" + "index": "minio_events", + "queueDir": "", + "queueLimit": 0 } }, ``` +MinIO supports a persistent event store. The persistent store backs up events when the Elasticsearch broker goes offline and replays them when the broker comes back online. The event store can be configured by setting the directory path in the `queueDir` field and the maximum number of events held in the queueDir in the `queueLimit` field. For example, the `queueDir` can be `/home/events` and `queueLimit` can be `1000`. By default, the `queueLimit` is set to 10000. + If Elasticsearch has authentication enabled, the credentials can be supplied to MinIO via the `url` parameter formatted as `PROTO://USERNAME:PASSWORD@ELASTICSEARCH_HOST:PORT`. To update the configuration, use `mc admin config get` command to get the current configuration file for the minio deployment in json format, and save it locally. 
diff --git a/docs/config/config.sample.json b/docs/config/config.sample.json index 7176c8396..363584400 100644 --- a/docs/config/config.sample.json +++ b/docs/config/config.sample.json @@ -56,7 +56,9 @@ "enable": false, "format": "", "url": "", - "index": "" + "index": "", + "queueDir": "", + "queueLimit": 0 } }, "kafka": { diff --git a/go.mod b/go.mod index 726a799e5..f239ad9bd 100644 --- a/go.mod +++ b/go.mod @@ -74,6 +74,7 @@ require ( github.com/nats-io/stan.go v0.4.5 github.com/ncw/directio v1.0.5 github.com/nsqio/go-nsq v1.0.7 + github.com/pkg/errors v0.8.1 github.com/pkg/profile v1.3.0 github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829 github.com/rcrowley/go-metrics v0.0.0-20190704165056-9c2d0518ed81 // indirect diff --git a/pkg/event/target/elasticsearch.go b/pkg/event/target/elasticsearch.go index 46eef6409..85658678d 100644 --- a/pkg/event/target/elasticsearch.go +++ b/pkg/event/target/elasticsearch.go @@ -18,23 +18,28 @@ package target import ( "context" - "errors" "fmt" + "net" "net/url" + "os" + "path/filepath" "strings" "github.com/minio/minio/pkg/event" xnet "github.com/minio/minio/pkg/net" + "github.com/pkg/errors" "gopkg.in/olivere/elastic.v5" ) // ElasticsearchArgs - Elasticsearch target arguments. 
type ElasticsearchArgs struct { - Enable bool `json:"enable"` - Format string `json:"format"` - URL xnet.URL `json:"url"` - Index string `json:"index"` + Enable bool `json:"enable"` + Format string `json:"format"` + URL xnet.URL `json:"url"` + Index string `json:"index"` + QueueDir string `json:"queueDir"` + QueueLimit uint64 `json:"queueLimit"` } // Validate ElasticsearchArgs fields @@ -54,6 +59,9 @@ func (a ElasticsearchArgs) Validate() error { if a.Index == "" { return errors.New("empty index value") } + if a.QueueLimit > 10000 { + return errors.New("queueLimit should not exceed 10000") + } return nil } @@ -62,6 +70,7 @@ type ElasticsearchTarget struct { id event.TargetID args ElasticsearchArgs client *elastic.Client + store Store } // ID - returns target ID. @@ -69,17 +78,31 @@ func (target *ElasticsearchTarget) ID() event.TargetID { return target.id } -// Save - Sends event directly without persisting. +// Save - saves the events to the store if queuestore is configured, which will be replayed when the elasticsearch connection is active. func (target *ElasticsearchTarget) Save(eventData event.Event) error { + if target.store != nil { + return target.store.Put(eventData) + } + if _, err := net.Dial("tcp", target.args.URL.Host); err != nil { + return errNotConnected + } return target.send(eventData) } +// send - sends the event to the target. 
func (target *ElasticsearchTarget) send(eventData event.Event) error { var key string + exists := func() (bool, error) { + return target.client.Exists().Index(target.args.Index).Type("event").Id(key).Do(context.Background()) + } + remove := func() error { - _, err := target.client.Delete().Index(target.args.Index).Type("event").Id(key).Do(context.Background()) + exists, err := exists() + if err == nil && exists { + _, err = target.client.Delete().Index(target.args.Index).Type("event").Id(key).Do(context.Background()) + } return err } @@ -116,9 +139,38 @@ func (target *ElasticsearchTarget) send(eventData event.Event) error { return nil } -// Send - interface compatible method does no-op. +// Send - reads an event from store and sends it to Elasticsearch. func (target *ElasticsearchTarget) Send(eventKey string) error { - return nil + + var err error + + if target.client == nil { + target.client, err = newClient(target.args) + if err != nil { + return err + } + } + + if _, err := net.Dial("tcp", target.args.URL.Host); err != nil { + return errNotConnected + } + + eventData, eErr := target.store.Get(eventKey) + if eErr != nil { + // The last event key in a successful batch will be sent in the channel at most once by replayEvents(). + // Such events will not exist and would have already been sent successfully. + if os.IsNotExist(eErr) { + return nil + } + return eErr + } + + if err := target.send(eventData); err != nil { + return err + } + + // Delete the event from store. + return target.store.Del(eventKey) } // Close - does nothing and available for interface compatibility. @@ -126,32 +178,80 @@ func (target *ElasticsearchTarget) Close() error { return nil } -// NewElasticsearchTarget - creates new Elasticsearch target. 
-func NewElasticsearchTarget(id string, args ElasticsearchArgs) (*ElasticsearchTarget, error) { - client, err := elastic.NewClient(elastic.SetURL(args.URL.String()), elastic.SetSniff(false), elastic.SetMaxRetries(10)) - if err != nil { - return nil, err - } - +// createIndex - creates the index if it does not exist. +func createIndex(client *elastic.Client, args ElasticsearchArgs) error { exists, err := client.IndexExists(args.Index).Do(context.Background()) if err != nil { - return nil, err + return err } - if !exists { var createIndex *elastic.IndicesCreateResult if createIndex, err = client.CreateIndex(args.Index).Do(context.Background()); err != nil { - return nil, err + return err } if !createIndex.Acknowledged { - return nil, fmt.Errorf("index %v not created", args.Index) + return fmt.Errorf("index %v not created", args.Index) + } + } + return nil +} + +// newClient - creates a new elastic client with args provided. +func newClient(args ElasticsearchArgs) (*elastic.Client, error) { + client, clientErr := elastic.NewClient(elastic.SetURL(args.URL.String()), elastic.SetSniff(false), elastic.SetMaxRetries(10)) + if clientErr != nil { + if !(errors.Cause(clientErr) == elastic.ErrNoClient) { + return nil, clientErr + } + } else { + if err := createIndex(client, args); err != nil { + return nil, err } } + return client, nil +} + +// NewElasticsearchTarget - creates new Elasticsearch target. 
+func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}) (*ElasticsearchTarget, error) { + var client *elastic.Client + var err error + + var store Store - return &ElasticsearchTarget{ + if args.QueueDir != "" { + queueDir := filepath.Join(args.QueueDir, storePrefix+"-elasticsearch-"+id) + store = NewQueueStore(queueDir, args.QueueLimit) + if oErr := store.Open(); oErr != nil { + return nil, oErr + } + } + + _, derr := net.Dial("tcp", args.URL.Host) + if derr != nil { + if store == nil { + return nil, derr + } + } else { + client, err = newClient(args) + if err != nil { + return nil, err + } + } + + target := &ElasticsearchTarget{ id: event.TargetID{ID: id, Name: "elasticsearch"}, args: args, client: client, - }, nil + store: store, + } + + if target.store != nil { + // Replays the events from the store. + eventKeyCh := replayEvents(target.store, doneCh) + // Start replaying events from the store. + go sendEvents(target, eventKeyCh, doneCh) + } + + return target, nil } diff --git a/pkg/event/target/kafka.go b/pkg/event/target/kafka.go index 07fa47fb4..3dda0ae90 100644 --- a/pkg/event/target/kafka.go +++ b/pkg/event/target/kafka.go @@ -37,7 +37,7 @@ type KafkaArgs struct { Brokers []xnet.Host `json:"brokers"` Topic string `json:"topic"` QueueDir string `json:"queueDir"` - QueueLimit uint16 `json:"queueLimit"` + QueueLimit uint64 `json:"queueLimit"` TLS struct { Enable bool `json:"enable"` SkipVerify bool `json:"skipVerify"` diff --git a/pkg/event/target/mqtt.go b/pkg/event/target/mqtt.go index fa936efa9..0cc22d0f3 100644 --- a/pkg/event/target/mqtt.go +++ b/pkg/event/target/mqtt.go @@ -48,7 +48,7 @@ type MQTTArgs struct { KeepAlive time.Duration `json:"keepAliveInterval"` RootCAs *x509.CertPool `json:"-"` QueueDir string `json:"queueDir"` - QueueLimit uint16 `json:"queueLimit"` + QueueLimit uint64 `json:"queueLimit"` } // Validate MQTTArgs fields @@ -139,7 +139,7 @@ func (target *MQTTTarget) Send(eventKey string) error { return 
target.store.Del(eventKey) } -// Save - saves the events to the store if questore is configured, which will be replayed when the mqtt connection is active. +// Save - saves the events to the store if queuestore is configured, which will be replayed when the mqtt connection is active. func (target *MQTTTarget) Save(eventData event.Event) error { if target.store != nil { return target.store.Put(eventData) diff --git a/pkg/event/target/queuestore.go b/pkg/event/target/queuestore.go index 3b55970ac..6d9005fe1 100644 --- a/pkg/event/target/queuestore.go +++ b/pkg/event/target/queuestore.go @@ -25,6 +25,7 @@ import ( "sync" "github.com/minio/minio/pkg/event" + "github.com/minio/minio/pkg/sys" ) const ( @@ -36,15 +37,22 @@ const ( type QueueStore struct { sync.RWMutex directory string - eC uint16 - limit uint16 + eC uint64 + limit uint64 } // NewQueueStore - Creates an instance for QueueStore. -func NewQueueStore(directory string, limit uint16) *QueueStore { +func NewQueueStore(directory string, limit uint64) *QueueStore { if limit == 0 { limit = maxLimit + currRlimit, _, err := sys.GetMaxOpenFileLimit() + if err == nil { + if currRlimit > limit { + limit = currRlimit + } + } } + queueStore := &QueueStore{ directory: directory, limit: limit, @@ -61,7 +69,7 @@ func (store *QueueStore) Open() error { return terr } - eCount := uint16(len(store.list())) + eCount := uint64(len(store.list())) if eCount >= store.limit { return errLimitExceeded } diff --git a/pkg/event/target/queuestore_test.go b/pkg/event/target/queuestore_test.go index fee1e9b48..cd8a995ba 100644 --- a/pkg/event/target/queuestore_test.go +++ b/pkg/event/target/queuestore_test.go @@ -33,7 +33,7 @@ var queueDir = filepath.Join(os.TempDir(), "minio_test") var testEvent = event.Event{EventVersion: "1.0", EventSource: "test_source", AwsRegion: "test_region", EventTime: "test_time", EventName: event.ObjectAccessedGet} // Initialize the store. 
-func setUpStore(directory string, limit uint16) (Store, error) { +func setUpStore(directory string, limit uint64) (Store, error) { store := NewQueueStore(queueDir, limit) if oErr := store.Open(); oErr != nil { return nil, oErr diff --git a/pkg/event/target/webhook.go b/pkg/event/target/webhook.go index fbc46fe74..2f2a4a0b3 100644 --- a/pkg/event/target/webhook.go +++ b/pkg/event/target/webhook.go @@ -43,7 +43,7 @@ type WebhookArgs struct { Endpoint xnet.URL `json:"endpoint"` RootCAs *x509.CertPool `json:"-"` QueueDir string `json:"queueDir"` - QueueLimit uint16 `json:"queueLimit"` + QueueLimit uint64 `json:"queueLimit"` } // Validate WebhookArgs fields