diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index eb33f6490..f998c7cc9 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -1457,7 +1457,9 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque func fetchLambdaInfo(cfg config.Config) []map[string][]madmin.TargetIDStatus { lambdaMap := make(map[string][]madmin.TargetIDStatus) - targetList, _ := notify.GetNotificationTargets(cfg, GlobalServiceDoneCh, NewCustomHTTPTransport()) + + // Fetch the targets + targetList, _ := notify.RegisterNotificationTargets(cfg, GlobalServiceDoneCh, NewCustomHTTPTransport(), nil, true) for targetID, target := range targetList.TargetMap() { targetIDStatus := make(map[string]madmin.Status) @@ -1470,6 +1472,8 @@ func fetchLambdaInfo(cfg config.Config) []map[string][]madmin.TargetIDStatus { list := lambdaMap[targetID.Name] list = append(list, targetIDStatus) lambdaMap[targetID.Name] = list + // Close any leaking connections + _ = target.Close() } notify := make([]map[string][]madmin.TargetIDStatus, len(lambdaMap)) diff --git a/cmd/config/notify/parse.go b/cmd/config/notify/parse.go index a1e56b46a..328d9f015 100644 --- a/cmd/config/notify/parse.go +++ b/cmd/config/notify/parse.go @@ -118,7 +118,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran if !args.Enable { continue } - newTarget, err := target.NewAMQPTarget(id, args, doneCh, logger.LogOnceIf) + newTarget, err := target.NewAMQPTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { return nil, err } @@ -134,7 +134,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran if !args.Enable { continue } - newTarget, err := target.NewElasticsearchTarget(id, args, doneCh, logger.LogOnceIf) + newTarget, err := target.NewElasticsearchTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { return nil, err @@ -152,7 +152,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran continue } args.TLS.RootCAs = transport.TLSClientConfig.RootCAs - newTarget, err := target.NewKafkaTarget(id, args, doneCh, logger.LogOnceIf) + newTarget, err := target.NewKafkaTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { return nil, err } @@ -169,7 +169,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran continue } args.RootCAs = transport.TLSClientConfig.RootCAs - newTarget, err := target.NewMQTTTarget(id, args, doneCh, logger.LogOnceIf) + newTarget, err := target.NewMQTTTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { return nil, err } @@ -185,7 +185,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran if !args.Enable { continue } - newTarget, err := target.NewMySQLTarget(id, args, doneCh, logger.LogOnceIf) + newTarget, err := target.NewMySQLTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { return nil, err } @@ -201,7 +201,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran if !args.Enable { continue } - newTarget, err := target.NewNATSTarget(id, args, doneCh, logger.LogOnceIf) + newTarget, err := target.NewNATSTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { return nil, err } @@ -217,7 +217,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran if !args.Enable { continue } - newTarget, err := target.NewNSQTarget(id, args, doneCh, logger.LogOnceIf) + newTarget, err := target.NewNSQTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { return nil, err } @@ -233,7 +233,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran if !args.Enable { continue } - newTarget, err := target.NewPostgreSQLTarget(id, args, doneCh, logger.LogOnceIf) + newTarget, err := target.NewPostgreSQLTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { return nil, err } @@ -249,7 +249,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran if !args.Enable { continue } - newTarget, err := target.NewRedisTarget(id, args, doneCh, logger.LogOnceIf) + newTarget, err := target.NewRedisTarget(id, args, doneCh, logger.LogOnceIf, test) if err != nil { return nil, err } @@ -265,7 +265,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran if !args.Enable { continue } - newTarget, err := target.NewWebhookTarget(id, args, doneCh, logger.LogOnceIf, transport) + newTarget, err := target.NewWebhookTarget(id, args, doneCh, logger.LogOnceIf, transport, test) if err != nil { return nil, err } diff --git a/pkg/event/target/amqp.go b/pkg/event/target/amqp.go index d48e7c20b..a75dac2b6 100644 --- a/pkg/event/target/amqp.go +++ b/pkg/event/target/amqp.go @@ -260,11 +260,14 @@ func (target *AMQPTarget) Send(eventKey string) error { // Close - does nothing and available for interface compatibility. func (target *AMQPTarget) Close() error { + if target.conn != nil { + return target.conn.Close() + } return nil } // NewAMQPTarget - creates new AMQP target. -func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})) (*AMQPTarget, error) { +func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}), test bool) (*AMQPTarget, error) { var conn *amqp.Connection var err error @@ -293,7 +296,7 @@ func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce loggerOnce: loggerOnce, } - if target.store != nil { + if target.store != nil && !test { // Replays the events from the store. eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) diff --git a/pkg/event/target/elasticsearch.go b/pkg/event/target/elasticsearch.go index 0fb0139d3..7e2022b01 100644 --- a/pkg/event/target/elasticsearch.go +++ b/pkg/event/target/elasticsearch.go @@ -204,6 +204,10 @@ func (target *ElasticsearchTarget) Send(eventKey string) error { // Close - does nothing and available for interface compatibility. func (target *ElasticsearchTarget) Close() error { + if target.client != nil { + // Stops the background processes that the client is running. + target.client.Stop() + } return nil } @@ -242,7 +246,7 @@ func newClient(args ElasticsearchArgs) (*elastic.Client, error) { } // NewElasticsearchTarget - creates new Elasticsearch target. -func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*ElasticsearchTarget, error) { +func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*ElasticsearchTarget, error) { var client *elastic.Client var err error @@ -275,7 +279,7 @@ func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan str store: store, } - if target.store != nil { + if target.store != nil && !test { // Replays the events from the store. eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) // Start replaying events from the store. diff --git a/pkg/event/target/kafka.go b/pkg/event/target/kafka.go index 3d867b082..24ef7a85f 100644 --- a/pkg/event/target/kafka.go +++ b/pkg/event/target/kafka.go @@ -234,7 +234,7 @@ func (k KafkaArgs) pingBrokers() bool { } // NewKafkaTarget - creates new Kafka target with auth credentials. -func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*KafkaTarget, error) { +func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*KafkaTarget, error) { config := sarama.NewConfig() config.Net.SASL.User = args.SASL.User @@ -287,7 +287,7 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc store: store, } - if target.store != nil { + if target.store != nil && !test { // Replays the events from the store. eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) // Start replaying events from the store. diff --git a/pkg/event/target/mqtt.go b/pkg/event/target/mqtt.go index 671388ea6..d02ff3a9e 100644 --- a/pkg/event/target/mqtt.go +++ b/pkg/event/target/mqtt.go @@ -199,7 +199,7 @@ func (target *MQTTTarget) Close() error { } // NewMQTTTarget - creates new MQTT target. -func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*MQTTTarget, error) { +func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*MQTTTarget, error) { if args.MaxReconnectInterval == 0 { // Default interval // https://github.com/eclipse/paho.mqtt.golang/blob/master/options.go#L115 @@ -261,13 +261,13 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce return nil, err } - go retryRegister() - - // Replays the events from the store. - eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) - - // Start replaying events from the store. - go sendEvents(target, eventKeyCh, doneCh, loggerOnce) + if !test { + go retryRegister() + // Replays the events from the store. + eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) + // Start replaying events from the store. + go sendEvents(target, eventKeyCh, doneCh, loggerOnce) + } } else { if token.Wait() && token.Error() != nil { return nil, token.Error() diff --git a/pkg/event/target/mysql.go b/pkg/event/target/mysql.go index 1c57354da..61e19899f 100644 --- a/pkg/event/target/mysql.go +++ b/pkg/event/target/mysql.go @@ -345,7 +345,7 @@ func (target *MySQLTarget) executeStmts() error { } // NewMySQLTarget - creates new MySQL target. -func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*MySQLTarget, error) { +func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*MySQLTarget, error) { var firstPing bool if args.DSN == "" { config := mysql.Config{ @@ -395,7 +395,7 @@ func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnc target.firstPing = true } - if target.store != nil { + if target.store != nil && !test { // Replays the events from the store. eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) // Start replaying events from the store. diff --git a/pkg/event/target/nats.go b/pkg/event/target/nats.go index 723960dd5..c42d6ae24 100644 --- a/pkg/event/target/nats.go +++ b/pkg/event/target/nats.go @@ -312,7 +312,7 @@ func (target *NATSTarget) Close() (err error) { } // NewNATSTarget - creates new NATS target. -func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*NATSTarget, error) { +func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*NATSTarget, error) { var natsConn *nats.Conn var stanConn stan.Conn @@ -348,7 +348,7 @@ func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce store: store, } - if target.store != nil { + if target.store != nil && !test { // Replays the events from the store. eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) // Start replaying events from the store. diff --git a/pkg/event/target/nsq.go b/pkg/event/target/nsq.go index fa729e5e8..e143d7238 100644 --- a/pkg/event/target/nsq.go +++ b/pkg/event/target/nsq.go @@ -167,13 +167,15 @@ func (target *NSQTarget) Send(eventKey string) error { // Close - closes underneath connections to NSQD server. func (target *NSQTarget) Close() (err error) { - // this blocks until complete: - target.producer.Stop() + if target.producer != nil { + // this blocks until complete: + target.producer.Stop() + } return nil } // NewNSQTarget - creates new NSQ target. -func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*NSQTarget, error) { +func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*NSQTarget, error) { config := nsq.NewConfig() if args.TLS.Enable { config.TlsV1 = true @@ -211,7 +213,7 @@ func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce fu } } - if target.store != nil { + if target.store != nil && !test { // Replays the events from the store. eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) // Start replaying events from the store. diff --git a/pkg/event/target/postgresql.go b/pkg/event/target/postgresql.go index b19aad74b..f1dade7a7 100644 --- a/pkg/event/target/postgresql.go +++ b/pkg/event/target/postgresql.go @@ -344,7 +344,7 @@ func (target *PostgreSQLTarget) executeStmts() error { } // NewPostgreSQLTarget - creates new PostgreSQL target. -func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*PostgreSQLTarget, error) { +func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*PostgreSQLTarget, error) { var firstPing bool params := []string{args.ConnectionString} @@ -400,7 +400,7 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, target.firstPing = true } - if target.store != nil { + if target.store != nil && !test { // Replays the events from the store. eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) // Start replaying events from the store. diff --git a/pkg/event/target/redis.go b/pkg/event/target/redis.go index 3a1b321ff..78c22f906 100644 --- a/pkg/event/target/redis.go +++ b/pkg/event/target/redis.go @@ -243,13 +243,13 @@ func (target *RedisTarget) Send(eventKey string) error { return target.store.Del(eventKey) } -// Close - does nothing and available for interface compatibility. +// Close - releases the resources used by the pool. func (target *RedisTarget) Close() error { - return nil + return target.pool.Close() } // NewRedisTarget - creates new Redis target. -func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})) (*RedisTarget, error) { +func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}), test bool) (*RedisTarget, error) { pool := &redis.Pool{ MaxIdle: 3, IdleTimeout: 2 * 60 * time.Second, @@ -314,7 +314,7 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnc target.firstPing = true } - if target.store != nil { + if target.store != nil && !test { // Replays the events from the store. eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) // Start replaying events from the store. diff --git a/pkg/event/target/webhook.go b/pkg/event/target/webhook.go index 27bb72fd5..02dbc2e84 100644 --- a/pkg/event/target/webhook.go +++ b/pkg/event/target/webhook.go @@ -186,11 +186,13 @@ func (target *WebhookTarget) Send(eventKey string) error { // Close - does nothing and available for interface compatibility. func (target *WebhookTarget) Close() error { + // Close idle connection with "keep-alive" states + target.httpClient.CloseIdleConnections() return nil } // NewWebhookTarget - creates new Webhook target. -func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), transport *http.Transport) (*WebhookTarget, error) { +func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), transport *http.Transport, test bool) (*WebhookTarget, error) { var store Store @@ -211,7 +213,7 @@ func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, logge store: store, } - if target.store != nil { + if target.store != nil && !test { // Replays the events from the store. eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID()) // Start replaying events from the store.