You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
minio/cmd/notify-mysql.go

338 lines
8.7 KiB

/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// MySQL Notifier implementation. Two formats, "namespace" and
// "access" are supported.
//
// * Namespace format
//
// On each create or update object event in Minio Object storage
// server, a row is created or updated in the table in MySQL. On each
// object removal, the corresponding row is deleted from the table.
//
// A table with a specific structure (column names, column types, and
// primary key/uniqueness constraint) is used. The user may set the
// table name in the configuration. A sample SQL command that creates
// a command with the required structure is:
//
// CREATE TABLE myminio (
// key_name VARCHAR(2048),
// value JSONB,
// PRIMARY KEY (key_name),
// );
//
// MySQL's "INSERT ... ON DUPLICATE ..." feature (UPSERT) is used
// here. The implementation has been tested with MySQL Ver 14.14
// Distrib 5.7.17.
//
// * Access format
//
// On each event, a row is appended to the configured table. There is
// no deletion or modification of existing rows.
//
// A different table schema is used for this format. A sample SQL
// commant that creates a table with the required structure is:
//
// CREATE TABLE myminio (
// event_time TIMESTAMP WITH TIME ZONE NOT NULL,
// event_data JSONB
// );
package cmd
import (
"database/sql"
"encoding/json"
"fmt"
"io/ioutil"
"time"
"github.com/Sirupsen/logrus"
"github.com/go-sql-driver/mysql"
)
const (
// Queries for format=namespace mode.
upsertRowForNSMySQL = `INSERT INTO %s (key_name, value)
VALUES (?, ?)
ON DUPLICATE KEY UPDATE value=VALUES(value);
`
deleteRowForNSMySQL = ` DELETE FROM %s
WHERE key_name = ?;`
createTableForNSMySQL = `CREATE TABLE %s (
key_name VARCHAR(2048),
value JSON,
PRIMARY KEY (key_name)
);`
// Queries for format=access mode.
insertRowForAccessMySQL = `INSERT INTO %s (event_time, event_data)
VALUES (?, ?);`
createTableForAccessMySQL = `CREATE TABLE %s (
event_time DATETIME NOT NULL,
event_data JSON
);`
// Query to check if a table already exists.
tableExistsMySQL = `SELECT 1 FROM %s;`
)
var (
mysqlErrFunc = newNotificationErrorFactory("MySQL")
errMysqlFormat = mysqlErrFunc(`"format" value is invalid - it must be one of "%s" or "%s".`, formatNamespace, formatAccess)
errMysqlTable = mysqlErrFunc("Table was not specified in the configuration.")
)
type mySQLNotify struct {
Enable bool `json:"enable"`
Format string `json:"format"`
// pass data-source-name connection string in config
// directly. This string is formatted according to
// https://github.com/go-sql-driver/mysql#dsn-data-source-name
DsnString string `json:"dsnString"`
// specifying a table name is required.
Table string `json:"table"`
// uses the values below if no connection string is specified
// - however the connection string method offers more
// flexibility.
Host string `json:"host"`
Port string `json:"port"`
User string `json:"user"`
Password string `json:"password"`
Database string `json:"database"`
}
func (m *mySQLNotify) Validate() error {
if !m.Enable {
return nil
}
if m.Format != formatNamespace && m.Format != formatAccess {
return errMysqlFormat
}
if m.DsnString == "" {
if _, err := checkURL(m.Host); err != nil {
return err
}
}
if m.Table == "" {
return errMysqlTable
}
return nil
}
type mySQLConn struct {
dsnStr string
table string
format string
preparedStmts map[string]*sql.Stmt
*sql.DB
}
func dialMySQL(msql mySQLNotify) (mc mySQLConn, e error) {
if !msql.Enable {
return mc, errNotifyNotEnabled
}
dsnStr := msql.DsnString
// check if connection string is specified
if dsnStr == "" {
// build from other parameters
config := mysql.Config{
User: msql.User,
Passwd: msql.Password,
Net: "tcp",
Addr: msql.Host + ":" + msql.Port,
DBName: msql.Database,
}
dsnStr = config.FormatDSN()
}
db, err := sql.Open("mysql", dsnStr)
if err != nil {
return mc, mysqlErrFunc(
"Connection opening failure (dsnStr=%s): %v",
dsnStr, err)
}
// ping to check that server is actually reachable.
err = db.Ping()
if err != nil {
return mc, mysqlErrFunc(
"Ping to server failed with: %v", err)
}
// check that table exists - if not, create it.
_, err = db.Exec(fmt.Sprintf(tableExistsMySQL, msql.Table))
if err != nil {
createStmt := createTableForNSMySQL
if msql.Format == formatAccess {
createStmt = createTableForAccessMySQL
}
// most likely, table does not exist. try to create it:
_, errCreate := db.Exec(fmt.Sprintf(createStmt, msql.Table))
if errCreate != nil {
// failed to create the table. error out.
return mc, mysqlErrFunc(
"'Select' failed with %v, then 'Create Table' failed with %v",
err, errCreate,
)
}
}
// create prepared statements
stmts := make(map[string]*sql.Stmt)
switch msql.Format {
case formatNamespace:
// insert or update statement
stmts["upsertRow"], err = db.Prepare(fmt.Sprintf(upsertRowForNSMySQL,
msql.Table))
if err != nil {
return mc, mysqlErrFunc("create UPSERT prepared statement failed with: %v", err)
}
// delete statement
stmts["deleteRow"], err = db.Prepare(fmt.Sprintf(deleteRowForNSMySQL,
msql.Table))
if err != nil {
return mc, mysqlErrFunc("create DELETE prepared statement failed with: %v", err)
}
case formatAccess:
// insert statement
stmts["insertRow"], err = db.Prepare(fmt.Sprintf(insertRowForAccessMySQL,
msql.Table))
if err != nil {
return mc, mysqlErrFunc(
"create INSERT prepared statement failed with: %v", err)
}
}
return mySQLConn{dsnStr, msql.Table, msql.Format, stmts, db}, nil
}
func newMySQLNotify(accountID string) (*logrus.Logger, error) {
mysqlNotify := serverConfig.Notify.GetMySQLByID(accountID)
// Dial mysql
myC, err := dialMySQL(mysqlNotify)
if err != nil {
return nil, err
}
mySQLLog := logrus.New()
mySQLLog.Out = ioutil.Discard
mySQLLog.Formatter = new(logrus.JSONFormatter)
mySQLLog.Hooks.Add(myC)
return mySQLLog, nil
}
func (myC mySQLConn) Close() {
// first close all prepared statements
for _, v := range myC.preparedStmts {
_ = v.Close()
}
// close db connection
_ = myC.DB.Close()
}
func (myC mySQLConn) Fire(entry *logrus.Entry) error {
// get event type by trying to convert to string
entryEventType, ok := entry.Data["EventType"].(string)
if !ok {
// ignore event if converting EventType to string
// fails.
return nil
}
jsonEncoder := func(d interface{}) ([]byte, error) {
value, err := json.Marshal(map[string]interface{}{
"Records": d,
})
if err != nil {
return nil, mysqlErrFunc(
"Unable to encode event %v to JSON: %v", d, err)
}
return value, nil
}
switch myC.format {
case formatNamespace:
// Check for event delete
if eventMatch(entryEventType, []string{"s3:ObjectRemoved:*"}) {
// delete row from the table
_, err := myC.preparedStmts["deleteRow"].Exec(entry.Data["Key"])
if err != nil {
return mysqlErrFunc(
"Error deleting event with key = %v - got mysql error - %v",
entry.Data["Key"], err,
)
}
} else {
value, err := jsonEncoder(entry.Data["Records"])
if err != nil {
return err
}
// upsert row into the table
_, err = myC.preparedStmts["upsertRow"].Exec(entry.Data["Key"], value)
if err != nil {
return mysqlErrFunc(
"Unable to upsert event with Key=%v and Value=%v - got mysql error - %v",
entry.Data["Key"], entry.Data["Records"], err,
)
}
}
case formatAccess:
// eventTime is taken from the first entry in the
// records.
events, ok := entry.Data["Records"].([]NotificationEvent)
if !ok {
return mysqlErrFunc("unable to extract event time due to conversion error of entry.Data[\"Records\"]=%v", entry.Data["Records"])
}
eventTime, err := time.Parse(timeFormatAMZ, events[0].EventTime)
if err != nil {
return mysqlErrFunc("unable to parse event time \"%s\": %v",
events[0].EventTime, err)
}
value, err := jsonEncodeEventData(entry.Data["Records"])
if err != nil {
return err
}
_, err = myC.preparedStmts["insertRow"].Exec(eventTime, value)
if err != nil {
return mysqlErrFunc("Unable to insert event with value=%v: %v",
value, err)
}
}
return nil
}
func (myC mySQLConn) Levels() []logrus.Level {
return []logrus.Level{
logrus.InfoLevel,
}
}