diff --git a/cmd/admin-handlers_test.go b/cmd/admin-handlers_test.go index ff4356738..e06dda200 100644 --- a/cmd/admin-handlers_test.go +++ b/cmd/admin-handlers_test.go @@ -83,7 +83,9 @@ var ( "durable": false, "internal": false, "noWait": false, - "autoDeleted": false + "autoDeleted": false, + "queueDir": "", + "queueLimit": 0 } }, "elasticsearch": { diff --git a/cmd/config-current.go b/cmd/config-current.go index e5e4af4e2..723e24487 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -303,7 +303,7 @@ func (s *serverConfig) TestNotificationTargets() error { if !v.Enable { continue } - t, err := target.NewAMQPTarget(k, v) + t, err := target.NewAMQPTarget(k, v, GlobalServiceDoneCh) if err != nil { return fmt.Errorf("amqp(%s): %s", k, err.Error()) } @@ -637,7 +637,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList { } for id, args := range config.Notify.AMQP { if args.Enable { - newTarget, err := target.NewAMQPTarget(id, args) + newTarget, err := target.NewAMQPTarget(id, args, GlobalServiceDoneCh) if err != nil { logger.LogIf(context.Background(), err) continue diff --git a/cmd/config-current_test.go b/cmd/config-current_test.go index f7b528e07..c1d65ae35 100644 --- a/cmd/config-current_test.go +++ b/cmd/config-current_test.go @@ -185,7 +185,7 @@ func TestValidateConfig(t *testing.T) { {`{"version": "` + v + `", "browser": "on", "browser": "on", "region":"us-east-1", "credential" : {"accessKey":"minio", "secretKey":"minio123"}}`, false}, // Test 11 - Test AMQP - {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "amqp": { "1": { "enable": true, "url": "", "exchange": "", "routingKey": "", "exchangeType": "", "mandatory": false, "immediate": false, "durable": false, "internal": false, "noWait": false, "autoDeleted": false }}}}`, false}, + {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "amqp": { "1": { "enable": true, "url": "", "exchange": "", "routingKey": "", "exchangeType": "", "mandatory": false, "immediate": false, "durable": false, "internal": false, "noWait": false, "autoDeleted": false, "queueDir": "", "queueLimit": 0}}}}`, false}, // Test 12 - Test NATS {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "nats": { "1": { "enable": true, "address": "", "subject": "", "username": "", "password": "", "token": "", "secure": false, "pingInterval": 0, "queueDir": "", "queueLimit": 0, "streaming": { "enable": false, "clusterID": "", "async": false, "maxPubAcksInflight": 0 } } }}}`, false}, diff --git a/docs/bucket/notifications/README.md b/docs/bucket/notifications/README.md index db7e36839..d7c04f9b1 100644 --- a/docs/bucket/notifications/README.md +++ b/docs/bucket/notifications/README.md @@ -34,20 +34,22 @@ Install RabbitMQ from [here](https://www.rabbitmq.com/). The MinIO server configuration file is stored on the backend in json format. The AMQP configuration is located in the `amqp` key under the `notify` top-level key. Create a configuration key-value pair here for your AMQP instance. The key is a name for your AMQP endpoint, and the value is a collection of key-value parameters described in the table below. -| Parameter | Type | Description | -| :------------- | :------- | :------------------------------------------------------------------------------ | -| `enable` | _bool_ | (Required) Is this server endpoint configuration active/enabled? | -| `url` | _string_ | (Required) AMQP server endpoint, e.g. `amqp://myuser:mypassword@localhost:5672` | -| `exchange` | _string_ | Name of the exchange. | -| `routingKey` | _string_ | Routing key for publishing. | -| `exchangeType` | _string_ | Kind of exchange. | -| `deliveryMode` | _uint8_ | Delivery mode for publishing. 0 or 1 - transient; 2 - persistent. | -| `mandatory` | _bool_ | Publishing related bool. | -| `immediate` | _bool_ | Publishing related bool. | -| `durable` | _bool_ | Exchange declaration related bool. | -| `internal` | _bool_ | Exchange declaration related bool. | -| `noWait` | _bool_ | Exchange declaration related bool. | -| `autoDeleted` | _bool_ | Exchange declaration related bool. | +| Parameter | Type | Description | +| :------------- | :------- | :------------------------------------------------------------------------------- | +| `enable` | _bool_ | (Required) Is this server endpoint configuration active/enabled? | +| `url` | _string_ | (Required) AMQP server endpoint, e.g. `amqp://myuser:mypassword@localhost:5672` | +| `exchange` | _string_ | Name of the exchange. | +| `routingKey` | _string_ | Routing key for publishing. | +| `exchangeType` | _string_ | Kind of exchange. | +| `deliveryMode` | _uint8_ | Delivery mode for publishing. 0 or 1 - transient; 2 - persistent. | +| `mandatory` | _bool_ | Publishing related bool. | +| `immediate` | _bool_ | Publishing related bool. | +| `durable` | _bool_ | Exchange declaration related bool. | +| `internal` | _bool_ | Exchange declaration related bool. | +| `noWait` | _bool_ | Exchange declaration related bool. | +| `autoDeleted` | _bool_ | Exchange declaration related bool. | +| `queueDir` | _string_ | Persistent store for events when AMQP broker is offline | +| `queueLimit` | _int_ | Set the maximum event limit for the persistent store. The default limit is 10000 | An example configuration for RabbitMQ is shown below: @@ -65,10 +67,13 @@ An example configuration for RabbitMQ is shown below: "durable": false, "internal": false, "noWait": false, - "autoDeleted": false + "autoDeleted": false, + "queueDir": "", + "queueLimit": 0 } } ``` +MinIO supports persistent event store. The persistent store will backup events when the AMQP broker goes offline and replays it when the broker comes back online. The event store can be configured by setting the directory path in `queueDir` field and the maximum limit of events in the queueDir in `queueLimit` field. For eg, the `queueDir` can be `/home/events` and `queueLimit` can be `1000`. By default, the `queueLimit` is set to 10000. To update the configuration, use `mc admin config get` command to get the current configuration file for the minio deployment in json format, and save it locally. @@ -310,7 +315,7 @@ An example of Elasticsearch configuration is as follows: }, ``` -Minio supports persistent event store. The persistent store will backup events when the Elasticsearch broker goes offline and replays it when the broker comes back online. The event store can be configured by setting the directory path in `queueDir` field and the maximum limit of events in the queueDir in `queueLimit` field. For eg, the `queueDir` can be `/home/events` and `queueLimit` can be `1000`. By default, the `queueLimit` is set to 10000. +MinIO supports persistent event store. The persistent store will backup events when the Elasticsearch broker goes offline and replays it when the broker comes back online. The event store can be configured by setting the directory path in `queueDir` field and the maximum limit of events in the queueDir in `queueLimit` field. For eg, the `queueDir` can be `/home/events` and `queueLimit` can be `1000`. By default, the `queueLimit` is set to 10000. If Elasticsearch has authentication enabled, the credentials can be supplied to MinIO via the `url` parameter formatted as `PROTO://USERNAME:PASSWORD@ELASTICSEARCH_HOST:PORT`. diff --git a/docs/config/config.sample.json b/docs/config/config.sample.json index 781c64ef9..5ba269b9e 100644 --- a/docs/config/config.sample.json +++ b/docs/config/config.sample.json @@ -48,7 +48,9 @@ "durable": false, "internal": false, "noWait": false, - "autoDeleted": false + "autoDeleted": false, + "queueDir": "", + "queueLimit": 0 } }, "elasticsearch": { diff --git a/pkg/event/target/amqp.go b/pkg/event/target/amqp.go index 65ff5cf1c..4b72d4d64 100644 --- a/pkg/event/target/amqp.go +++ b/pkg/event/target/amqp.go @@ -17,12 +17,16 @@ package target import ( + "context" "encoding/json" + "errors" "net" "net/url" + "os" + "path/filepath" "sync" - "time" + "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/event" xnet "github.com/minio/minio/pkg/net" "github.com/streadway/amqp" @@ -42,6 +46,8 @@ type AMQPArgs struct { Internal bool `json:"internal"` NoWait bool `json:"noWait"` AutoDeleted bool `json:"autoDeleted"` + QueueDir string `json:"queueDir"` + QueueLimit uint64 `json:"queueLimit"` } // Validate AMQP arguments @@ -52,6 +58,15 @@ func (a *AMQPArgs) Validate() error { if _, err := amqp.ParseURI(a.URL.String()); err != nil { return err } + if a.QueueDir != "" { + if !filepath.IsAbs(a.QueueDir) { + return errors.New("queueDir path should be absolute") + } + } + if a.QueueLimit > 10000 { + return errors.New("queueLimit should not exceed 10000") + } + return nil } @@ -61,6 +76,7 @@ type AMQPTarget struct { args AMQPArgs conn *amqp.Connection connMutex sync.Mutex + store Store } // ID - returns TargetID. @@ -69,6 +85,10 @@ func (target *AMQPTarget) ID() event.TargetID { } func (target *AMQPTarget) channel() (*amqp.Channel, error) { + var err error + var conn *amqp.Connection + var ch *amqp.Channel + isAMQPClosedErr := func(err error) bool { if err == amqp.ErrClosed { return true @@ -84,21 +104,27 @@ func (target *AMQPTarget) channel() (*amqp.Channel, error) { target.connMutex.Lock() defer target.connMutex.Unlock() - ch, err := target.conn.Channel() - if err == nil { - return ch, nil - } + if target.conn != nil { + ch, err = target.conn.Channel() + if err == nil { + return ch, nil + } - if !isAMQPClosedErr(err) { - return nil, err + if !isAMQPClosedErr(err) { + return nil, err + } } - var conn *amqp.Connection - if conn, err = amqp.Dial(target.args.URL.String()); err != nil { + conn, err = amqp.Dial(target.args.URL.String()) + if err != nil { + if IsConnRefusedErr(err) { + return nil, errNotConnected + } return nil, err } - if ch, err = conn.Channel(); err != nil { + ch, err = conn.Channel() + if err != nil { return nil, err } @@ -107,21 +133,8 @@ func (target *AMQPTarget) channel() (*amqp.Channel, error) { return ch, nil } -// Save - Sends event directly without persisting. -func (target *AMQPTarget) Save(eventData event.Event) error { - return target.send(eventData) -} - -func (target *AMQPTarget) send(eventData event.Event) error { - ch, err := target.channel() - if err != nil { - return err - } - defer func() { - // FIXME: log returned error. ignore time being. - _ = ch.Close() - }() - +// send - sends an event to the AMQP. +func (target *AMQPTarget) send(eventData event.Event, ch *amqp.Channel) error { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err != nil { return err @@ -138,17 +151,62 @@ func (target *AMQPTarget) send(eventData event.Event) error { return err } - return ch.Publish(target.args.Exchange, target.args.RoutingKey, target.args.Mandatory, + if err := ch.Publish(target.args.Exchange, target.args.RoutingKey, target.args.Mandatory, target.args.Immediate, amqp.Publishing{ ContentType: "application/json", DeliveryMode: target.args.DeliveryMode, Body: data, - }) + }); err != nil { + return err + } + + return nil +} + +// Save - saves the events to the store which will be replayed when the amqp connection is active. +func (target *AMQPTarget) Save(eventData event.Event) error { + if target.store != nil { + return target.store.Put(eventData) + } + ch, err := target.channel() + if err != nil { + return err + } + defer func() { + cErr := ch.Close() + logger.LogOnceIf(context.Background(), cErr, target.ID()) + }() + + return target.send(eventData, ch) } -// Send - interface compatible method does no-op. +// Send - sends event to AMQP. func (target *AMQPTarget) Send(eventKey string) error { - return nil + ch, err := target.channel() + if err != nil { + return err + } + defer func() { + cErr := ch.Close() + logger.LogOnceIf(context.Background(), cErr, target.ID()) + }() + + eventData, eErr := target.store.Get(eventKey) + if eErr != nil { + // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() + // Such events will not exist and wouldve been already been sent successfully. + if os.IsNotExist(eErr) { + return nil + } + return eErr + } + + if err := target.send(eventData, ch); err != nil { + return err + } + + // Delete the event from store. + return target.store.Del(eventKey) } // Close - does nothing and available for interface compatibility. @@ -157,25 +215,40 @@ func (target *AMQPTarget) Close() error { } // NewAMQPTarget - creates new AMQP target. -func NewAMQPTarget(id string, args AMQPArgs) (*AMQPTarget, error) { +func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}) (*AMQPTarget, error) { var conn *amqp.Connection var err error - // Retry 5 times with time interval of 2 seconds. - for i := 1; i <= 5; i++ { - conn, err = amqp.Dial(args.URL.String()) - if err != nil { - if i == 5 { - return nil, err - } - time.Sleep(2 * time.Second) - continue + + var store Store + + if args.QueueDir != "" { + queueDir := filepath.Join(args.QueueDir, storePrefix+"-amqp-"+id) + store = NewQueueStore(queueDir, args.QueueLimit) + if oErr := store.Open(); oErr != nil { + return nil, oErr } - break } - return &AMQPTarget{ - id: event.TargetID{ID: id, Name: "amqp"}, - args: args, - conn: conn, - }, nil + conn, err = amqp.Dial(args.URL.String()) + if err != nil { + if store == nil || !IsConnRefusedErr(err) { + return nil, err + } + } + + target := &AMQPTarget{ + id: event.TargetID{ID: id, Name: "amqp"}, + args: args, + conn: conn, + store: store, + } + + if target.store != nil { + // Replays the events from the store. + eventKeyCh := replayEvents(target.store, doneCh) + // Start replaying events from the store. + go sendEvents(target, eventKeyCh, doneCh) + } + + return target, nil }