minio/cmd/notify-mysql.go
Aditya Manthramurthy a2a8d54bb6 Add access format support for Elasticsearch notification target (#4006)
This change adds `access` format support for notifications to a
Elasticsearch server, and it refactors `namespace` format support.

In the case of `access` format, for each event in Minio, a JSON
document is inserted into Elasticsearch with its timestamp set to the
event's timestamp, and with the ID generated automatically by
elasticsearch. No events are modified or deleted in this mode.

In the case of `namespace` format, for each event in Minio, a JSON
document keyed by the bucket and object name is updated in
Elasticsearch. In the case of an object being created or over-written
in Minio, a new document or an existing document is inserted into the
Elasticsearch index. If an object is deleted in Minio, the
corresponding document is deleted from the Elasticsearch index.

Additionally, this change upgrades Elasticsearch support to the 5.x
series. This is a breaking change, and users of previous elasticsearch
versions should upgrade.

Also updates documentation on Elasticsearch notification target usage
and has a link to an elasticsearch upgrade guide.

This is the last patch that finally resolves #3928.
2017-03-31 14:11:27 -07:00

340 lines
8.8 KiB
Go

/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// MySQL Notifier implementation. Two formats, "namespace" and
// "access" are supported.
//
// * Namespace format
//
// On each create or update object event in Minio Object storage
// server, a row is created or updated in the table in MySQL. On each
// object removal, the corresponding row is deleted from the table.
//
// A table with a specific structure (column names, column types, and
// primary key/uniqueness constraint) is used. The user may set the
// table name in the configuration. A sample SQL command that creates
// a table with the required structure is:
//
// CREATE TABLE myminio (
// key_name VARCHAR(2048),
// value JSON,
// PRIMARY KEY (key_name)
// );
//
// MySQL's "INSERT ... ON DUPLICATE ..." feature (UPSERT) is used
// here. The implementation has been tested with MySQL Ver 14.14
// Distrib 5.7.17.
//
// * Access format
//
// On each event, a row is appended to the configured table. There is
// no deletion or modification of existing rows.
//
// A different table schema is used for this format. A sample SQL
// command that creates a table with the required structure is:
//
// CREATE TABLE myminio (
// event_time DATETIME NOT NULL,
// event_data JSON
// );
package cmd
import (
"database/sql"
"encoding/json"
"fmt"
"io/ioutil"
"time"
"github.com/Sirupsen/logrus"
"github.com/go-sql-driver/mysql"
)
// SQL templates used by the MySQL notification target. Every template
// carries a single %s verb that is filled in with the configured table
// name via fmt.Sprintf; the '?' markers are bound as statement
// arguments at execution time.
const (
	// Queries for format=namespace mode.

	// upsertRowForNSMySQL inserts the row for a key, or overwrites the
	// stored value when a row with that key already exists.
	upsertRowForNSMySQL = `INSERT INTO %s (key_name, value)
VALUES (?, ?)
ON DUPLICATE KEY UPDATE value=VALUES(value);
`
	// deleteRowForNSMySQL removes the row stored for a key.
	deleteRowForNSMySQL = ` DELETE FROM %s
WHERE key_name = ?;`
	// createTableForNSMySQL creates the key/value table used by the
	// namespace format.
	createTableForNSMySQL = `CREATE TABLE %s (
key_name VARCHAR(2048),
value JSON,
PRIMARY KEY (key_name)
);`

	// Queries for format=access mode.

	// insertRowForAccessMySQL appends one event row.
	insertRowForAccessMySQL = `INSERT INTO %s (event_time, event_data)
VALUES (?, ?);`
	// createTableForAccessMySQL creates the append-only event table
	// used by the access format.
	createTableForAccessMySQL = `CREATE TABLE %s (
event_time DATETIME NOT NULL,
event_data JSON
);`

	// Query to check if a table already exists. It fails when the
	// table is missing. LIMIT 1 keeps the probe cheap: without it the
	// server would return one row for every row already in the table.
	tableExistsMySQL = `SELECT 1 FROM %s LIMIT 1;`
)
// Error helpers for the MySQL notification target.
var (
// mysqlErrFunc builds the errors returned by this target; it is
// obtained from newNotificationErrorFactory for the "MySQL" module.
mysqlErrFunc = newNotificationErrorFactory("MySQL")
// errMysqlFormat is returned by Validate when the configured format
// is neither formatNamespace nor formatAccess.
errMysqlFormat = mysqlErrFunc(`"format" value is invalid - it must be one of "%s" or "%s".`, formatNamespace, formatAccess)
// errMysqlTable is returned by Validate when no table name is set.
errMysqlTable = mysqlErrFunc("Table was not specified in the configuration.")
)
// mySQLNotify holds the configuration of one MySQL notification
// target, as (de)serialized with the JSON field names given in the
// struct tags.
type mySQLNotify struct {
// Enable turns the target on; Validate and dialMySQL both
// short-circuit when it is false.
Enable bool `json:"enable"`
// Format selects the notification format; Validate accepts only
// formatNamespace and formatAccess.
Format string `json:"format"`
// DsnString is the data-source-name connection string, taken
// directly from the configuration. It is formatted according to
// https://github.com/go-sql-driver/mysql#dsn-data-source-name
DsnString string `json:"dsnString"`
// Table is the name of the target table. Specifying it is required.
Table string `json:"table"`
// The values below are used to build the connection string when
// DsnString is empty - however the connection string method offers
// more flexibility.
Host string `json:"host"`
Port string `json:"port"`
User string `json:"user"`
Password string `json:"password"`
Database string `json:"database"`
}
// Validate reports whether the MySQL target configuration is usable.
//
// A disabled target is always accepted. For an enabled target the
// checks run in this order, and the first failure is returned:
//   - Format must be formatNamespace or formatAccess (errMysqlFormat);
//   - when no DsnString is given, Host must pass checkURL;
//   - Table must be non-empty (errMysqlTable).
func (m *mySQLNotify) Validate() error {
	switch {
	case !m.Enable:
		// Nothing is checked for a disabled target.
		return nil
	case m.Format != formatNamespace && m.Format != formatAccess:
		return errMysqlFormat
	}

	// The host only matters when the DSN has to be assembled from the
	// individual connection parameters.
	if m.DsnString == "" {
		_, hostErr := checkURL(m.Host)
		if hostErr != nil {
			return hostErr
		}
	}

	if m.Table != "" {
		return nil
	}
	return errMysqlTable
}
// mySQLConn is an open MySQL notification target: the embedded
// database handle plus the statements prepared for it by dialMySQL.
// It is registered as a logrus hook (see Fire and Levels).
type mySQLConn struct {
// dsnStr is the connection string the handle was opened with.
dsnStr string
// table is the configured target table name.
table string
// format is the configured notification format; Fire switches on it.
format string
// preparedStmts holds the statements prepared by dialMySQL, keyed by
// "upsertRow" and "deleteRow" for the namespace format, or
// "insertRow" for the access format.
preparedStmts map[string]*sql.Stmt
// The embedded handle; Close closes it after the prepared statements.
*sql.DB
}
// dialMySQL opens a connection to the MySQL server described by msql,
// makes sure the target table is usable (creating it when the
// existence probe fails) and prepares the statements required by the
// configured format.
//
// It returns errNotifyNotEnabled when the target is disabled. When
// msql.DsnString is empty the DSN is built from User, Password, Host,
// Port and Database over TCP.
//
// On any failure after the database handle has been opened, the handle
// and every statement prepared so far are closed before the error is
// returned, so a failed dial does not leak connections.
func dialMySQL(msql mySQLNotify) (mySQLConn, error) {
	if !msql.Enable {
		return mySQLConn{}, errNotifyNotEnabled
	}

	dsnStr := msql.DsnString
	if dsnStr == "" {
		// No connection string given - build it from the other
		// parameters.
		config := mysql.Config{
			User:   msql.User,
			Passwd: msql.Password,
			Net:    "tcp",
			Addr:   msql.Host + ":" + msql.Port,
			DBName: msql.Database,
		}
		dsnStr = config.FormatDSN()
	}

	db, err := sql.Open("mysql", dsnStr)
	if err != nil {
		// NOTE(review): dsnStr can embed the password, so this message
		// may expose credentials wherever the error is logged -
		// consider dropping it from the message.
		return mySQLConn{}, mysqlErrFunc(
			"Connection opening failure (dsnStr=%s): %v",
			dsnStr, err)
	}

	stmts := make(map[string]*sql.Stmt)

	// abort releases everything acquired so far and returns cause.
	// Close errors are ignored on purpose: the dial has already
	// failed and cause is the error worth reporting.
	abort := func(cause error) (mySQLConn, error) {
		for _, stmt := range stmts {
			_ = stmt.Close()
		}
		_ = db.Close()
		return mySQLConn{}, cause
	}

	// Ping to check that the server is actually reachable.
	if err = db.Ping(); err != nil {
		return abort(mysqlErrFunc(
			"Ping to server failed with: %v", err))
	}

	// Check that the table exists - if not, create it.
	if _, err = db.Exec(fmt.Sprintf(tableExistsMySQL, msql.Table)); err != nil {
		createStmt := createTableForNSMySQL
		if msql.Format == formatAccess {
			createStmt = createTableForAccessMySQL
		}
		// Most likely the table does not exist. Try to create it.
		if _, errCreate := db.Exec(fmt.Sprintf(createStmt, msql.Table)); errCreate != nil {
			return abort(mysqlErrFunc(
				"'Select' failed with %v, then 'Create Table' failed with %v",
				err, errCreate,
			))
		}
	}

	// prepare compiles queryFmt for the configured table and records
	// the statement under name. Nothing is stored on failure, so the
	// map never holds a nil statement.
	prepare := func(name, queryFmt string) error {
		stmt, prepErr := db.Prepare(fmt.Sprintf(queryFmt, msql.Table))
		if prepErr != nil {
			return prepErr
		}
		stmts[name] = stmt
		return nil
	}

	switch msql.Format {
	case formatNamespace:
		// insert or update statement
		if err = prepare("upsertRow", upsertRowForNSMySQL); err != nil {
			return abort(
				mysqlErrFunc("create UPSERT prepared statement failed with: %v", err))
		}
		// delete statement
		if err = prepare("deleteRow", deleteRowForNSMySQL); err != nil {
			return abort(
				mysqlErrFunc("create DELETE prepared statement failed with: %v", err))
		}
	case formatAccess:
		// insert statement
		if err = prepare("insertRow", insertRowForAccessMySQL); err != nil {
			return abort(mysqlErrFunc(
				"create INSERT prepared statement failed with: %v", err))
		}
	}

	return mySQLConn{
		dsnStr:        dsnStr,
		table:         msql.Table,
		format:        msql.Format,
		preparedStmts: stmts,
		DB:            db,
	}, nil
}
// newMySQLNotify looks up the MySQL target configured under accountID,
// dials it, and returns a logrus logger whose only effect is to hand
// entries to that connection: the logger's own output is discarded and
// the connection is attached as a hook. A dial failure is returned
// unchanged.
func newMySQLNotify(accountID string) (*logrus.Logger, error) {
	conn, err := dialMySQL(serverConfig.Notify.GetMySQLByID(accountID))
	if err != nil {
		return nil, err
	}

	logger := logrus.New()
	logger.Hooks.Add(conn)
	logger.Formatter = &logrus.JSONFormatter{}
	// Entries are delivered through the hook only.
	logger.Out = ioutil.Discard
	return logger, nil
}
// Close releases the target's resources: every prepared statement
// first, then the database handle itself. Errors from closing are
// deliberately ignored.
func (myC mySQLConn) Close() {
	for name := range myC.preparedStmts {
		_ = myC.preparedStmts[name].Close()
	}
	_ = myC.DB.Close()
}
// Fire stores one notification entry in MySQL according to the
// connection's format.
//
// Entries whose "EventType" field is not a string are ignored and nil
// is returned.
//
// In the namespace format, an entry matching "s3:ObjectRemoved:*"
// deletes the row keyed by entry.Data["Key"]; any other entry upserts
// that row with the JSON-encoded entry.Data["Records"].
//
// In the access format, entry.Data["Records"] must be a non-empty
// []NotificationEvent. A row is inserted carrying the event time of
// the first record and the encoded records. An empty record list is
// reported as an error instead of panicking on the first-element
// access.
func (myC mySQLConn) Fire(entry *logrus.Entry) error {
	// get event type by trying to convert to string
	entryEventType, ok := entry.Data["EventType"].(string)
	if !ok {
		// ignore event if converting EventType to string
		// fails.
		return nil
	}

	switch myC.format {
	case formatNamespace:
		// Check for event delete
		if eventMatch(entryEventType, []string{"s3:ObjectRemoved:*"}) {
			// delete row from the table
			_, err := myC.preparedStmts["deleteRow"].Exec(entry.Data["Key"])
			if err != nil {
				return mysqlErrFunc(
					"Error deleting event with key = %v - got mysql error - %v",
					entry.Data["Key"], err,
				)
			}
			return nil
		}

		// The row value is the records wrapped in a {"Records": ...}
		// JSON document.
		value, err := json.Marshal(map[string]interface{}{
			"Records": entry.Data["Records"],
		})
		if err != nil {
			return mysqlErrFunc(
				"Unable to encode event %v to JSON: %v", entry.Data["Records"], err)
		}

		// upsert row into the table
		_, err = myC.preparedStmts["upsertRow"].Exec(entry.Data["Key"], value)
		if err != nil {
			return mysqlErrFunc(
				"Unable to upsert event with Key=%v and Value=%v - got mysql error - %v",
				entry.Data["Key"], entry.Data["Records"], err,
			)
		}

	case formatAccess:
		// eventTime is taken from the first entry in the
		// records.
		events, ok := entry.Data["Records"].([]NotificationEvent)
		if !ok {
			return mysqlErrFunc("unable to extract event time due to conversion error of entry.Data[\"Records\"]=%v", entry.Data["Records"])
		}
		if len(events) == 0 {
			// Without this guard events[0] below would panic.
			return mysqlErrFunc("unable to extract event time: entry.Data[\"Records\"] is empty")
		}

		eventTime, err := time.Parse(timeFormatAMZ, events[0].EventTime)
		if err != nil {
			return mysqlErrFunc("unable to parse event time \"%s\": %v",
				events[0].EventTime, err)
		}

		value, err := jsonEncodeEventData(entry.Data["Records"])
		if err != nil {
			return err
		}

		_, err = myC.preparedStmts["insertRow"].Exec(eventTime, value)
		if err != nil {
			// %s renders the encoded bytes as the JSON text; %v would
			// print them as a list of numbers.
			return mysqlErrFunc("Unable to insert event with value=%s: %v",
				value, err)
		}
	}

	return nil
}
// Levels lists the logrus levels this hook is registered for: only
// logrus.InfoLevel.
func (myC mySQLConn) Levels() []logrus.Level {
	levels := make([]logrus.Level, 0, 1)
	return append(levels, logrus.InfoLevel)
}