Enable event-persistence in NATS and NATS-Streaming (#7612)

master
Praveen raj Mani 5 years ago committed by kannappanr
parent 2b9b907f9c
commit c9349747ca
  1. 2
      cmd/admin-handlers_test.go
  2. 4
      cmd/config-current.go
  3. 2
      cmd/config-current_test.go
  4. 56
      docs/bucket/notifications/README.md
  5. 2
      docs/config/config.sample.json
  6. 166
      pkg/event/target/nats.go

@ -152,6 +152,8 @@ var (
"token": "", "token": "",
"secure": false, "secure": false,
"pingInterval": 0, "pingInterval": 0,
"queueDir": "",
"queueLimit": 0,
"streaming": { "streaming": {
"enable": false, "enable": false,
"clusterID": "", "clusterID": "",

@ -358,7 +358,7 @@ func (s *serverConfig) TestNotificationTargets() error {
if !v.Enable { if !v.Enable {
continue continue
} }
t, err := target.NewNATSTarget(k, v) t, err := target.NewNATSTarget(k, v, GlobalServiceDoneCh)
if err != nil { if err != nil {
return fmt.Errorf("nats(%s): %s", k, err.Error()) return fmt.Errorf("nats(%s): %s", k, err.Error())
} }
@ -710,7 +710,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList {
for id, args := range config.Notify.NATS { for id, args := range config.Notify.NATS {
if args.Enable { if args.Enable {
newTarget, err := target.NewNATSTarget(id, args) newTarget, err := target.NewNATSTarget(id, args, GlobalServiceDoneCh)
if err != nil { if err != nil {
logger.LogIf(context.Background(), err) logger.LogIf(context.Background(), err)
continue continue

@ -188,7 +188,7 @@ func TestValidateConfig(t *testing.T) {
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "amqp": { "1": { "enable": true, "url": "", "exchange": "", "routingKey": "", "exchangeType": "", "mandatory": false, "immediate": false, "durable": false, "internal": false, "noWait": false, "autoDeleted": false }}}}`, false}, {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "amqp": { "1": { "enable": true, "url": "", "exchange": "", "routingKey": "", "exchangeType": "", "mandatory": false, "immediate": false, "durable": false, "internal": false, "noWait": false, "autoDeleted": false }}}}`, false},
// Test 12 - Test NATS // Test 12 - Test NATS
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "nats": { "1": { "enable": true, "address": "", "subject": "", "username": "", "password": "", "token": "", "secure": false, "pingInterval": 0, "streaming": { "enable": false, "clusterID": "", "async": false, "maxPubAcksInflight": 0 } } }}}`, false}, {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "nats": { "1": { "enable": true, "address": "", "subject": "", "username": "", "password": "", "token": "", "secure": false, "pingInterval": 0, "queueDir": "", "queueLimit": 0, "streaming": { "enable": false, "clusterID": "", "async": false, "maxPubAcksInflight": 0 } } }}}`, false},
// Test 13 - Test ElasticSearch // Test 13 - Test ElasticSearch
{`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "elasticsearch": { "1": { "enable": true, "url": "", "index": "" } }}}`, false}, {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "elasticsearch": { "1": { "enable": true, "url": "", "index": "" } }}}`, false},

@ -547,11 +547,12 @@ The NATS configuration block in `config.json` is as follows:
"password": "yoursecret", "password": "yoursecret",
"token": "", "token": "",
"secure": false, "secure": false,
"pingInterval": 0 "pingInterval": 0,
"queueDir": "",
"queueLimit": 0,
"streaming": { "streaming": {
"enable": false, "enable": false,
"clusterID": "", "clusterID": "",
"clientID": "",
"async": false, "async": false,
"maxPubAcksInflight": 0 "maxPubAcksInflight": 0
} }
@ -559,6 +560,8 @@ The NATS configuration block in `config.json` is as follows:
}, },
``` ```
MinIO supports persistent event store. The persistent store will backup events when the NATS broker goes offline and replays it when the broker comes back online. The event store can be configured by setting the directory path in `queueDir` field and the maximum limit of events in the queueDir in `queueLimit` field. For eg, the `queueDir` can be `/home/events` and `queueLimit` can be `1000`. By default, the `queueLimit` is set to 10000.
To update the configuration, use `mc admin config get` command to get the current configuration file for the minio deployment in json format, and save it locally. To update the configuration, use `mc admin config get` command to get the current configuration file for the minio deployment in json format, and save it locally.
```sh ```sh
@ -571,7 +574,7 @@ After updating the NATS configuration in /tmp/myconfig , use `mc admin config se
$ mc admin config set myminio < /tmp/myconfig $ mc admin config set myminio < /tmp/myconfig
``` ```
MinIO server also supports [NATS Streaming mode](http://nats.io/documentation/streaming/nats-streaming-intro/) that offers additional functionality like `Message/event persistence`, `At-least-once-delivery`, and `Publisher rate limiting`. To configure MinIO server to send notifications to NATS Streaming server, update the MinIO server configuration file as follows: MinIO server also supports [NATS Streaming mode](http://nats.io/documentation/streaming/nats-streaming-intro/) that offers additional functionality like `At-least-once-delivery`, and `Publisher rate limiting`. To configure MinIO server to send notifications to NATS Streaming server, update the MinIO server configuration file as follows:
``` ```
"nats": { "nats": {
@ -584,10 +587,11 @@ MinIO server also supports [NATS Streaming mode](http://nats.io/documentation/st
"token": "", "token": "",
"secure": false, "secure": false,
"pingInterval": 0, "pingInterval": 0,
"queueDir": "",
"queueLimit": 0,
"streaming": { "streaming": {
"enable": true, "enable": true,
"clusterID": "test-cluster", "clusterID": "test-cluster",
"clientID": "minio-client",
"async": true, "async": true,
"maxPubAcksInflight": 10 "maxPubAcksInflight": 10
} }
@ -677,20 +681,47 @@ import (
) )
func main() { func main() {
natsConnection, _ := stan.Connect("test-cluster", "test-client")
log.Println("Connected")
// Subscribe to subject var stanConnection stan.Conn
log.Printf("Subscribing to subject 'bucketevents'\n")
natsConnection.Subscribe("bucketevents", func(m *stan.Msg) {
// Handle the message subscribe := func() {
fmt.Printf("Received a message: %s\n", string(m.Data)) fmt.Printf("Subscribing to subject 'bucketevents'\n")
}) stanConnection.Subscribe("bucketevents", func(m *stan.Msg) {
// Handle the message
fmt.Printf("Received a message: %s\n", string(m.Data))
})
}
stanConnection, _ = stan.Connect("test-cluster", "test-client", stan.NatsURL("nats://yourusername:yoursecret@0.0.0.0:4222"), stan.SetConnectionLostHandler(func(c stan.Conn, _ error) {
go func() {
for {
// Reconnect if the connection is lost.
if stanConnection == nil || stanConnection.NatsConn() == nil || !stanConnection.NatsConn().IsConnected() {
stanConnection, _ = stan.Connect("test-cluster", "test-client", stan.NatsURL("nats://yourusername:yoursecret@0.0.0.0:4222"), stan.SetConnectionLostHandler(func(c stan.Conn, _ error) {
if c.NatsConn() != nil {
c.NatsConn().Close()
}
_ = c.Close()
}))
if stanConnection != nil {
subscribe()
}
}
}
}()
}))
// Subscribe to subject
subscribe()
// Keep the connection alive // Keep the connection alive
runtime.Goexit() runtime.Goexit()
} }
``` ```
``` ```
@ -957,6 +988,7 @@ The MinIO server configuration file is stored on the backend in json format. Upd
} }
} }
``` ```
MinIO supports persistent event store. The persistent store will backup events when the kafka broker goes offline and replays it when the broker comes back online. The event store can be configured by setting the directory path in `queueDir` field and the maximum limit of events in the queueDir in `queueLimit` field. For eg, the `queueDir` can be `/home/events` and `queueLimit` can be `1000`. By default, the `queueLimit` is set to 10000. MinIO supports persistent event store. The persistent store will backup events when the kafka broker goes offline and replays it when the broker comes back online. The event store can be configured by setting the directory path in `queueDir` field and the maximum limit of events in the queueDir in `queueLimit` field. For eg, the `queueDir` can be `/home/events` and `queueLimit` can be `1000`. By default, the `queueLimit` is set to 10000.
To update the configuration, use `mc admin config get` command to get the current configuration file for the minio deployment in json format, and save it locally. To update the configuration, use `mc admin config get` command to get the current configuration file for the minio deployment in json format, and save it locally.

@ -117,6 +117,8 @@
"token": "", "token": "",
"secure": false, "secure": false,
"pingInterval": 0, "pingInterval": 0,
"queueDir": "",
"queueLimit": 0,
"streaming": { "streaming": {
"enable": false, "enable": false,
"clusterID": "", "clusterID": "",

@ -20,6 +20,8 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"net/url" "net/url"
"os"
"path/filepath"
"github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net" xnet "github.com/minio/minio/pkg/net"
@ -37,6 +39,8 @@ type NATSArgs struct {
Token string `json:"token"` Token string `json:"token"`
Secure bool `json:"secure"` Secure bool `json:"secure"`
PingInterval int64 `json:"pingInterval"` PingInterval int64 `json:"pingInterval"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
Streaming struct { Streaming struct {
Enable bool `json:"enable"` Enable bool `json:"enable"`
ClusterID string `json:"clusterID"` ClusterID string `json:"clusterID"`
@ -65,15 +69,57 @@ func (n NATSArgs) Validate() error {
} }
} }
if n.QueueDir != "" {
if !filepath.IsAbs(n.QueueDir) {
return errors.New("queueDir path should be absolute")
}
}
if n.QueueLimit > 10000 {
return errors.New("queueLimit should not exceed 10000")
}
return nil return nil
} }
// To obtain a nats connection from args.
func (n NATSArgs) connectNats() (*nats.Conn, error) {
options := nats.DefaultOptions
options.Url = "nats://" + n.Address.String()
options.User = n.Username
options.Password = n.Password
options.Token = n.Token
options.Secure = n.Secure
return options.Connect()
}
// To obtain a streaming connection from args.
func (n NATSArgs) connectStan() (stan.Conn, error) {
scheme := "nats"
if n.Secure {
scheme = "tls"
}
addressURL := scheme + "://" + n.Username + ":" + n.Password + "@" + n.Address.String()
clientID, err := getNewUUID()
if err != nil {
return nil, err
}
connOpts := []stan.Option{stan.NatsURL(addressURL)}
if n.Streaming.MaxPubAcksInflight > 0 {
connOpts = append(connOpts, stan.MaxPubAcksInflight(n.Streaming.MaxPubAcksInflight))
}
return stan.Connect(n.Streaming.ClusterID, clientID, connOpts...)
}
// NATSTarget - NATS target. // NATSTarget - NATS target.
type NATSTarget struct { type NATSTarget struct {
id event.TargetID id event.TargetID
args NATSArgs args NATSArgs
natsConn *nats.Conn natsConn *nats.Conn
stanConn stan.Conn stanConn stan.Conn
store Store
} }
// ID - returns target ID. // ID - returns target ID.
@ -81,11 +127,24 @@ func (target *NATSTarget) ID() event.TargetID {
return target.id return target.id
} }
// Save - Sends event directly without persisting. // Save - saves the events to the store which will be replayed when the Nats connection is active.
func (target *NATSTarget) Save(eventData event.Event) error { func (target *NATSTarget) Save(eventData event.Event) error {
if target.store != nil {
return target.store.Put(eventData)
}
if target.args.Streaming.Enable {
if !target.stanConn.NatsConn().IsConnected() {
return errNotConnected
}
} else {
if !target.natsConn.IsConnected() {
return errNotConnected
}
}
return target.send(eventData) return target.send(eventData)
} }
// send - sends an event to the Nats.
func (target *NATSTarget) send(eventData event.Event) error { func (target *NATSTarget) send(eventData event.Event) error {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key) objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil { if err != nil {
@ -107,18 +166,62 @@ func (target *NATSTarget) send(eventData event.Event) error {
} else { } else {
err = target.natsConn.Publish(target.args.Subject, data) err = target.natsConn.Publish(target.args.Subject, data)
} }
return err return err
} }
// Send - interface compatible method does no-op. // Send - sends event to Nats.
func (target *NATSTarget) Send(eventKey string) error { func (target *NATSTarget) Send(eventKey string) error {
return nil var connErr error
if target.args.Streaming.Enable {
if target.stanConn == nil || target.stanConn.NatsConn() == nil {
target.stanConn, connErr = target.args.connectStan()
} else {
if !target.stanConn.NatsConn().IsConnected() {
return errNotConnected
}
}
} else {
if target.natsConn == nil {
target.natsConn, connErr = target.args.connectNats()
} else {
if !target.natsConn.IsConnected() {
return errNotConnected
}
}
}
if connErr != nil {
if connErr.Error() == nats.ErrNoServers.Error() {
return errNotConnected
}
return connErr
}
eventData, eErr := target.store.Get(eventKey)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os.IsNotExist(eErr) {
return nil
}
return eErr
}
if err := target.send(eventData); err != nil {
return err
}
return target.store.Del(eventKey)
} }
// Close - closes underneath connections to NATS server. // Close - closes underneath connections to NATS server.
func (target *NATSTarget) Close() (err error) { func (target *NATSTarget) Close() (err error) {
if target.stanConn != nil { if target.stanConn != nil {
// closing the streaming connection does not close the provided NATS connection.
if target.stanConn.NatsConn() != nil {
target.stanConn.NatsConn().Close()
}
err = target.stanConn.Close() err = target.stanConn.Close()
} }
@ -130,47 +233,48 @@ func (target *NATSTarget) Close() (err error) {
} }
// NewNATSTarget - creates new NATS target. // NewNATSTarget - creates new NATS target.
func NewNATSTarget(id string, args NATSArgs) (*NATSTarget, error) { func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}) (*NATSTarget, error) {
var natsConn *nats.Conn var natsConn *nats.Conn
var stanConn stan.Conn var stanConn stan.Conn
var clientID string
var err error
if args.Streaming.Enable { var err error
scheme := "nats"
if args.Secure {
scheme = "tls"
}
addressURL := scheme + "://" + args.Username + ":" + args.Password + "@" + args.Address.String()
clientID, err = getNewUUID() var store Store
if err != nil {
return nil, err
}
connOpts := []stan.Option{stan.NatsURL(addressURL)} if args.QueueDir != "" {
if args.Streaming.MaxPubAcksInflight > 0 { queueDir := filepath.Join(args.QueueDir, storePrefix+"-nats-"+id)
connOpts = append(connOpts, stan.MaxPubAcksInflight(args.Streaming.MaxPubAcksInflight)) store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
return nil, oErr
} }
}
stanConn, err = stan.Connect(args.Streaming.ClusterID, clientID, connOpts...) if args.Streaming.Enable {
stanConn, err = args.connectStan()
} else { } else {
options := nats.DefaultOptions natsConn, err = args.connectNats()
options.Url = "nats://" + args.Address.String()
options.User = args.Username
options.Password = args.Password
options.Token = args.Token
options.Secure = args.Secure
natsConn, err = options.Connect()
} }
if err != nil { if err != nil {
return nil, err if store == nil || err.Error() != nats.ErrNoServers.Error() {
return nil, err
}
} }
return &NATSTarget{ target := &NATSTarget{
id: event.TargetID{ID: id, Name: "nats"}, id: event.TargetID{ID: id, Name: "nats"},
args: args, args: args,
stanConn: stanConn, stanConn: stanConn,
natsConn: natsConn, natsConn: natsConn,
}, nil store: store,
}
if target.store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh)
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh)
}
return target, nil
} }

Loading…
Cancel
Save