From 55d4eee6f1cca3670c16fb609ec2856987aa7bf9 Mon Sep 17 00:00:00 2001 From: Praveen raj Mani Date: Wed, 24 Jul 2019 22:48:29 +0530 Subject: [PATCH] Enable event persistence in MySQL and PostgreSQL (#7629) --- cmd/admin-handlers_test.go | 8 +- cmd/config-current.go | 8 +- cmd/config-current_test.go | 4 +- docs/bucket/notifications/README.md | 34 +++-- docs/config/config.sample.json | 8 +- pkg/event/target/mysql.go | 184 +++++++++++++++++++++------- pkg/event/target/postgresql.go | 173 ++++++++++++++++++++------ 7 files changed, 318 insertions(+), 101 deletions(-) diff --git a/cmd/admin-handlers_test.go b/cmd/admin-handlers_test.go index 36ceb96f7..ff4356738 100644 --- a/cmd/admin-handlers_test.go +++ b/cmd/admin-handlers_test.go @@ -139,7 +139,9 @@ var ( "port": "", "user": "", "password": "", - "database": "" + "database": "", + "queueDir": "", + "queueLimit": 0 } }, "nats": { @@ -185,7 +187,9 @@ var ( "port": "", "user": "", "password": "", - "database": "" + "database": "", + "queueDir": "", + "queueLimit": 0 } }, "redis": { diff --git a/cmd/config-current.go b/cmd/config-current.go index 4c78ac420..e5e4af4e2 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -347,7 +347,7 @@ func (s *serverConfig) TestNotificationTargets() error { if !v.Enable { continue } - t, err := target.NewMySQLTarget(k, v) + t, err := target.NewMySQLTarget(k, v, GlobalServiceDoneCh) if err != nil { return fmt.Errorf("mysql(%s): %s", k, err.Error()) } @@ -380,7 +380,7 @@ func (s *serverConfig) TestNotificationTargets() error { if !v.Enable { continue } - t, err := target.NewPostgreSQLTarget(k, v) + t, err := target.NewPostgreSQLTarget(k, v, GlobalServiceDoneCh) if err != nil { return fmt.Errorf("postgreSQL(%s): %s", k, err.Error()) } @@ -696,7 +696,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList { for id, args := range config.Notify.MySQL { if args.Enable { - newTarget, err := target.NewMySQLTarget(id, args) + newTarget, err := target.NewMySQLTarget(id, args, GlobalServiceDoneCh) if err != nil { logger.LogIf(context.Background(), err) continue @@ -738,7 +738,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList { for id, args := range config.Notify.PostgreSQL { if args.Enable { - newTarget, err := target.NewPostgreSQLTarget(id, args) + newTarget, err := target.NewPostgreSQLTarget(id, args, GlobalServiceDoneCh) if err != nil { logger.LogIf(context.Background(), err) continue diff --git a/cmd/config-current_test.go b/cmd/config-current_test.go index 59cfb2843..f7b528e07 100644 --- a/cmd/config-current_test.go +++ b/cmd/config-current_test.go @@ -197,7 +197,7 @@ func TestValidateConfig(t *testing.T) { {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "redis": { "1": { "enable": true, "address": "", "password": "", "key": "" } }}}`, false}, // Test 15 - Test PostgreSQL - {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "postgresql": { "1": { "enable": true, "connectionString": "", "table": "", "host": "", "port": "", "user": "", "password": "", "database": "" }}}}`, false}, + {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "postgresql": { "1": { "enable": true, "connectionString": "", "table": "", "host": "", "port": "", "user": "", "password": "", "database": "", "queueDir": "", "queueLimit": 0 }}}}`, false}, // Test 16 - Test Kafka {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "kafka": { "1": { "enable": true, "brokers": null, "topic": "", "queueDir": "", "queueLimit": 0 } }}}`, false}, @@ -206,7 +206,7 @@ func TestValidateConfig(t *testing.T) { {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "webhook": { "1": { "enable": true, "endpoint": "", "queueDir": "", "queueLimit": 0} }}}`, false}, // Test 18 - Test MySQL - {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "mysql": { "1": { "enable": true, "dsnString": "", "table": "", "host": "", "port": "", "user": "", "password": "", "database": "" }}}}`, false}, + {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "mysql": { "1": { "enable": true, "dsnString": "", "table": "", "host": "", "port": "", "user": "", "password": "", "database": "", "queueDir": "", "queueLimit": 0 }}}}`, false}, // Test 19 - Test Format for MySQL {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "mysql": { "1": { "enable": true, "dsnString": "", "format": "invalid", "table": "xxx", "host": "10.0.0.1", "port": "3306", "user": "abc", "password": "pqr", "database": "test1" }}}}`, false}, diff --git a/docs/bucket/notifications/README.md b/docs/bucket/notifications/README.md index 4c3f123b7..db7e36839 100644 --- a/docs/bucket/notifications/README.md +++ b/docs/bucket/notifications/README.md @@ -789,11 +789,15 @@ An example of PostgreSQL configuration is as follows: "port": "5432", "user": "postgres", "password": "password", - "database": "minio_events" + "database": "minio_events", + "queueDir": "", + "queueLimit": 0 } } ``` +MinIO supports persistent event store. The persistent store will backup events when the PostgreSQL connection goes offline and replays it when the broker comes back online. The event store can be configured by setting the directory path in `queueDir` field and the maximum limit of events in the queueDir in `queueLimit` field. For eg, the `queueDir` can be `/home/events` and `queueLimit` can be `1000`. By default, the `queueLimit` is set to 10000. + Note that for illustration here, we have disabled SSL. In the interest of security, for production this is not recommended. To update the configuration, use `mc admin config get` command to get the current configuration file for the minio deployment in json format, and save it locally. @@ -884,21 +888,27 @@ The MinIO server configuration file is stored on the backend in json format. The An example of MySQL configuration is as follows: -``` +```json "mysql": { - "1": { - "enable": true, - "dsnString": "", - "table": "minio_images", - "host": "172.17.0.1", - "port": "3306", - "user": "root", - "password": "password", - "database": "miniodb" - } + "1": { + "enable": true, + "dsnString": "", + "format": "namespace", + "table": "minio_images", + "host": "172.17.0.1", + "port": "3306", + "user": "root", + "password": "password", + "database": "miniodb", + "queueDir": "", + "queueLimit": 0 + } } ``` + +MinIO supports persistent event store. The persistent store will backup events when the MySQL connection goes offline and replays it when the broker comes back online. The event store can be configured by setting the directory path in `queueDir` field and the maximum limit of events in the queueDir in `queueLimit` field. For eg, the `queueDir` can be `/home/events` and `queueLimit` can be `1000`. By default, the `queueLimit` is set to 10000. + To update the configuration, use `mc admin config get` command to get the current configuration file for the minio deployment in json format, and save it locally. ```sh diff --git a/docs/config/config.sample.json b/docs/config/config.sample.json index 4f334f798..781c64ef9 100644 --- a/docs/config/config.sample.json +++ b/docs/config/config.sample.json @@ -104,7 +104,9 @@ "port": "", "user": "", "password": "", - "database": "" + "database": "", + "queueDir": "", + "queueLimit": 0 } }, "nats": { @@ -150,7 +152,9 @@ "port": "", "user": "", "password": "", - "database": "" + "database": "", + "queueDir": "", + "queueLimit": 0 } }, "redis": { diff --git a/pkg/event/target/mysql.go b/pkg/event/target/mysql.go index cbe01c235..ad5611c86 100644 --- a/pkg/event/target/mysql.go +++ b/pkg/event/target/mysql.go @@ -56,8 +56,11 @@ package target import ( "database/sql" "encoding/json" + "errors" "fmt" "net/url" + "os" + "path/filepath" "strconv" "strings" "time" @@ -79,15 +82,17 @@ const ( // MySQLArgs - MySQL target arguments. type MySQLArgs struct { - Enable bool `json:"enable"` - Format string `json:"format"` - DSN string `json:"dsnString"` - Table string `json:"table"` - Host xnet.URL `json:"host"` - Port string `json:"port"` - User string `json:"user"` - Password string `json:"password"` - Database string `json:"database"` + Enable bool `json:"enable"` + Format string `json:"format"` + DSN string `json:"dsnString"` + Table string `json:"table"` + Host xnet.URL `json:"host"` + Port string `json:"port"` + User string `json:"user"` + Password string `json:"password"` + Database string `json:"database"` + QueueDir string `json:"queueDir"` + QueueLimit uint64 `json:"queueLimit"` } // Validate MySQLArgs fields @@ -123,6 +128,16 @@ func (m MySQLArgs) Validate() error { return fmt.Errorf("database unspecified") } } + + if m.QueueDir != "" { + if !filepath.IsAbs(m.QueueDir) { + return errors.New("queueDir path should be absolute") + } + } + if m.QueueLimit > 10000 { + return errors.New("queueLimit should not exceed 10000") + } + return nil } @@ -134,6 +149,8 @@ type MySQLTarget struct { deleteStmt *sql.Stmt insertStmt *sql.Stmt db *sql.DB + store Store + firstPing bool } // ID - returns target ID. @@ -141,11 +158,21 @@ func (target *MySQLTarget) ID() event.TargetID { return target.id } -// Save - Sends event directly without persisting. +// Save - saves the events to the store which will be replayed when the SQL connection is active. func (target *MySQLTarget) Save(eventData event.Event) error { + if target.store != nil { + return target.store.Put(eventData) + } + if err := target.db.Ping(); err != nil { + if IsConnErr(err) { + return errNotConnected + } + return err + } return target.send(eventData) } +// send - sends an event to the mysql. func (target *MySQLTarget) send(eventData event.Event) error { if target.args.Format == event.NamespaceFormat { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) @@ -164,6 +191,7 @@ func (target *MySQLTarget) send(eventData event.Event) error { _, err = target.updateStmt.Exec(key, data) } + return err } @@ -179,15 +207,51 @@ func (target *MySQLTarget) send(eventData event.Event) error { } _, err = target.insertStmt.Exec(eventTime, data) + return err } return nil } -// Send - interface compatible method does no-op. +// Send - reads an event from store and sends it to MySQL. func (target *MySQLTarget) Send(eventKey string) error { - return nil + + if err := target.db.Ping(); err != nil { + if IsConnErr(err) { + return errNotConnected + } + return err + } + + if !target.firstPing { + if err := target.executeStmts(); err != nil { + if IsConnErr(err) { + return errNotConnected + } + return err + } + } + + eventData, eErr := target.store.Get(eventKey) + if eErr != nil { + // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() + // Such events will not exist and wouldve been already been sent successfully. + if os.IsNotExist(eErr) { + return nil + } + return eErr + } + + if err := target.send(eventData); err != nil { + if IsConnErr(err) { + return errNotConnected + } + return err + } + + // Delete the event from store. + return target.store.Del(eventKey) } // Close - closes underneath connections to MySQL database. @@ -210,8 +274,45 @@ func (target *MySQLTarget) Close() error { return target.db.Close() } +// Executes the table creation statements. +func (target *MySQLTarget) executeStmts() error { + + _, err := target.db.Exec(fmt.Sprintf(mysqlTableExists, target.args.Table)) + if err != nil { + createStmt := mysqlCreateNamespaceTable + if target.args.Format == event.AccessFormat { + createStmt = mysqlCreateAccessTable + } + + if _, dbErr := target.db.Exec(fmt.Sprintf(createStmt, target.args.Table)); dbErr != nil { + return dbErr + } + } + + switch target.args.Format { + case event.NamespaceFormat: + // insert or update statement + if target.updateStmt, err = target.db.Prepare(fmt.Sprintf(mysqlUpdateRow, target.args.Table)); err != nil { + return err + } + // delete statement + if target.deleteStmt, err = target.db.Prepare(fmt.Sprintf(mysqlDeleteRow, target.args.Table)); err != nil { + return err + } + case event.AccessFormat: + // insert statement + if target.insertStmt, err = target.db.Prepare(fmt.Sprintf(mysqlInsertRow, target.args.Table)); err != nil { + return err + } + } + + return nil + +} + // NewMySQLTarget - creates new MySQL target. -func NewMySQLTarget(id string, args MySQLArgs) (*MySQLTarget, error) { +func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}) (*MySQLTarget, error) { + var firstPing bool if args.DSN == "" { config := mysql.Config{ User: args.User, @@ -230,45 +331,42 @@ func NewMySQLTarget(id string, args MySQLArgs) (*MySQLTarget, error) { return nil, err } - if err = db.Ping(); err != nil { - return nil, err - } + var store Store - if _, err = db.Exec(fmt.Sprintf(mysqlTableExists, args.Table)); err != nil { - createStmt := mysqlCreateNamespaceTable - if args.Format == event.AccessFormat { - createStmt = mysqlCreateAccessTable + if args.QueueDir != "" { + queueDir := filepath.Join(args.QueueDir, storePrefix+"-mysql-"+id) + store = NewQueueStore(queueDir, args.QueueLimit) + if oErr := store.Open(); oErr != nil { + return nil, oErr } + } - if _, err = db.Exec(fmt.Sprintf(createStmt, args.Table)); err != nil { - return nil, err - } + target := &MySQLTarget{ + id: event.TargetID{ID: id, Name: "mysql"}, + args: args, + db: db, + store: store, + firstPing: firstPing, } - var updateStmt, deleteStmt, insertStmt *sql.Stmt - switch args.Format { - case event.NamespaceFormat: - // insert or update statement - if updateStmt, err = db.Prepare(fmt.Sprintf(mysqlUpdateRow, args.Table)); err != nil { - return nil, err - } - // delete statement - if deleteStmt, err = db.Prepare(fmt.Sprintf(mysqlDeleteRow, args.Table)); err != nil { + err = target.db.Ping() + if err != nil { + if target.store == nil || !IsConnRefusedErr(err) { return nil, err } - case event.AccessFormat: - // insert statement - if insertStmt, err = db.Prepare(fmt.Sprintf(mysqlInsertRow, args.Table)); err != nil { + } else { + if err = target.executeStmts(); err != nil { return nil, err } + target.firstPing = true + } + + if target.store != nil { + // Replays the events from the store. + eventKeyCh := replayEvents(target.store, doneCh) + // Start replaying events from the store. + go sendEvents(target, eventKeyCh, doneCh) } - return &MySQLTarget{ - id: event.TargetID{ID: id, Name: "mysql"}, - args: args, - updateStmt: updateStmt, - deleteStmt: deleteStmt, - insertStmt: insertStmt, - db: db, - }, nil + return target, nil } diff --git a/pkg/event/target/postgresql.go b/pkg/event/target/postgresql.go index bea633de8..839a017fb 100644 --- a/pkg/event/target/postgresql.go +++ b/pkg/event/target/postgresql.go @@ -56,8 +56,11 @@ package target import ( "database/sql" "encoding/json" + "errors" "fmt" "net/url" + "os" + "path/filepath" "strconv" "strings" "time" @@ -89,6 +92,8 @@ type PostgreSQLArgs struct { User string `json:"user"` // default: user running minio Password string `json:"password"` // default: no password Database string `json:"database"` // default: same as user + QueueDir string `json:"queueDir"` + QueueLimit uint64 `json:"queueLimit"` } // Validate PostgreSQLArgs fields @@ -122,6 +127,15 @@ func (p PostgreSQLArgs) Validate() error { } } + if p.QueueDir != "" { + if !filepath.IsAbs(p.QueueDir) { + return errors.New("queueDir path should be absolute") + } + } + if p.QueueLimit > 10000 { + return errors.New("queueLimit should not exceed 10000") + } + return nil } @@ -133,6 +147,8 @@ type PostgreSQLTarget struct { deleteStmt *sql.Stmt insertStmt *sql.Stmt db *sql.DB + store Store + firstPing bool } // ID - returns target ID. @@ -140,11 +156,26 @@ func (target *PostgreSQLTarget) ID() event.TargetID { return target.id } -// Save - Sends event directly without persisting. +// Save - saves the events to the store if questore is configured, which will be replayed when the PostgreSQL connection is active. func (target *PostgreSQLTarget) Save(eventData event.Event) error { + if target.store != nil { + return target.store.Put(eventData) + } + if err := target.db.Ping(); err != nil { + if IsConnErr(err) { + return errNotConnected + } + return err + } return target.send(eventData) } +// IsConnErr - To detect a connection error. +func IsConnErr(err error) bool { + return IsConnRefusedErr(err) || err.Error() == "sql: database is closed" || err.Error() == "sql: statement is closed" || err.Error() == "invalid connection" +} + +// send - sends an event to the PostgreSQL. func (target *PostgreSQLTarget) send(eventData event.Event) error { if target.args.Format == event.NamespaceFormat { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) @@ -177,16 +208,52 @@ func (target *PostgreSQLTarget) send(eventData event.Event) error { return err } - _, err = target.insertStmt.Exec(eventTime, data) - return err + if _, err = target.insertStmt.Exec(eventTime, data); err != nil { + return err + } } return nil } -// Send - interface compatible method does no-op. +// Send - reads an event from store and sends it to PostgreSQL. func (target *PostgreSQLTarget) Send(eventKey string) error { - return nil + + if err := target.db.Ping(); err != nil { + if IsConnErr(err) { + return errNotConnected + } + return err + } + + if !target.firstPing { + if err := target.executeStmts(); err != nil { + if IsConnErr(err) { + return errNotConnected + } + return err + } + } + + eventData, eErr := target.store.Get(eventKey) + if eErr != nil { + // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() + // Such events will not exist and wouldve been already been sent successfully. + if os.IsNotExist(eErr) { + return nil + } + return eErr + } + + if err := target.send(eventData); err != nil { + if IsConnErr(err) { + return errNotConnected + } + return err + } + + // Delete the event from store. + return target.store.Del(eventKey) } // Close - closes underneath connections to PostgreSQL database. @@ -209,8 +276,45 @@ func (target *PostgreSQLTarget) Close() error { return target.db.Close() } +// Executes the table creation statements. +func (target *PostgreSQLTarget) executeStmts() error { + + _, err := target.db.Exec(fmt.Sprintf(psqlTableExists, target.args.Table)) + if err != nil { + createStmt := psqlCreateNamespaceTable + if target.args.Format == event.AccessFormat { + createStmt = psqlCreateAccessTable + } + + if _, dbErr := target.db.Exec(fmt.Sprintf(createStmt, target.args.Table)); dbErr != nil { + return dbErr + } + } + + switch target.args.Format { + case event.NamespaceFormat: + // insert or update statement + if target.updateStmt, err = target.db.Prepare(fmt.Sprintf(psqlUpdateRow, target.args.Table)); err != nil { + return err + } + // delete statement + if target.deleteStmt, err = target.db.Prepare(fmt.Sprintf(psqlDeleteRow, target.args.Table)); err != nil { + return err + } + case event.AccessFormat: + // insert statement + if target.insertStmt, err = target.db.Prepare(fmt.Sprintf(psqlInsertRow, target.args.Table)); err != nil { + return err + } + } + + return nil +} + // NewPostgreSQLTarget - creates new PostgreSQL target. -func NewPostgreSQLTarget(id string, args PostgreSQLArgs) (*PostgreSQLTarget, error) { +func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}) (*PostgreSQLTarget, error) { + var firstPing bool + params := []string{args.ConnectionString} if !args.Host.IsEmpty() { params = append(params, "host="+args.Host.String()) @@ -234,45 +338,42 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs) (*PostgreSQLTarget, err return nil, err } - if err = db.Ping(); err != nil { - return nil, err - } + var store Store - if _, err = db.Exec(fmt.Sprintf(psqlTableExists, args.Table)); err != nil { - createStmt := psqlCreateNamespaceTable - if args.Format == event.AccessFormat { - createStmt = psqlCreateAccessTable + if args.QueueDir != "" { + queueDir := filepath.Join(args.QueueDir, storePrefix+"-postgresql-"+id) + store = NewQueueStore(queueDir, args.QueueLimit) + if oErr := store.Open(); oErr != nil { + return nil, oErr } + } - if _, err = db.Exec(fmt.Sprintf(createStmt, args.Table)); err != nil { - return nil, err - } + target := &PostgreSQLTarget{ + id: event.TargetID{ID: id, Name: "postgresql"}, + args: args, + db: db, + store: store, + firstPing: firstPing, } - var updateStmt, deleteStmt, insertStmt *sql.Stmt - switch args.Format { - case event.NamespaceFormat: - // insert or update statement - if updateStmt, err = db.Prepare(fmt.Sprintf(psqlUpdateRow, args.Table)); err != nil { - return nil, err - } - // delete statement - if deleteStmt, err = db.Prepare(fmt.Sprintf(psqlDeleteRow, args.Table)); err != nil { + err = target.db.Ping() + if err != nil { + if target.store == nil || !IsConnRefusedErr(err) { return nil, err } - case event.AccessFormat: - // insert statement - if insertStmt, err = db.Prepare(fmt.Sprintf(psqlInsertRow, args.Table)); err != nil { + } else { + if err = target.executeStmts(); err != nil { return nil, err } + target.firstPing = true + } + + if target.store != nil { + // Replays the events from the store. + eventKeyCh := replayEvents(target.store, doneCh) + // Start replaying events from the store. + go sendEvents(target, eventKeyCh, doneCh) } - return &PostgreSQLTarget{ - id: event.TargetID{ID: id, Name: "postgresql"}, - args: args, - updateStmt: updateStmt, - deleteStmt: deleteStmt, - insertStmt: insertStmt, - db: db, - }, nil + return target, nil }