You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
404 lines
8.5 KiB
404 lines
8.5 KiB
/*
|
|
* Minio Cloud Storage, (C) 2017 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
)
|
|
|
|
// Notifier represents collection of supported notification queues.
|
|
type notifier struct {
|
|
sync.RWMutex
|
|
AMQP amqpConfigs `json:"amqp"`
|
|
NATS natsConfigs `json:"nats"`
|
|
ElasticSearch elasticSearchConfigs `json:"elasticsearch"`
|
|
Redis redisConfigs `json:"redis"`
|
|
PostgreSQL postgreSQLConfigs `json:"postgresql"`
|
|
Kafka kafkaConfigs `json:"kafka"`
|
|
Webhook webhookConfigs `json:"webhook"`
|
|
MySQL mySQLConfigs `json:"mysql"`
|
|
MQTT mqttConfigs `json:"mqtt"`
|
|
// Add new notification queues.
|
|
}
|
|
|
|
type amqpConfigs map[string]amqpNotify
|
|
|
|
func (a amqpConfigs) Clone() amqpConfigs {
|
|
a2 := make(amqpConfigs, len(a))
|
|
for k, v := range a {
|
|
a2[k] = v
|
|
}
|
|
return a2
|
|
}
|
|
|
|
func (a amqpConfigs) Validate() error {
|
|
for k, v := range a {
|
|
if err := v.Validate(); err != nil {
|
|
return fmt.Errorf("AMQP [%s] configuration invalid: %s", k, err.Error())
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type mqttConfigs map[string]mqttNotify
|
|
|
|
func (a mqttConfigs) Clone() mqttConfigs {
|
|
a2 := make(mqttConfigs, len(a))
|
|
for k, v := range a {
|
|
a2[k] = v
|
|
}
|
|
return a2
|
|
}
|
|
|
|
func (a mqttConfigs) Validate() error {
|
|
for k, v := range a {
|
|
if err := v.Validate(); err != nil {
|
|
return fmt.Errorf("MQTT [%s] configuration invalid: %s", k, err.Error())
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type natsConfigs map[string]natsNotify
|
|
|
|
func (a natsConfigs) Clone() natsConfigs {
|
|
a2 := make(natsConfigs, len(a))
|
|
for k, v := range a {
|
|
a2[k] = v
|
|
}
|
|
return a2
|
|
}
|
|
|
|
func (a natsConfigs) Validate() error {
|
|
for k, v := range a {
|
|
if err := v.Validate(); err != nil {
|
|
return fmt.Errorf("NATS [%s] configuration invalid: %s", k, err.Error())
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type elasticSearchConfigs map[string]elasticSearchNotify
|
|
|
|
func (a elasticSearchConfigs) Clone() elasticSearchConfigs {
|
|
a2 := make(elasticSearchConfigs, len(a))
|
|
for k, v := range a {
|
|
a2[k] = v
|
|
}
|
|
return a2
|
|
}
|
|
|
|
func (a elasticSearchConfigs) Validate() error {
|
|
for k, v := range a {
|
|
if err := v.Validate(); err != nil {
|
|
return fmt.Errorf("ElasticSearch [%s] configuration invalid: %s", k, err.Error())
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type redisConfigs map[string]redisNotify
|
|
|
|
func (a redisConfigs) Clone() redisConfigs {
|
|
a2 := make(redisConfigs, len(a))
|
|
for k, v := range a {
|
|
a2[k] = v
|
|
}
|
|
return a2
|
|
}
|
|
|
|
func (a redisConfigs) Validate() error {
|
|
for k, v := range a {
|
|
if err := v.Validate(); err != nil {
|
|
return fmt.Errorf("Redis [%s] configuration invalid: %s", k, err.Error())
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type postgreSQLConfigs map[string]postgreSQLNotify
|
|
|
|
func (a postgreSQLConfigs) Clone() postgreSQLConfigs {
|
|
a2 := make(postgreSQLConfigs, len(a))
|
|
for k, v := range a {
|
|
a2[k] = v
|
|
}
|
|
return a2
|
|
}
|
|
|
|
func (a postgreSQLConfigs) Validate() error {
|
|
for k, v := range a {
|
|
if err := v.Validate(); err != nil {
|
|
return fmt.Errorf("PostgreSQL [%s] configuration invalid: %s", k, err.Error())
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type kafkaConfigs map[string]kafkaNotify
|
|
|
|
func (a kafkaConfigs) Clone() kafkaConfigs {
|
|
a2 := make(kafkaConfigs, len(a))
|
|
for k, v := range a {
|
|
a2[k] = v
|
|
}
|
|
return a2
|
|
}
|
|
|
|
func (a kafkaConfigs) Validate() error {
|
|
for k, v := range a {
|
|
if err := v.Validate(); err != nil {
|
|
return fmt.Errorf("Kafka [%s] configuration invalid: %s", k, err.Error())
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type webhookConfigs map[string]webhookNotify
|
|
|
|
func (a webhookConfigs) Clone() webhookConfigs {
|
|
a2 := make(webhookConfigs, len(a))
|
|
for k, v := range a {
|
|
a2[k] = v
|
|
}
|
|
return a2
|
|
}
|
|
|
|
func (a webhookConfigs) Validate() error {
|
|
for k, v := range a {
|
|
if err := v.Validate(); err != nil {
|
|
return fmt.Errorf("Webhook [%s] configuration invalid: %s", k, err.Error())
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type mySQLConfigs map[string]mySQLNotify
|
|
|
|
func (a mySQLConfigs) Clone() mySQLConfigs {
|
|
a2 := make(mySQLConfigs, len(a))
|
|
for k, v := range a {
|
|
a2[k] = v
|
|
}
|
|
return a2
|
|
}
|
|
|
|
func (a mySQLConfigs) Validate() error {
|
|
for k, v := range a {
|
|
if err := v.Validate(); err != nil {
|
|
return fmt.Errorf("MySQL [%s] configuration invalid: %s", k, err.Error())
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (n *notifier) Validate() error {
|
|
if n == nil {
|
|
return nil
|
|
}
|
|
if err := n.AMQP.Validate(); err != nil {
|
|
return err
|
|
}
|
|
if err := n.NATS.Validate(); err != nil {
|
|
return err
|
|
}
|
|
if err := n.ElasticSearch.Validate(); err != nil {
|
|
return err
|
|
}
|
|
if err := n.Redis.Validate(); err != nil {
|
|
return err
|
|
}
|
|
if err := n.PostgreSQL.Validate(); err != nil {
|
|
return err
|
|
}
|
|
if err := n.Kafka.Validate(); err != nil {
|
|
return err
|
|
}
|
|
if err := n.Webhook.Validate(); err != nil {
|
|
return err
|
|
}
|
|
if err := n.MySQL.Validate(); err != nil {
|
|
return err
|
|
}
|
|
if err := n.MQTT.Validate(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (n *notifier) SetAMQPByID(accountID string, amqpn amqpNotify) {
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
n.AMQP[accountID] = amqpn
|
|
}
|
|
|
|
func (n *notifier) GetAMQP() map[string]amqpNotify {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.AMQP.Clone()
|
|
}
|
|
|
|
func (n *notifier) GetAMQPByID(accountID string) amqpNotify {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.AMQP[accountID]
|
|
}
|
|
|
|
func (n *notifier) SetMQTTByID(accountID string, mqttn mqttNotify) {
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
n.MQTT[accountID] = mqttn
|
|
}
|
|
|
|
func (n *notifier) GetMQTT() map[string]mqttNotify {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.MQTT.Clone()
|
|
}
|
|
|
|
func (n *notifier) GetMQTTByID(accountID string) mqttNotify {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.MQTT[accountID]
|
|
}
|
|
|
|
func (n *notifier) SetNATSByID(accountID string, natsn natsNotify) {
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
n.NATS[accountID] = natsn
|
|
}
|
|
|
|
func (n *notifier) GetNATS() map[string]natsNotify {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.NATS.Clone()
|
|
}
|
|
|
|
func (n *notifier) GetNATSByID(accountID string) natsNotify {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.NATS[accountID]
|
|
}
|
|
|
|
func (n *notifier) SetElasticSearchByID(accountID string, es elasticSearchNotify) {
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
n.ElasticSearch[accountID] = es
|
|
}
|
|
|
|
func (n *notifier) GetElasticSearchByID(accountID string) elasticSearchNotify {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.ElasticSearch[accountID]
|
|
}
|
|
|
|
func (n *notifier) GetElasticSearch() map[string]elasticSearchNotify {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.ElasticSearch.Clone()
|
|
}
|
|
|
|
func (n *notifier) SetRedisByID(accountID string, r redisNotify) {
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
n.Redis[accountID] = r
|
|
}
|
|
|
|
func (n *notifier) GetRedis() map[string]redisNotify {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.Redis.Clone()
|
|
}
|
|
|
|
func (n *notifier) GetRedisByID(accountID string) redisNotify {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.Redis[accountID]
|
|
}
|
|
|
|
func (n *notifier) GetWebhook() map[string]webhookNotify {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.Webhook.Clone()
|
|
}
|
|
|
|
func (n *notifier) GetWebhookByID(accountID string) webhookNotify {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.Webhook[accountID]
|
|
}
|
|
|
|
func (n *notifier) SetWebhookByID(accountID string, pgn webhookNotify) {
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
n.Webhook[accountID] = pgn
|
|
}
|
|
|
|
func (n *notifier) SetPostgreSQLByID(accountID string, pgn postgreSQLNotify) {
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
n.PostgreSQL[accountID] = pgn
|
|
}
|
|
|
|
func (n *notifier) GetPostgreSQL() map[string]postgreSQLNotify {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.PostgreSQL.Clone()
|
|
}
|
|
|
|
func (n *notifier) GetPostgreSQLByID(accountID string) postgreSQLNotify {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.PostgreSQL[accountID]
|
|
}
|
|
|
|
func (n *notifier) SetMySQLByID(accountID string, pgn mySQLNotify) {
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
n.MySQL[accountID] = pgn
|
|
}
|
|
|
|
func (n *notifier) GetMySQL() map[string]mySQLNotify {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.MySQL.Clone()
|
|
}
|
|
|
|
func (n *notifier) GetMySQLByID(accountID string) mySQLNotify {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.MySQL[accountID]
|
|
}
|
|
|
|
func (n *notifier) SetKafkaByID(accountID string, kn kafkaNotify) {
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
n.Kafka[accountID] = kn
|
|
}
|
|
|
|
func (n *notifier) GetKafka() map[string]kafkaNotify {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.Kafka.Clone()
|
|
}
|
|
|
|
func (n *notifier) GetKafkaByID(accountID string) kafkaNotify {
|
|
n.RLock()
|
|
defer n.RUnlock()
|
|
return n.Kafka[accountID]
|
|
}
|
|
|