Enable event persistence in AMQP (#7565)

master
Praveen raj Mani 5 years ago committed by kannappanr
parent 6f2b4675fa
commit b0cea1c0f3
  1. 4
      cmd/admin-handlers_test.go
  2. 4
      cmd/config-current.go
  3. 2
      cmd/config-current_test.go
  4. 37
      docs/bucket/notifications/README.md
  5. 4
      docs/config/config.sample.json
  6. 163
      pkg/event/target/amqp.go

@ -83,7 +83,9 @@ var (
"durable": false, "durable": false,
"internal": false, "internal": false,
"noWait": false, "noWait": false,
"autoDeleted": false "autoDeleted": false,
"queueDir": "",
"queueLimit": 0
} }
}, },
"elasticsearch": { "elasticsearch": {

@ -303,7 +303,7 @@ func (s *serverConfig) TestNotificationTargets() error {
if !v.Enable { if !v.Enable {
continue continue
} }
t, err := target.NewAMQPTarget(k, v) t, err := target.NewAMQPTarget(k, v, GlobalServiceDoneCh)
if err != nil { if err != nil {
return fmt.Errorf("amqp(%s): %s", k, err.Error()) return fmt.Errorf("amqp(%s): %s", k, err.Error())
} }
@ -637,7 +637,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList {
} }
for id, args := range config.Notify.AMQP { for id, args := range config.Notify.AMQP {
if args.Enable { if args.Enable {
newTarget, err := target.NewAMQPTarget(id, args) newTarget, err := target.NewAMQPTarget(id, args, GlobalServiceDoneCh)
if err != nil { if err != nil {
logger.LogIf(context.Background(), err) logger.LogIf(context.Background(), err)
continue continue

@ -185,7 +185,7 @@ func TestValidateConfig(t *testing.T) {
{`{"version": "` + v + `", "browser": "on", "browser": "on", "region":"us-east-1", "credential" : {"accessKey":"minio", "secretKey":"minio123"}}`, false}, {`{"version": "` + v + `", "browser": "on", "browser": "on", "region":"us-east-1", "credential" : {"accessKey":"minio", "secretKey":"minio123"}}`, false},
// Test 11 - Test AMQP // Test 11 - Test AMQP
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "amqp": { "1": { "enable": true, "url": "", "exchange": "", "routingKey": "", "exchangeType": "", "mandatory": false, "immediate": false, "durable": false, "internal": false, "noWait": false, "autoDeleted": false }}}}`, false}, {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "amqp": { "1": { "enable": true, "url": "", "exchange": "", "routingKey": "", "exchangeType": "", "mandatory": false, "immediate": false, "durable": false, "internal": false, "noWait": false, "autoDeleted": false, "queueDir": "", "queueLimit": 0}}}}`, false},
// Test 12 - Test NATS // Test 12 - Test NATS
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "nats": { "1": { "enable": true, "address": "", "subject": "", "username": "", "password": "", "token": "", "secure": false, "pingInterval": 0, "queueDir": "", "queueLimit": 0, "streaming": { "enable": false, "clusterID": "", "async": false, "maxPubAcksInflight": 0 } } }}}`, false}, {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "nats": { "1": { "enable": true, "address": "", "subject": "", "username": "", "password": "", "token": "", "secure": false, "pingInterval": 0, "queueDir": "", "queueLimit": 0, "streaming": { "enable": false, "clusterID": "", "async": false, "maxPubAcksInflight": 0 } } }}}`, false},

@ -34,20 +34,22 @@ Install RabbitMQ from [here](https://www.rabbitmq.com/).
The MinIO server configuration file is stored on the backend in json format. The AMQP configuration is located in the `amqp` key under the `notify` top-level key. Create a configuration key-value pair here for your AMQP instance. The key is a name for your AMQP endpoint, and the value is a collection of key-value parameters described in the table below. The MinIO server configuration file is stored on the backend in json format. The AMQP configuration is located in the `amqp` key under the `notify` top-level key. Create a configuration key-value pair here for your AMQP instance. The key is a name for your AMQP endpoint, and the value is a collection of key-value parameters described in the table below.
| Parameter | Type | Description | | Parameter | Type | Description |
| :------------- | :------- | :------------------------------------------------------------------------------ | | :------------- | :------- | :------------------------------------------------------------------------------- |
| `enable` | _bool_ | (Required) Is this server endpoint configuration active/enabled? | | `enable` | _bool_ | (Required) Is this server endpoint configuration active/enabled? |
| `url` | _string_ | (Required) AMQP server endpoint, e.g. `amqp://myuser:mypassword@localhost:5672` | | `url` | _string_ | (Required) AMQP server endpoint, e.g. `amqp://myuser:mypassword@localhost:5672` |
| `exchange` | _string_ | Name of the exchange. | | `exchange` | _string_ | Name of the exchange. |
| `routingKey` | _string_ | Routing key for publishing. | | `routingKey` | _string_ | Routing key for publishing. |
| `exchangeType` | _string_ | Kind of exchange. | | `exchangeType` | _string_ | Kind of exchange. |
| `deliveryMode` | _uint8_ | Delivery mode for publishing. 0 or 1 - transient; 2 - persistent. | | `deliveryMode` | _uint8_ | Delivery mode for publishing. 0 or 1 - transient; 2 - persistent. |
| `mandatory` | _bool_ | Publishing related bool. | | `mandatory` | _bool_ | Publishing related bool. |
| `immediate` | _bool_ | Publishing related bool. | | `immediate` | _bool_ | Publishing related bool. |
| `durable` | _bool_ | Exchange declaration related bool. | | `durable` | _bool_ | Exchange declaration related bool. |
| `internal` | _bool_ | Exchange declaration related bool. | | `internal` | _bool_ | Exchange declaration related bool. |
| `noWait` | _bool_ | Exchange declaration related bool. | | `noWait` | _bool_ | Exchange declaration related bool. |
| `autoDeleted` | _bool_ | Exchange declaration related bool. | | `autoDeleted` | _bool_ | Exchange declaration related bool. |
| `queueDir` | _string_ | Persistent store for events when AMQP broker is offline |
| `queueLimit` | _int_ | Set the maximum event limit for the persistent store. The default limit is 10000 |
An example configuration for RabbitMQ is shown below: An example configuration for RabbitMQ is shown below:
@ -65,10 +67,13 @@ An example configuration for RabbitMQ is shown below:
"durable": false, "durable": false,
"internal": false, "internal": false,
"noWait": false, "noWait": false,
"autoDeleted": false "autoDeleted": false,
"queueDir": "",
"queueLimit": 0
} }
} }
``` ```
MinIO supports persistent event store. The persistent store will backup events when the AMQP broker goes offline and replays it when the broker comes back online. The event store can be configured by setting the directory path in `queueDir` field and the maximum limit of events in the queueDir in `queueLimit` field. For eg, the `queueDir` can be `/home/events` and `queueLimit` can be `1000`. By default, the `queueLimit` is set to 10000.
To update the configuration, use `mc admin config get` command to get the current configuration file for the minio deployment in json format, and save it locally. To update the configuration, use `mc admin config get` command to get the current configuration file for the minio deployment in json format, and save it locally.
@ -310,7 +315,7 @@ An example of Elasticsearch configuration is as follows:
}, },
``` ```
Minio supports persistent event store. The persistent store will backup events when the Elasticsearch broker goes offline and replays it when the broker comes back online. The event store can be configured by setting the directory path in `queueDir` field and the maximum limit of events in the queueDir in `queueLimit` field. For eg, the `queueDir` can be `/home/events` and `queueLimit` can be `1000`. By default, the `queueLimit` is set to 10000. MinIO supports persistent event store. The persistent store will backup events when the Elasticsearch broker goes offline and replays it when the broker comes back online. The event store can be configured by setting the directory path in `queueDir` field and the maximum limit of events in the queueDir in `queueLimit` field. For eg, the `queueDir` can be `/home/events` and `queueLimit` can be `1000`. By default, the `queueLimit` is set to 10000.
If Elasticsearch has authentication enabled, the credentials can be supplied to MinIO via the `url` parameter formatted as `PROTO://USERNAME:PASSWORD@ELASTICSEARCH_HOST:PORT`. If Elasticsearch has authentication enabled, the credentials can be supplied to MinIO via the `url` parameter formatted as `PROTO://USERNAME:PASSWORD@ELASTICSEARCH_HOST:PORT`.

@ -48,7 +48,9 @@
"durable": false, "durable": false,
"internal": false, "internal": false,
"noWait": false, "noWait": false,
"autoDeleted": false "autoDeleted": false,
"queueDir": "",
"queueLimit": 0
} }
}, },
"elasticsearch": { "elasticsearch": {

@ -17,12 +17,16 @@
package target package target
import ( import (
"context"
"encoding/json" "encoding/json"
"errors"
"net" "net"
"net/url" "net/url"
"os"
"path/filepath"
"sync" "sync"
"time"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net" xnet "github.com/minio/minio/pkg/net"
"github.com/streadway/amqp" "github.com/streadway/amqp"
@ -42,6 +46,8 @@ type AMQPArgs struct {
Internal bool `json:"internal"` Internal bool `json:"internal"`
NoWait bool `json:"noWait"` NoWait bool `json:"noWait"`
AutoDeleted bool `json:"autoDeleted"` AutoDeleted bool `json:"autoDeleted"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
} }
// Validate AMQP arguments // Validate AMQP arguments
@ -52,6 +58,15 @@ func (a *AMQPArgs) Validate() error {
if _, err := amqp.ParseURI(a.URL.String()); err != nil { if _, err := amqp.ParseURI(a.URL.String()); err != nil {
return err return err
} }
if a.QueueDir != "" {
if !filepath.IsAbs(a.QueueDir) {
return errors.New("queueDir path should be absolute")
}
}
if a.QueueLimit > 10000 {
return errors.New("queueLimit should not exceed 10000")
}
return nil return nil
} }
@ -61,6 +76,7 @@ type AMQPTarget struct {
args AMQPArgs args AMQPArgs
conn *amqp.Connection conn *amqp.Connection
connMutex sync.Mutex connMutex sync.Mutex
store Store
} }
// ID - returns TargetID. // ID - returns TargetID.
@ -69,6 +85,10 @@ func (target *AMQPTarget) ID() event.TargetID {
} }
func (target *AMQPTarget) channel() (*amqp.Channel, error) { func (target *AMQPTarget) channel() (*amqp.Channel, error) {
var err error
var conn *amqp.Connection
var ch *amqp.Channel
isAMQPClosedErr := func(err error) bool { isAMQPClosedErr := func(err error) bool {
if err == amqp.ErrClosed { if err == amqp.ErrClosed {
return true return true
@ -84,21 +104,27 @@ func (target *AMQPTarget) channel() (*amqp.Channel, error) {
target.connMutex.Lock() target.connMutex.Lock()
defer target.connMutex.Unlock() defer target.connMutex.Unlock()
ch, err := target.conn.Channel() if target.conn != nil {
if err == nil { ch, err = target.conn.Channel()
return ch, nil if err == nil {
} return ch, nil
}
if !isAMQPClosedErr(err) { if !isAMQPClosedErr(err) {
return nil, err return nil, err
}
} }
var conn *amqp.Connection conn, err = amqp.Dial(target.args.URL.String())
if conn, err = amqp.Dial(target.args.URL.String()); err != nil { if err != nil {
if IsConnRefusedErr(err) {
return nil, errNotConnected
}
return nil, err return nil, err
} }
if ch, err = conn.Channel(); err != nil { ch, err = conn.Channel()
if err != nil {
return nil, err return nil, err
} }
@ -107,21 +133,8 @@ func (target *AMQPTarget) channel() (*amqp.Channel, error) {
return ch, nil return ch, nil
} }
// Save - Sends event directly without persisting. // send - sends an event to the AMQP.
func (target *AMQPTarget) Save(eventData event.Event) error { func (target *AMQPTarget) send(eventData event.Event, ch *amqp.Channel) error {
return target.send(eventData)
}
func (target *AMQPTarget) send(eventData event.Event) error {
ch, err := target.channel()
if err != nil {
return err
}
defer func() {
// FIXME: log returned error. ignore time being.
_ = ch.Close()
}()
objectName, err := url.QueryUnescape(eventData.S3.Object.Key) objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil { if err != nil {
return err return err
@ -138,17 +151,62 @@ func (target *AMQPTarget) send(eventData event.Event) error {
return err return err
} }
return ch.Publish(target.args.Exchange, target.args.RoutingKey, target.args.Mandatory, if err := ch.Publish(target.args.Exchange, target.args.RoutingKey, target.args.Mandatory,
target.args.Immediate, amqp.Publishing{ target.args.Immediate, amqp.Publishing{
ContentType: "application/json", ContentType: "application/json",
DeliveryMode: target.args.DeliveryMode, DeliveryMode: target.args.DeliveryMode,
Body: data, Body: data,
}) }); err != nil {
return err
}
return nil
}
// Save - saves the events to the store which will be replayed when the amqp connection is active.
func (target *AMQPTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
ch, err := target.channel()
if err != nil {
return err
}
defer func() {
cErr := ch.Close()
logger.LogOnceIf(context.Background(), cErr, target.ID())
}()
return target.send(eventData, ch)
} }
// Send - interface compatible method does no-op. // Send - sends event to AMQP.
func (target *AMQPTarget) Send(eventKey string) error { func (target *AMQPTarget) Send(eventKey string) error {
return nil ch, err := target.channel()
if err != nil {
return err
}
defer func() {
cErr := ch.Close()
logger.LogOnceIf(context.Background(), cErr, target.ID())
}()
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os.IsNotExist(eErr) {
return nil
}
return eErr
}
if err := target.send(eventData, ch); err != nil {
return err
}
// Delete the event from store.
return target.store.Del(eventKey)
} }
// Close - does nothing and available for interface compatibility. // Close - does nothing and available for interface compatibility.
@ -157,25 +215,40 @@ func (target *AMQPTarget) Close() error {
} }
// NewAMQPTarget - creates new AMQP target. // NewAMQPTarget - creates new AMQP target.
func NewAMQPTarget(id string, args AMQPArgs) (*AMQPTarget, error) { func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}) (*AMQPTarget, error) {
var conn *amqp.Connection var conn *amqp.Connection
var err error var err error
// Retry 5 times with time interval of 2 seconds.
for i := 1; i <= 5; i++ { var store Store
conn, err = amqp.Dial(args.URL.String())
if err != nil { if args.QueueDir != "" {
if i == 5 { queueDir := filepath.Join(args.QueueDir, storePrefix+"-amqp-"+id)
return nil, err store = NewQueueStore(queueDir, args.QueueLimit)
} if oErr := store.Open(); oErr != nil {
time.Sleep(2 * time.Second) return nil, oErr
continue
} }
break
} }
return &AMQPTarget{ conn, err = amqp.Dial(args.URL.String())
id: event.TargetID{ID: id, Name: "amqp"}, if err != nil {
args: args, if store == nil || !IsConnRefusedErr(err) {
conn: conn, return nil, err
}, nil }
}
target := &AMQPTarget{
id: event.TargetID{ID: id, Name: "amqp"},
args: args,
conn: conn,
store: store,
}
if target.store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh)
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh)
}
return target, nil
} }

Loading…
Cancel
Save