|
|
|
/*
|
|
|
|
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
|
|
|
*
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
* You may obtain a copy of the License at
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
* limitations under the License.
|
|
|
|
*/
|
|
|
|
|
|
|
|
package cmd
|
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
Add `access` format support for Elasticsearch notification target (#4006)
This change adds `access` format support for notifications to a
Elasticsearch server, and it refactors `namespace` format support.
In the case of `access` format, for each event in Minio, a JSON
document is inserted into Elasticsearch with its timestamp set to the
event's timestamp, and with the ID generated automatically by
elasticsearch. No events are modified or deleted in this mode.
In the case of `namespace` format, for each event in Minio, a JSON
document keyed by the bucket and object name is updated in
Elasticsearch. In the case of an object being created or over-written
in Minio, a new document or an existing document is inserted into the
Elasticsearch index. If an object is deleted in Minio, the
corresponding document is deleted from the Elasticsearch index.
Additionally, this change upgrades Elasticsearch support to the 5.x
series. This is a breaking change, and users of previous elasticsearch
versions should upgrade.
Also updates documentation on Elasticsearch notification target usage
and has a link to an elasticsearch upgrade guide.
This is the last patch that finally resolves #3928.
8 years ago
|
|
|
"fmt"
|
|
|
|
|
|
|
|
"github.com/minio/minio/pkg/wildcard"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Queue (SQS-style) notification target identifiers.
const (
	// minioSqs is the prefix shared by every Minio queue ARN.
	minioSqs = "arn:minio:sqs:"

	// Queue type names, one per supported notification backend. The
	// is*Queue helpers in this file compare an ARN's Type field against
	// these values.
	queueTypeAMQP       = "amqp"          // AMQP target.
	queueTypeMQTT       = "mqtt"          // MQTT target.
	queueTypeNATS       = "nats"          // NATS target.
	queueTypeElastic    = "elasticsearch" // Elasticsearch target.
	queueTypeRedis      = "redis"         // Redis target.
	queueTypePostgreSQL = "postgresql"    // PostgreSQL target.
	queueTypeMySQL      = "mysql"         // MySQL target.
	queueTypeKafka      = "kafka"         // Kafka target.
	queueTypeWebhook    = "webhook"       // Webhook target.

	// Recognised values for a notifier's "format" setting.
	formatNamespace = "namespace"
	formatAccess    = "access"
)
|
|
|
|
|
|
|
|
// Topic (SNS-style) notification target identifiers.
const (
	// minioTopic is the prefix shared by every Minio topic ARN.
	minioTopic = "arn:minio:sns:"

	// snsTypeMinio is the topic type name "listen".
	snsTypeMinio = "listen"
)
|
|
|
|
|
|
|
|
var (
	// errNotifyNotEnabled is a package-level sentinel error carrying the
	// message "requested notifier not enabled".
	errNotifyNotEnabled = errors.New("requested notifier not enabled")
)
|
|
|
|
|
|
|
|
// Returns true if queueArn is for an AMQP queue.
|
|
|
|
func isAMQPQueue(sqsArn arnSQS) bool {
|
|
|
|
if sqsArn.Type != queueTypeAMQP {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
amqpL := serverConfig.Notify.GetAMQPByID(sqsArn.AccountID)
|
|
|
|
if !amqpL.Enable {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
// Connect to amqp server to validate.
|
|
|
|
amqpC, err := dialAMQP(amqpL)
|
|
|
|
if err != nil {
|
|
|
|
errorIf(err, "Unable to connect to amqp service. %#v", amqpL)
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
defer amqpC.Close()
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
// Returns true if mqttARN is for an MQTT queue.
|
|
|
|
func isMQTTQueue(sqsArn arnSQS) bool {
|
|
|
|
if sqsArn.Type != queueTypeMQTT {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
mqttL := serverConfig.Notify.GetMQTTByID(sqsArn.AccountID)
|
|
|
|
if !mqttL.Enable {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
// Connect to mqtt server to validate.
|
|
|
|
mqttC, err := dialMQTT(mqttL)
|
|
|
|
if err != nil {
|
|
|
|
errorIf(err, "Unable to connect to mqtt service. %#v", mqttL)
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
defer mqttC.Client.Disconnect(250)
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
// Returns true if natsArn is for an NATS queue.
|
|
|
|
func isNATSQueue(sqsArn arnSQS) bool {
|
|
|
|
if sqsArn.Type != queueTypeNATS {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
natsL := serverConfig.Notify.GetNATSByID(sqsArn.AccountID)
|
|
|
|
if !natsL.Enable {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
// Connect to nats server to validate.
|
|
|
|
natsC, err := dialNATS(natsL, true)
|
|
|
|
if err != nil {
|
|
|
|
errorIf(err, "Unable to connect to nats service. %#v", natsL)
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
closeNATS(natsC)
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
// Returns true if queueArn is for an Webhook queue
|
|
|
|
func isWebhookQueue(sqsArn arnSQS) bool {
|
|
|
|
if sqsArn.Type != queueTypeWebhook {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
rNotify := serverConfig.Notify.GetWebhookByID(sqsArn.AccountID)
|
|
|
|
return rNotify.Enable
|
|
|
|
}
|
|
|
|
|
|
|
|
// Returns true if queueArn is for an Redis queue.
|
|
|
|
func isRedisQueue(sqsArn arnSQS) bool {
|
|
|
|
if sqsArn.Type != queueTypeRedis {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
rNotify := serverConfig.Notify.GetRedisByID(sqsArn.AccountID)
|
|
|
|
if !rNotify.Enable {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
// Connect to redis server to validate.
|
|
|
|
rPool, err := dialRedis(rNotify)
|
|
|
|
if err != nil {
|
|
|
|
errorIf(err, "Unable to connect to redis service. %#v", rNotify)
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
defer rPool.Close()
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
// Returns true if queueArn is for an ElasticSearch queue.
|
|
|
|
func isElasticQueue(sqsArn arnSQS) bool {
|
|
|
|
if sqsArn.Type != queueTypeElastic {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
esNotify := serverConfig.Notify.GetElasticSearchByID(sqsArn.AccountID)
|
|
|
|
if !esNotify.Enable {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
elasticC, err := dialElastic(esNotify)
|
|
|
|
if err != nil {
|
|
|
|
errorIf(err, "Unable to connect to elasticsearch service %#v", esNotify)
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
defer elasticC.Stop()
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
// Returns true if queueArn is for PostgreSQL.
|
|
|
|
func isPostgreSQLQueue(sqsArn arnSQS) bool {
|
|
|
|
if sqsArn.Type != queueTypePostgreSQL {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
pgNotify := serverConfig.Notify.GetPostgreSQLByID(sqsArn.AccountID)
|
|
|
|
if !pgNotify.Enable {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
pgC, err := dialPostgreSQL(pgNotify)
|
|
|
|
if err != nil {
|
|
|
|
errorIf(err, "Unable to connect to PostgreSQL server %#v", pgNotify)
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
defer pgC.Close()
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
// Returns true if queueArn is for MySQL.
|
|
|
|
func isMySQLQueue(sqsArn arnSQS) bool {
|
|
|
|
if sqsArn.Type != queueTypeMySQL {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
msqlNotify := serverConfig.Notify.GetMySQLByID(sqsArn.AccountID)
|
|
|
|
if !msqlNotify.Enable {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
myC, err := dialMySQL(msqlNotify)
|
|
|
|
if err != nil {
|
|
|
|
errorIf(err, "Unable to connect to MySQL server %#v", msqlNotify)
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
defer myC.Close()
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
// Returns true if queueArn is for Kafka.
|
|
|
|
func isKafkaQueue(sqsArn arnSQS) bool {
|
|
|
|
if sqsArn.Type != queueTypeKafka {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
kafkaNotifyCfg := serverConfig.Notify.GetKafkaByID(sqsArn.AccountID)
|
|
|
|
if !kafkaNotifyCfg.Enable {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
kafkaC, err := dialKafka(kafkaNotifyCfg)
|
|
|
|
if err != nil {
|
|
|
|
errorIf(err, "Unable to dial Kafka server %#v", kafkaNotifyCfg)
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
defer kafkaC.Close()
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
// Match function matches wild cards in 'pattern' for events.
|
|
|
|
func eventMatch(eventType string, events []string) (ok bool) {
|
|
|
|
for _, event := range events {
|
|
|
|
ok = wildcard.MatchSimple(event, eventType)
|
|
|
|
if ok {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return ok
|
|
|
|
}
|
|
|
|
|
|
|
|
// Filter rule match, matches an object against the filter rules.
|
|
|
|
func filterRuleMatch(object string, frs []filterRule) bool {
|
|
|
|
var prefixMatch, suffixMatch = true, true
|
|
|
|
for _, fr := range frs {
|
|
|
|
if isValidFilterNamePrefix(fr.Name) {
|
|
|
|
prefixMatch = hasPrefix(object, fr.Value)
|
|
|
|
} else if isValidFilterNameSuffix(fr.Name) {
|
|
|
|
suffixMatch = hasSuffix(object, fr.Value)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return prefixMatch && suffixMatch
|
|
|
|
}
|
Add `access` format support for Elasticsearch notification target (#4006)
This change adds `access` format support for notifications to a
Elasticsearch server, and it refactors `namespace` format support.
In the case of `access` format, for each event in Minio, a JSON
document is inserted into Elasticsearch with its timestamp set to the
event's timestamp, and with the ID generated automatically by
elasticsearch. No events are modified or deleted in this mode.
In the case of `namespace` format, for each event in Minio, a JSON
document keyed by the bucket and object name is updated in
Elasticsearch. In the case of an object being created or over-written
in Minio, a new document or an existing document is inserted into the
Elasticsearch index. If an object is deleted in Minio, the
corresponding document is deleted from the Elasticsearch index.
Additionally, this change upgrades Elasticsearch support to the 5.x
series. This is a breaking change, and users of previous elasticsearch
versions should upgrade.
Also updates documentation on Elasticsearch notification target usage
and has a link to an elasticsearch upgrade guide.
This is the last patch that finally resolves #3928.
8 years ago
|
|
|
|
|
|
|
// notificationErrorFactoryFunc is the signature of a notification error
// constructor: it takes a fmt-style format string plus its arguments and
// returns the resulting error.
type notificationErrorFactoryFunc func(string, ...interface{}) error

// newNotificationErrorFactory returns an error constructor bound to prefix.
//
// Each error produced by the returned function has the text
// "<prefix>: <formatted message>", where the message is msg formatted with
// the supplied arguments using fmt.Sprintf.
func newNotificationErrorFactory(prefix string) notificationErrorFactoryFunc {
	return func(msg string, a ...interface{}) error {
		// Format the caller's message first, then tag it with the prefix.
		return fmt.Errorf("%s: %s", prefix, fmt.Sprintf(msg, a...))
	}
}
|