/* * MinIO Cloud Storage, (C) 2019 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package notify import ( "context" "crypto/tls" "crypto/x509" "errors" "net/http" "strconv" "strings" "time" "github.com/minio/minio/cmd/config" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/env" "github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/event/target" xnet "github.com/minio/minio/pkg/net" ) const ( formatNamespace = "namespace" formatAccess = "access" ) // ErrTargetsOffline - Indicates single/multiple target failures. var ErrTargetsOffline = errors.New("one or more targets are offline. Please use `mc admin info --json` to check the offline targets") // TestNotificationTargets is similar to GetNotificationTargets() // avoids explicit registration. func TestNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport, targetIDs []event.TargetID) error { test := true returnOnTargetError := true targets, err := RegisterNotificationTargets(cfg, doneCh, transport, targetIDs, test, returnOnTargetError) if err == nil { // Close all targets since we are only testing connections. for _, t := range targets.TargetMap() { _ = t.Close() } } return err } // GetNotificationTargets registers and initializes all notification // targets, returns error if any. func GetNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport) (*event.TargetList, error) { test := false returnOnTargetError := false return RegisterNotificationTargets(cfg, doneCh, transport, nil, test, returnOnTargetError) } // RegisterNotificationTargets - returns TargetList which contains enabled targets in serverConfig. // A new notification target is added like below // * Add a new target in pkg/event/target package. // * Add newly added target configuration to serverConfig.Notify.. // * Handle the configuration in this function to create/add into TargetList. func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport, targetIDs []event.TargetID, test bool, returnOnTargetError bool) (*event.TargetList, error) { targetList, err := FetchRegisteredTargets(cfg, doneCh, transport, test, returnOnTargetError) if err != nil { return targetList, err } if test { // Verify if user is trying to disable already configured // notification targets, based on their target IDs for _, targetID := range targetIDs { if !targetList.Exists(targetID) { return nil, config.Errorf( "Unable to disable configured targets '%v'", targetID) } } } return targetList, nil } // FetchRegisteredTargets - Returns a set of configured TargetList // If `returnOnTargetError` is set to true, The function returns when a target initialization fails // Else, the function will return a complete TargetList irrespective of errors func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport, test bool, returnOnTargetError bool) (_ *event.TargetList, err error) { targetList := event.NewTargetList() var targetsOffline bool defer func() { // Automatically close all connections to targets when an error occur. // Close all the targets if returnOnTargetError is set // Else, close only the failed targets if err != nil && returnOnTargetError { for _, t := range targetList.TargetMap() { _ = t.Close() } } }() if err = checkValidNotificationKeys(cfg); err != nil { return nil, err } amqpTargets, err := GetNotifyAMQP(cfg[config.NotifyAMQPSubSys]) if err != nil { return nil, err } esTargets, err := GetNotifyES(cfg[config.NotifyESSubSys]) if err != nil { return nil, err } kafkaTargets, err := GetNotifyKafka(cfg[config.NotifyKafkaSubSys]) if err != nil { return nil, err } mqttTargets, err := GetNotifyMQTT(cfg[config.NotifyMQTTSubSys], transport.TLSClientConfig.RootCAs) if err != nil { return nil, err } mysqlTargets, err := GetNotifyMySQL(cfg[config.NotifyMySQLSubSys]) if err != nil { return nil, err } natsTargets, err := GetNotifyNATS(cfg[config.NotifyNATSSubSys], transport.TLSClientConfig.RootCAs) if err != nil { return nil, err } nsqTargets, err := GetNotifyNSQ(cfg[config.NotifyNSQSubSys]) if err != nil { return nil, err } postgresTargets, err := GetNotifyPostgres(cfg[config.NotifyPostgresSubSys]) if err != nil { return nil, err } redisTargets, err := GetNotifyRedis(cfg[config.NotifyRedisSubSys]) if err != nil { return nil, err } webhookTargets, err := GetNotifyWebhook(cfg[config.NotifyWebhookSubSys], transport) if err != nil { return nil, err } for id, args := range amqpTargets { if !args.Enable { continue } newTarget, err := target.NewAMQPTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { targetsOffline = true if returnOnTargetError { return nil, err } _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) if returnOnTargetError { return nil, err } } } for id, args := range esTargets { if !args.Enable { continue } newTarget, err := target.NewElasticsearchTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { targetsOffline = true if returnOnTargetError { return nil, err } _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) if returnOnTargetError { return nil, err } } } for id, args := range kafkaTargets { if !args.Enable { continue } args.TLS.RootCAs = transport.TLSClientConfig.RootCAs newTarget, err := target.NewKafkaTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { targetsOffline = true if returnOnTargetError { return nil, err } _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) if returnOnTargetError { return nil, err } } } for id, args := range mqttTargets { if !args.Enable { continue } args.RootCAs = transport.TLSClientConfig.RootCAs newTarget, err := target.NewMQTTTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { targetsOffline = true if returnOnTargetError { return nil, err } _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) if returnOnTargetError { return nil, err } } } for id, args := range mysqlTargets { if !args.Enable { continue } newTarget, err := target.NewMySQLTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { targetsOffline = true if returnOnTargetError { return nil, err } _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) if returnOnTargetError { return nil, err } } } for id, args := range natsTargets { if !args.Enable { continue } newTarget, err := target.NewNATSTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { targetsOffline = true if returnOnTargetError { return nil, err } _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) if returnOnTargetError { return nil, err } } } for id, args := range nsqTargets { if !args.Enable { continue } newTarget, err := target.NewNSQTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { targetsOffline = true if returnOnTargetError { return nil, err } _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) if returnOnTargetError { return nil, err } } } for id, args := range postgresTargets { if !args.Enable { continue } newTarget, err := target.NewPostgreSQLTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { targetsOffline = true if returnOnTargetError { return nil, err } _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) if returnOnTargetError { return nil, err } } } for id, args := range redisTargets { if !args.Enable { continue } newTarget, err := target.NewRedisTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { targetsOffline = true if returnOnTargetError { return nil, err } _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) if returnOnTargetError { return nil, err } } } for id, args := range webhookTargets { if !args.Enable { continue } newTarget, err := target.NewWebhookTarget(id, args, doneCh, logger.LogOnceIf, transport, test) if err != nil { targetsOffline = true if returnOnTargetError { return nil, err } _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) if returnOnTargetError { return nil, err } } } if targetsOffline { return targetList, ErrTargetsOffline } return targetList, nil } // DefaultNotificationKVS - default notification list of kvs. var ( DefaultNotificationKVS = map[string]config.KVS{ config.NotifyAMQPSubSys: DefaultAMQPKVS, config.NotifyKafkaSubSys: DefaultKafkaKVS, config.NotifyMQTTSubSys: DefaultMQTTKVS, config.NotifyMySQLSubSys: DefaultMySQLKVS, config.NotifyNATSSubSys: DefaultNATSKVS, config.NotifyNSQSubSys: DefaultNSQKVS, config.NotifyPostgresSubSys: DefaultPostgresKVS, config.NotifyRedisSubSys: DefaultRedisKVS, config.NotifyWebhookSubSys: DefaultWebhookKVS, config.NotifyESSubSys: DefaultESKVS, } ) func checkValidNotificationKeys(cfg config.Config) error { for subSys, tgt := range cfg { validKVS, ok := DefaultNotificationKVS[subSys] if !ok { continue } for tname, kv := range tgt { subSysTarget := subSys if tname != config.Default { subSysTarget = subSys + config.SubSystemSeparator + tname } if v, ok := kv.Lookup(config.Enable); ok && v == config.EnableOn { if err := config.CheckValidKeys(subSysTarget, kv, validKVS); err != nil { return err } } } } return nil } func mergeTargets(cfgTargets map[string]config.KVS, envname string, defaultKVS config.KVS) map[string]config.KVS { newCfgTargets := make(map[string]config.KVS) for _, e := range env.List(envname) { tgt := strings.TrimPrefix(e, envname+config.Default) if tgt == envname { tgt = config.Default } newCfgTargets[tgt] = defaultKVS } for tgt, kv := range cfgTargets { newCfgTargets[tgt] = kv } return newCfgTargets } // DefaultKakfaKVS - default KV for kafka target var ( DefaultKafkaKVS = config.KVS{ config.KV{ Key: config.Enable, Value: config.EnableOff, }, config.KV{ Key: target.KafkaTopic, Value: "", }, config.KV{ Key: target.KafkaBrokers, Value: "", }, config.KV{ Key: target.KafkaSASLUsername, Value: "", }, config.KV{ Key: target.KafkaSASLPassword, Value: "", }, config.KV{ Key: target.KafkaSASLMechanism, Value: "plain", }, config.KV{ Key: target.KafkaClientTLSCert, Value: "", }, config.KV{ Key: target.KafkaClientTLSKey, Value: "", }, config.KV{ Key: target.KafkaTLSClientAuth, Value: "0", }, config.KV{ Key: target.KafkaSASL, Value: config.EnableOff, }, config.KV{ Key: target.KafkaTLS, Value: config.EnableOff, }, config.KV{ Key: target.KafkaTLSSkipVerify, Value: config.EnableOff, }, config.KV{ Key: target.KafkaQueueLimit, Value: "0", }, config.KV{ Key: target.KafkaQueueDir, Value: "", }, config.KV{ Key: target.KafkaVersion, Value: "", }, } ) // GetNotifyKafka - returns a map of registered notification 'kafka' targets func GetNotifyKafka(kafkaKVS map[string]config.KVS) (map[string]target.KafkaArgs, error) { kafkaTargets := make(map[string]target.KafkaArgs) for k, kv := range mergeTargets(kafkaKVS, target.EnvKafkaEnable, DefaultKafkaKVS) { enableEnv := target.EnvKafkaEnable if k != config.Default { enableEnv = enableEnv + config.Default + k } enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable))) if err != nil { return nil, err } if !enabled { continue } var brokers []xnet.Host brokersEnv := target.EnvKafkaBrokers if k != config.Default { brokersEnv = brokersEnv + config.Default + k } kafkaBrokers := env.Get(brokersEnv, kv.Get(target.KafkaBrokers)) if len(kafkaBrokers) == 0 { return nil, config.Errorf("kafka 'brokers' cannot be empty") } for _, s := range strings.Split(kafkaBrokers, config.ValueSeparator) { var host *xnet.Host host, err = xnet.ParseHost(s) if err != nil { break } brokers = append(brokers, *host) } if err != nil { return nil, err } queueLimitEnv := target.EnvKafkaQueueLimit if k != config.Default { queueLimitEnv = queueLimitEnv + config.Default + k } queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.KafkaQueueLimit)), 10, 64) if err != nil { return nil, err } clientAuthEnv := target.EnvKafkaTLSClientAuth if k != config.Default { clientAuthEnv = clientAuthEnv + config.Default + k } clientAuth, err := strconv.Atoi(env.Get(clientAuthEnv, kv.Get(target.KafkaTLSClientAuth))) if err != nil { return nil, err } topicEnv := target.EnvKafkaTopic if k != config.Default { topicEnv = topicEnv + config.Default + k } queueDirEnv := target.EnvKafkaQueueDir if k != config.Default { queueDirEnv = queueDirEnv + config.Default + k } versionEnv := target.EnvKafkaVersion if k != config.Default { versionEnv = versionEnv + config.Default + k } kafkaArgs := target.KafkaArgs{ Enable: enabled, Brokers: brokers, Topic: env.Get(topicEnv, kv.Get(target.KafkaTopic)), QueueDir: env.Get(queueDirEnv, kv.Get(target.KafkaQueueDir)), QueueLimit: queueLimit, Version: env.Get(versionEnv, kv.Get(target.KafkaVersion)), } tlsEnableEnv := target.EnvKafkaTLS if k != config.Default { tlsEnableEnv = tlsEnableEnv + config.Default + k } tlsSkipVerifyEnv := target.EnvKafkaTLSSkipVerify if k != config.Default { tlsSkipVerifyEnv = tlsSkipVerifyEnv + config.Default + k } tlsClientTLSCertEnv := target.EnvKafkaClientTLSCert if k != config.Default { tlsClientTLSCertEnv = tlsClientTLSCertEnv + config.Default + k } tlsClientTLSKeyEnv := target.EnvKafkaClientTLSKey if k != config.Default { tlsClientTLSKeyEnv = tlsClientTLSKeyEnv + config.Default + k } kafkaArgs.TLS.Enable = env.Get(tlsEnableEnv, kv.Get(target.KafkaTLS)) == config.EnableOn kafkaArgs.TLS.SkipVerify = env.Get(tlsSkipVerifyEnv, kv.Get(target.KafkaTLSSkipVerify)) == config.EnableOn kafkaArgs.TLS.ClientAuth = tls.ClientAuthType(clientAuth) kafkaArgs.TLS.ClientTLSCert = env.Get(tlsClientTLSCertEnv, kv.Get(target.KafkaClientTLSCert)) kafkaArgs.TLS.ClientTLSKey = env.Get(tlsClientTLSKeyEnv, kv.Get(target.KafkaClientTLSKey)) saslEnableEnv := target.EnvKafkaSASLEnable if k != config.Default { saslEnableEnv = saslEnableEnv + config.Default + k } saslUsernameEnv := target.EnvKafkaSASLUsername if k != config.Default { saslUsernameEnv = saslUsernameEnv + config.Default + k } saslPasswordEnv := target.EnvKafkaSASLPassword if k != config.Default { saslPasswordEnv = saslPasswordEnv + config.Default + k } saslMechanismEnv := target.EnvKafkaSASLMechanism if k != config.Default { saslMechanismEnv = saslMechanismEnv + config.Default + k } kafkaArgs.SASL.Enable = env.Get(saslEnableEnv, kv.Get(target.KafkaSASL)) == config.EnableOn kafkaArgs.SASL.User = env.Get(saslUsernameEnv, kv.Get(target.KafkaSASLUsername)) kafkaArgs.SASL.Password = env.Get(saslPasswordEnv, kv.Get(target.KafkaSASLPassword)) kafkaArgs.SASL.Mechanism = env.Get(saslMechanismEnv, kv.Get(target.KafkaSASLMechanism)) if err = kafkaArgs.Validate(); err != nil { return nil, err } kafkaTargets[k] = kafkaArgs } return kafkaTargets, nil } // DefaultMQTTKVS - default MQTT config var ( DefaultMQTTKVS = config.KVS{ config.KV{ Key: config.Enable, Value: config.EnableOff, }, config.KV{ Key: target.MqttBroker, Value: "", }, config.KV{ Key: target.MqttTopic, Value: "", }, config.KV{ Key: target.MqttPassword, Value: "", }, config.KV{ Key: target.MqttUsername, Value: "", }, config.KV{ Key: target.MqttQoS, Value: "0", }, config.KV{ Key: target.MqttKeepAliveInterval, Value: "0s", }, config.KV{ Key: target.MqttReconnectInterval, Value: "0s", }, config.KV{ Key: target.MqttQueueDir, Value: "", }, config.KV{ Key: target.MqttQueueLimit, Value: "0", }, } ) // GetNotifyMQTT - returns a map of registered notification 'mqtt' targets func GetNotifyMQTT(mqttKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[string]target.MQTTArgs, error) { mqttTargets := make(map[string]target.MQTTArgs) for k, kv := range mergeTargets(mqttKVS, target.EnvMQTTEnable, DefaultMQTTKVS) { enableEnv := target.EnvMQTTEnable if k != config.Default { enableEnv = enableEnv + config.Default + k } enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable))) if err != nil { return nil, err } if !enabled { continue } brokerEnv := target.EnvMQTTBroker if k != config.Default { brokerEnv = brokerEnv + config.Default + k } brokerURL, err := xnet.ParseURL(env.Get(brokerEnv, kv.Get(target.MqttBroker))) if err != nil { return nil, err } reconnectIntervalEnv := target.EnvMQTTReconnectInterval if k != config.Default { reconnectIntervalEnv = reconnectIntervalEnv + config.Default + k } reconnectInterval, err := time.ParseDuration(env.Get(reconnectIntervalEnv, kv.Get(target.MqttReconnectInterval))) if err != nil { return nil, err } keepAliveIntervalEnv := target.EnvMQTTKeepAliveInterval if k != config.Default { keepAliveIntervalEnv = keepAliveIntervalEnv + config.Default + k } keepAliveInterval, err := time.ParseDuration(env.Get(keepAliveIntervalEnv, kv.Get(target.MqttKeepAliveInterval))) if err != nil { return nil, err } queueLimitEnv := target.EnvMQTTQueueLimit if k != config.Default { queueLimitEnv = queueLimitEnv + config.Default + k } queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.MqttQueueLimit)), 10, 64) if err != nil { return nil, err } qosEnv := target.EnvMQTTQoS if k != config.Default { qosEnv = qosEnv + config.Default + k } // Parse uint8 value qos, err := strconv.ParseUint(env.Get(qosEnv, kv.Get(target.MqttQoS)), 10, 8) if err != nil { return nil, err } topicEnv := target.EnvMQTTTopic if k != config.Default { topicEnv = topicEnv + config.Default + k } usernameEnv := target.EnvMQTTUsername if k != config.Default { usernameEnv = usernameEnv + config.Default + k } passwordEnv := target.EnvMQTTPassword if k != config.Default { passwordEnv = passwordEnv + config.Default + k } queueDirEnv := target.EnvMQTTQueueDir if k != config.Default { queueDirEnv = queueDirEnv + config.Default + k } mqttArgs := target.MQTTArgs{ Enable: enabled, Broker: *brokerURL, Topic: env.Get(topicEnv, kv.Get(target.MqttTopic)), QoS: byte(qos), User: env.Get(usernameEnv, kv.Get(target.MqttUsername)), Password: env.Get(passwordEnv, kv.Get(target.MqttPassword)), MaxReconnectInterval: reconnectInterval, KeepAlive: keepAliveInterval, RootCAs: rootCAs, QueueDir: env.Get(queueDirEnv, kv.Get(target.MqttQueueDir)), QueueLimit: queueLimit, } if err = mqttArgs.Validate(); err != nil { return nil, err } mqttTargets[k] = mqttArgs } return mqttTargets, nil } // DefaultMySQLKVS - default KV for MySQL var ( DefaultMySQLKVS = config.KVS{ config.KV{ Key: config.Enable, Value: config.EnableOff, }, config.KV{ Key: target.MySQLFormat, Value: formatNamespace, }, config.KV{ Key: target.MySQLDSNString, Value: "", }, config.KV{ Key: target.MySQLTable, Value: "", }, config.KV{ Key: target.MySQLQueueDir, Value: "", }, config.KV{ Key: target.MySQLQueueLimit, Value: "0", }, } ) // GetNotifyMySQL - returns a map of registered notification 'mysql' targets func GetNotifyMySQL(mysqlKVS map[string]config.KVS) (map[string]target.MySQLArgs, error) { mysqlTargets := make(map[string]target.MySQLArgs) for k, kv := range mergeTargets(mysqlKVS, target.EnvMySQLEnable, DefaultMySQLKVS) { enableEnv := target.EnvMySQLEnable if k != config.Default { enableEnv = enableEnv + config.Default + k } enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable))) if err != nil { return nil, err } if !enabled { continue } queueLimitEnv := target.EnvMySQLQueueLimit if k != config.Default { queueLimitEnv = queueLimitEnv + config.Default + k } queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.MySQLQueueLimit)), 10, 64) if err != nil { return nil, err } formatEnv := target.EnvMySQLFormat if k != config.Default { formatEnv = formatEnv + config.Default + k } dsnStringEnv := target.EnvMySQLDSNString if k != config.Default { dsnStringEnv = dsnStringEnv + config.Default + k } tableEnv := target.EnvMySQLTable if k != config.Default { tableEnv = tableEnv + config.Default + k } queueDirEnv := target.EnvMySQLQueueDir if k != config.Default { queueDirEnv = queueDirEnv + config.Default + k } mysqlArgs := target.MySQLArgs{ Enable: enabled, Format: env.Get(formatEnv, kv.Get(target.MySQLFormat)), DSN: env.Get(dsnStringEnv, kv.Get(target.MySQLDSNString)), Table: env.Get(tableEnv, kv.Get(target.MySQLTable)), QueueDir: env.Get(queueDirEnv, kv.Get(target.MySQLQueueDir)), QueueLimit: queueLimit, } if err = mysqlArgs.Validate(); err != nil { return nil, err } mysqlTargets[k] = mysqlArgs } return mysqlTargets, nil } // DefaultNATSKVS - NATS KV for nats config. var ( DefaultNATSKVS = config.KVS{ config.KV{ Key: config.Enable, Value: config.EnableOff, }, config.KV{ Key: target.NATSAddress, Value: "", }, config.KV{ Key: target.NATSSubject, Value: "", }, config.KV{ Key: target.NATSUsername, Value: "", }, config.KV{ Key: target.NATSPassword, Value: "", }, config.KV{ Key: target.NATSToken, Value: "", }, config.KV{ Key: target.NATSTLS, Value: config.EnableOff, }, config.KV{ Key: target.NATSTLSSkipVerify, Value: config.EnableOff, }, config.KV{ Key: target.NATSCertAuthority, Value: "", }, config.KV{ Key: target.NATSClientCert, Value: "", }, config.KV{ Key: target.NATSClientKey, Value: "", }, config.KV{ Key: target.NATSPingInterval, Value: "0", }, config.KV{ Key: target.NATSStreaming, Value: config.EnableOff, }, config.KV{ Key: target.NATSStreamingAsync, Value: config.EnableOff, }, config.KV{ Key: target.NATSStreamingMaxPubAcksInFlight, Value: "0", }, config.KV{ Key: target.NATSStreamingClusterID, Value: "", }, config.KV{ Key: target.NATSQueueDir, Value: "", }, config.KV{ Key: target.NATSQueueLimit, Value: "0", }, } ) // GetNotifyNATS - returns a map of registered notification 'nats' targets func GetNotifyNATS(natsKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[string]target.NATSArgs, error) { natsTargets := make(map[string]target.NATSArgs) for k, kv := range mergeTargets(natsKVS, target.EnvNATSEnable, DefaultNATSKVS) { enableEnv := target.EnvNATSEnable if k != config.Default { enableEnv = enableEnv + config.Default + k } enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable))) if err != nil { return nil, err } if !enabled { continue } addressEnv := target.EnvNATSAddress if k != config.Default { addressEnv = addressEnv + config.Default + k } address, err := xnet.ParseHost(env.Get(addressEnv, kv.Get(target.NATSAddress))) if err != nil { return nil, err } pingIntervalEnv := target.EnvNATSPingInterval if k != config.Default { pingIntervalEnv = pingIntervalEnv + config.Default + k } pingInterval, err := strconv.ParseInt(env.Get(pingIntervalEnv, kv.Get(target.NATSPingInterval)), 10, 64) if err != nil { return nil, err } queueLimitEnv := target.EnvNATSQueueLimit if k != config.Default { queueLimitEnv = queueLimitEnv + config.Default + k } queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.NATSQueueLimit)), 10, 64) if err != nil { return nil, err } tlsEnv := target.EnvNATSTLS if k != config.Default { tlsEnv = tlsEnv + config.Default + k } tlsSkipVerifyEnv := target.EnvNATSTLSSkipVerify if k != config.Default { tlsSkipVerifyEnv = tlsSkipVerifyEnv + config.Default + k } subjectEnv := target.EnvNATSSubject if k != config.Default { subjectEnv = subjectEnv + config.Default + k } usernameEnv := target.EnvNATSUsername if k != config.Default { usernameEnv = usernameEnv + config.Default + k } passwordEnv := target.EnvNATSPassword if k != config.Default { passwordEnv = passwordEnv + config.Default + k } tokenEnv := target.EnvNATSToken if k != config.Default { tokenEnv = tokenEnv + config.Default + k } queueDirEnv := target.EnvNATSQueueDir if k != config.Default { queueDirEnv = queueDirEnv + config.Default + k } certAuthorityEnv := target.EnvNATSCertAuthority if k != config.Default { certAuthorityEnv = certAuthorityEnv + config.Default + k } clientCertEnv := target.EnvNATSClientCert if k != config.Default { clientCertEnv = clientCertEnv + config.Default + k } clientKeyEnv := target.EnvNATSClientKey if k != config.Default { clientKeyEnv = clientKeyEnv + config.Default + k } natsArgs := target.NATSArgs{ Enable: true, Address: *address, Subject: env.Get(subjectEnv, kv.Get(target.NATSSubject)), Username: env.Get(usernameEnv, kv.Get(target.NATSUsername)), Password: env.Get(passwordEnv, kv.Get(target.NATSPassword)), CertAuthority: env.Get(certAuthorityEnv, kv.Get(target.NATSCertAuthority)), ClientCert: env.Get(clientCertEnv, kv.Get(target.NATSClientCert)), ClientKey: env.Get(clientKeyEnv, kv.Get(target.NATSClientKey)), Token: env.Get(tokenEnv, kv.Get(target.NATSToken)), TLS: env.Get(tlsEnv, kv.Get(target.NATSTLS)) == config.EnableOn, TLSSkipVerify: env.Get(tlsSkipVerifyEnv, kv.Get(target.NATSTLSSkipVerify)) == config.EnableOn, PingInterval: pingInterval, QueueDir: env.Get(queueDirEnv, kv.Get(target.NATSQueueDir)), QueueLimit: queueLimit, RootCAs: rootCAs, } streamingEnableEnv := target.EnvNATSStreaming if k != config.Default { streamingEnableEnv = streamingEnableEnv + config.Default + k } streamingEnabled := env.Get(streamingEnableEnv, kv.Get(target.NATSStreaming)) == config.EnableOn if streamingEnabled { asyncEnv := target.EnvNATSStreamingAsync if k != config.Default { asyncEnv = asyncEnv + config.Default + k } maxPubAcksInflightEnv := target.EnvNATSStreamingMaxPubAcksInFlight if k != config.Default { maxPubAcksInflightEnv = maxPubAcksInflightEnv + config.Default + k } maxPubAcksInflight, err := strconv.Atoi(env.Get(maxPubAcksInflightEnv, kv.Get(target.NATSStreamingMaxPubAcksInFlight))) if err != nil { return nil, err } clusterIDEnv := target.EnvNATSStreamingClusterID if k != config.Default { clusterIDEnv = clusterIDEnv + config.Default + k } natsArgs.Streaming.Enable = streamingEnabled natsArgs.Streaming.ClusterID = env.Get(clusterIDEnv, kv.Get(target.NATSStreamingClusterID)) natsArgs.Streaming.Async = env.Get(asyncEnv, kv.Get(target.NATSStreamingAsync)) == config.EnableOn natsArgs.Streaming.MaxPubAcksInflight = maxPubAcksInflight } if err = natsArgs.Validate(); err != nil { return nil, err } natsTargets[k] = natsArgs } return natsTargets, nil } // DefaultNSQKVS - NSQ KV for config var ( DefaultNSQKVS = config.KVS{ config.KV{ Key: config.Enable, Value: config.EnableOff, }, config.KV{ Key: target.NSQAddress, Value: "", }, config.KV{ Key: target.NSQTopic, Value: "", }, config.KV{ Key: target.NSQTLS, Value: config.EnableOff, }, config.KV{ Key: target.NSQTLSSkipVerify, Value: config.EnableOff, }, config.KV{ Key: target.NSQQueueDir, Value: "", }, config.KV{ Key: target.NSQQueueLimit, Value: "0", }, } ) // GetNotifyNSQ - returns a map of registered notification 'nsq' targets func GetNotifyNSQ(nsqKVS map[string]config.KVS) (map[string]target.NSQArgs, error) { nsqTargets := make(map[string]target.NSQArgs) for k, kv := range mergeTargets(nsqKVS, target.EnvNSQEnable, DefaultNSQKVS) { enableEnv := target.EnvNSQEnable if k != config.Default { enableEnv = enableEnv + config.Default + k } enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable))) if err != nil { return nil, err } if !enabled { continue } addressEnv := target.EnvNSQAddress if k != config.Default { addressEnv = addressEnv + config.Default + k } nsqdAddress, err := xnet.ParseHost(env.Get(addressEnv, kv.Get(target.NSQAddress))) if err != nil { return nil, err } tlsEnableEnv := target.EnvNSQTLS if k != config.Default { tlsEnableEnv = tlsEnableEnv + config.Default + k } tlsSkipVerifyEnv := target.EnvNSQTLSSkipVerify if k != config.Default { tlsSkipVerifyEnv = tlsSkipVerifyEnv + config.Default + k } queueLimitEnv := target.EnvNSQQueueLimit if k != config.Default { queueLimitEnv = queueLimitEnv + config.Default + k } queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.NSQQueueLimit)), 10, 64) if err != nil { return nil, err } topicEnv := target.EnvNSQTopic if k != config.Default { topicEnv = topicEnv + config.Default + k } queueDirEnv := target.EnvNSQQueueDir if k != config.Default { queueDirEnv = queueDirEnv + config.Default + k } nsqArgs := target.NSQArgs{ Enable: enabled, NSQDAddress: *nsqdAddress, Topic: env.Get(topicEnv, kv.Get(target.NSQTopic)), QueueDir: env.Get(queueDirEnv, kv.Get(target.NSQQueueDir)), QueueLimit: queueLimit, } nsqArgs.TLS.Enable = env.Get(tlsEnableEnv, kv.Get(target.NSQTLS)) == config.EnableOn nsqArgs.TLS.SkipVerify = env.Get(tlsSkipVerifyEnv, kv.Get(target.NSQTLSSkipVerify)) == config.EnableOn if err = nsqArgs.Validate(); err != nil { return nil, err } nsqTargets[k] = nsqArgs } return nsqTargets, nil } // DefaultPostgresKVS - default Postgres KV for server config. var ( DefaultPostgresKVS = config.KVS{ config.KV{ Key: config.Enable, Value: config.EnableOff, }, config.KV{ Key: target.PostgresFormat, Value: formatNamespace, }, config.KV{ Key: target.PostgresConnectionString, Value: "", }, config.KV{ Key: target.PostgresTable, Value: "", }, config.KV{ Key: target.PostgresQueueDir, Value: "", }, config.KV{ Key: target.PostgresQueueLimit, Value: "0", }, } ) // GetNotifyPostgres - returns a map of registered notification 'postgres' targets func GetNotifyPostgres(postgresKVS map[string]config.KVS) (map[string]target.PostgreSQLArgs, error) { psqlTargets := make(map[string]target.PostgreSQLArgs) for k, kv := range mergeTargets(postgresKVS, target.EnvPostgresEnable, DefaultPostgresKVS) { enableEnv := target.EnvPostgresEnable if k != config.Default { enableEnv = enableEnv + config.Default + k } enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable))) if err != nil { return nil, err } if !enabled { continue } queueLimitEnv := target.EnvPostgresQueueLimit if k != config.Default { queueLimitEnv = queueLimitEnv + config.Default + k } queueLimit, err := strconv.Atoi(env.Get(queueLimitEnv, kv.Get(target.PostgresQueueLimit))) if err != nil { return nil, err } formatEnv := target.EnvPostgresFormat if k != config.Default { formatEnv = formatEnv + config.Default + k } connectionStringEnv := target.EnvPostgresConnectionString if k != config.Default { connectionStringEnv = connectionStringEnv + config.Default + k } tableEnv := target.EnvPostgresTable if k != config.Default { tableEnv = tableEnv + config.Default + k } queueDirEnv := target.EnvPostgresQueueDir if k != config.Default { queueDirEnv = queueDirEnv + config.Default + k } psqlArgs := target.PostgreSQLArgs{ Enable: enabled, Format: env.Get(formatEnv, kv.Get(target.PostgresFormat)), ConnectionString: env.Get(connectionStringEnv, kv.Get(target.PostgresConnectionString)), Table: env.Get(tableEnv, kv.Get(target.PostgresTable)), QueueDir: env.Get(queueDirEnv, kv.Get(target.PostgresQueueDir)), QueueLimit: uint64(queueLimit), } if err = psqlArgs.Validate(); err != nil { return nil, err } psqlTargets[k] = psqlArgs } return psqlTargets, nil } // DefaultRedisKVS - default KV for redis config var ( DefaultRedisKVS = config.KVS{ config.KV{ Key: config.Enable, Value: config.EnableOff, }, config.KV{ Key: target.RedisFormat, Value: formatNamespace, }, config.KV{ Key: target.RedisAddress, Value: "", }, config.KV{ Key: target.RedisKey, Value: "", }, config.KV{ Key: target.RedisPassword, Value: "", }, config.KV{ Key: target.RedisQueueDir, Value: "", }, config.KV{ Key: target.RedisQueueLimit, Value: "0", }, } ) // GetNotifyRedis - returns a map of registered notification 'redis' targets func GetNotifyRedis(redisKVS map[string]config.KVS) (map[string]target.RedisArgs, error) { redisTargets := make(map[string]target.RedisArgs) for k, kv := range mergeTargets(redisKVS, target.EnvRedisEnable, DefaultRedisKVS) { enableEnv := target.EnvRedisEnable if k != config.Default { enableEnv = enableEnv + config.Default + k } enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable))) if err != nil { return nil, err } if !enabled { continue } addressEnv := target.EnvRedisAddress if k != config.Default { addressEnv = addressEnv + config.Default + k } addr, err := xnet.ParseHost(env.Get(addressEnv, kv.Get(target.RedisAddress))) if err != nil { return nil, err } queueLimitEnv := target.EnvRedisQueueLimit if k != config.Default { queueLimitEnv = queueLimitEnv + config.Default + k } queueLimit, err := strconv.Atoi(env.Get(queueLimitEnv, kv.Get(target.RedisQueueLimit))) if err != nil { return nil, err } formatEnv := target.EnvRedisFormat if k != config.Default { formatEnv = formatEnv + config.Default + k } passwordEnv := target.EnvRedisPassword if k != config.Default { passwordEnv = passwordEnv + config.Default + k } keyEnv := target.EnvRedisKey if k != config.Default { keyEnv = keyEnv + config.Default + k } queueDirEnv := target.EnvRedisQueueDir if k != config.Default { queueDirEnv = queueDirEnv + config.Default + k } redisArgs := target.RedisArgs{ Enable: enabled, Format: env.Get(formatEnv, kv.Get(target.RedisFormat)), Addr: *addr, Password: env.Get(passwordEnv, kv.Get(target.RedisPassword)), Key: env.Get(keyEnv, kv.Get(target.RedisKey)), QueueDir: env.Get(queueDirEnv, kv.Get(target.RedisQueueDir)), QueueLimit: uint64(queueLimit), } if err = redisArgs.Validate(); err != nil { return nil, err } redisTargets[k] = redisArgs } return redisTargets, nil } // DefaultWebhookKVS - default KV for webhook config var ( DefaultWebhookKVS = config.KVS{ config.KV{ Key: config.Enable, Value: config.EnableOff, }, config.KV{ Key: target.WebhookEndpoint, Value: "", }, config.KV{ Key: target.WebhookAuthToken, Value: "", }, config.KV{ Key: target.WebhookQueueLimit, Value: "0", }, config.KV{ Key: target.WebhookQueueDir, Value: "", }, } ) // GetNotifyWebhook - returns a map of registered notification 'webhook' targets func GetNotifyWebhook(webhookKVS map[string]config.KVS, transport *http.Transport) ( map[string]target.WebhookArgs, error) { webhookTargets := make(map[string]target.WebhookArgs) for k, kv := range mergeTargets(webhookKVS, target.EnvWebhookEnable, DefaultWebhookKVS) { enableEnv := target.EnvWebhookEnable if k != config.Default { enableEnv = enableEnv + config.Default + k } enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable))) if err != nil { return nil, err } if !enabled { continue } urlEnv := target.EnvWebhookEndpoint if k != config.Default { urlEnv = urlEnv + config.Default + k } url, err := xnet.ParseHTTPURL(env.Get(urlEnv, kv.Get(target.WebhookEndpoint))) if err != nil { return nil, err } queueLimitEnv := target.EnvWebhookQueueLimit if k != config.Default { queueLimitEnv = queueLimitEnv + config.Default + k } queueLimit, err := strconv.Atoi(env.Get(queueLimitEnv, kv.Get(target.WebhookQueueLimit))) if err != nil { return nil, err } queueDirEnv := target.EnvWebhookQueueDir if k != config.Default { queueDirEnv = queueDirEnv + config.Default + k } authEnv := target.EnvWebhookAuthToken if k != config.Default { authEnv = authEnv + config.Default + k } webhookArgs := target.WebhookArgs{ Enable: enabled, Endpoint: *url, Transport: transport, AuthToken: env.Get(authEnv, kv.Get(target.WebhookAuthToken)), QueueDir: env.Get(queueDirEnv, kv.Get(target.WebhookQueueDir)), QueueLimit: uint64(queueLimit), } if err = webhookArgs.Validate(); err != nil { return nil, err } webhookTargets[k] = webhookArgs } return webhookTargets, nil } // DefaultESKVS - default KV config for Elasticsearch target var ( DefaultESKVS = config.KVS{ config.KV{ Key: config.Enable, Value: config.EnableOff, }, config.KV{ Key: target.ElasticURL, Value: "", }, config.KV{ Key: target.ElasticFormat, Value: formatNamespace, }, config.KV{ Key: target.ElasticIndex, Value: "", }, config.KV{ Key: target.ElasticQueueDir, Value: "", }, config.KV{ Key: target.ElasticQueueLimit, Value: "0", }, } ) // GetNotifyES - returns a map of registered notification 'elasticsearch' targets func GetNotifyES(esKVS map[string]config.KVS) (map[string]target.ElasticsearchArgs, error) { esTargets := make(map[string]target.ElasticsearchArgs) for k, kv := range mergeTargets(esKVS, target.EnvElasticEnable, DefaultESKVS) { enableEnv := target.EnvElasticEnable if k != config.Default { enableEnv = enableEnv + config.Default + k } enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable))) if err != nil { return nil, err } if !enabled { continue } urlEnv := target.EnvElasticURL if k != config.Default { urlEnv = urlEnv + config.Default + k } url, err := xnet.ParseHTTPURL(env.Get(urlEnv, kv.Get(target.ElasticURL))) if err != nil { return nil, err } queueLimitEnv := target.EnvElasticQueueLimit if k != config.Default { queueLimitEnv = queueLimitEnv + config.Default + k } queueLimit, err := strconv.Atoi(env.Get(queueLimitEnv, kv.Get(target.ElasticQueueLimit))) if err != nil { return nil, err } formatEnv := target.EnvElasticFormat if k != config.Default { formatEnv = formatEnv + config.Default + k } indexEnv := target.EnvElasticIndex if k != config.Default { indexEnv = indexEnv + config.Default + k } queueDirEnv := target.EnvElasticQueueDir if k != config.Default { queueDirEnv = queueDirEnv + config.Default + k } esArgs := target.ElasticsearchArgs{ Enable: enabled, Format: env.Get(formatEnv, kv.Get(target.ElasticFormat)), URL: *url, Index: env.Get(indexEnv, kv.Get(target.ElasticIndex)), QueueDir: env.Get(queueDirEnv, kv.Get(target.ElasticQueueDir)), QueueLimit: uint64(queueLimit), } if err = esArgs.Validate(); err != nil { return nil, err } esTargets[k] = esArgs } return esTargets, nil } // DefaultAMQPKVS - default KV for AMQP config var ( DefaultAMQPKVS = config.KVS{ config.KV{ Key: config.Enable, Value: config.EnableOff, }, config.KV{ Key: target.AmqpURL, Value: "", }, config.KV{ Key: target.AmqpExchange, Value: "", }, config.KV{ Key: target.AmqpExchangeType, Value: "", }, config.KV{ Key: target.AmqpRoutingKey, Value: "", }, config.KV{ Key: target.AmqpMandatory, Value: config.EnableOff, }, config.KV{ Key: target.AmqpDurable, Value: config.EnableOff, }, config.KV{ Key: target.AmqpNoWait, Value: config.EnableOff, }, config.KV{ Key: target.AmqpInternal, Value: config.EnableOff, }, config.KV{ Key: target.AmqpAutoDeleted, Value: config.EnableOff, }, config.KV{ Key: target.AmqpDeliveryMode, Value: "0", }, config.KV{ Key: target.AmqpQueueLimit, Value: "0", }, config.KV{ Key: target.AmqpQueueDir, Value: "", }, } ) // GetNotifyAMQP - returns a map of registered notification 'amqp' targets func GetNotifyAMQP(amqpKVS map[string]config.KVS) (map[string]target.AMQPArgs, error) { amqpTargets := make(map[string]target.AMQPArgs) for k, kv := range mergeTargets(amqpKVS, target.EnvAMQPEnable, DefaultAMQPKVS) { enableEnv := target.EnvAMQPEnable if k != config.Default { enableEnv = enableEnv + config.Default + k } enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable))) if err != nil { return nil, err } if !enabled { continue } urlEnv := target.EnvAMQPURL if k != config.Default { urlEnv = urlEnv + config.Default + k } url, err := xnet.ParseURL(env.Get(urlEnv, kv.Get(target.AmqpURL))) if err != nil { return nil, err } deliveryModeEnv := target.EnvAMQPDeliveryMode if k != config.Default { deliveryModeEnv = deliveryModeEnv + config.Default + k } deliveryMode, err := strconv.Atoi(env.Get(deliveryModeEnv, kv.Get(target.AmqpDeliveryMode))) if err != nil { return nil, err } exchangeEnv := target.EnvAMQPExchange if k != config.Default { exchangeEnv = exchangeEnv + config.Default + k } routingKeyEnv := target.EnvAMQPRoutingKey if k != config.Default { routingKeyEnv = routingKeyEnv + config.Default + k } exchangeTypeEnv := target.EnvAMQPExchangeType if k != config.Default { exchangeTypeEnv = exchangeTypeEnv + config.Default + k } mandatoryEnv := target.EnvAMQPMandatory if k != config.Default { mandatoryEnv = mandatoryEnv + config.Default + k } immediateEnv := target.EnvAMQPImmediate if k != config.Default { immediateEnv = immediateEnv + config.Default + k } durableEnv := target.EnvAMQPDurable if k != config.Default { durableEnv = durableEnv + config.Default + k } internalEnv := target.EnvAMQPInternal if k != config.Default { internalEnv = internalEnv + config.Default + k } noWaitEnv := target.EnvAMQPNoWait if k != config.Default { noWaitEnv = noWaitEnv + config.Default + k } autoDeletedEnv := target.EnvAMQPAutoDeleted if k != config.Default { autoDeletedEnv = autoDeletedEnv + config.Default + k } queueDirEnv := target.EnvAMQPQueueDir if k != config.Default { queueDirEnv = queueDirEnv + config.Default + k } queueLimitEnv := target.EnvAMQPQueueLimit if k != config.Default { queueLimitEnv = queueLimitEnv + config.Default + k } queueLimit, err := strconv.ParseUint(env.Get(queueLimitEnv, kv.Get(target.AmqpQueueLimit)), 10, 64) if err != nil { return nil, err } amqpArgs := target.AMQPArgs{ Enable: enabled, URL: *url, Exchange: env.Get(exchangeEnv, kv.Get(target.AmqpExchange)), RoutingKey: env.Get(routingKeyEnv, kv.Get(target.AmqpRoutingKey)), ExchangeType: env.Get(exchangeTypeEnv, kv.Get(target.AmqpExchangeType)), DeliveryMode: uint8(deliveryMode), Mandatory: env.Get(mandatoryEnv, kv.Get(target.AmqpMandatory)) == config.EnableOn, Immediate: env.Get(immediateEnv, kv.Get(target.AmqpImmediate)) == config.EnableOn, Durable: env.Get(durableEnv, kv.Get(target.AmqpDurable)) == config.EnableOn, Internal: env.Get(internalEnv, kv.Get(target.AmqpInternal)) == config.EnableOn, NoWait: env.Get(noWaitEnv, kv.Get(target.AmqpNoWait)) == config.EnableOn, AutoDeleted: env.Get(autoDeletedEnv, kv.Get(target.AmqpAutoDeleted)) == config.EnableOn, QueueDir: env.Get(queueDirEnv, kv.Get(target.AmqpQueueDir)), QueueLimit: queueLimit, } if err = amqpArgs.Validate(); err != nil { return nil, err } amqpTargets[k] = amqpArgs } return amqpTargets, nil }