diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 3548f586e..e2e05a6dd 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -1457,7 +1457,7 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque Mode: mode, Domain: domain, Region: globalServerRegion, - SQSARN: globalNotificationSys.GetARNList(), + SQSARN: globalNotificationSys.GetARNList(false), DeploymentID: globalDeploymentID, Buckets: buckets, Objects: objects, diff --git a/cmd/config/notify/parse.go b/cmd/config/notify/parse.go index 71e5f8fb8..ee0b96329 100644 --- a/cmd/config/notify/parse.go +++ b/cmd/config/notify/parse.go @@ -102,8 +102,10 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport var targetsOffline bool defer func() { - // Automatically close all connections to targets when an error occur - if err != nil { + // Automatically close all connections to targets when an error occur. + // Close all the targets if returnOnTargetError is set + // Else, close only the failed targets + if err != nil && returnOnTargetError { for _, t := range targetList.TargetMap() { _ = t.Close() } @@ -174,6 +176,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport if returnOnTargetError { return nil, err } + _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { @@ -194,6 +197,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport if returnOnTargetError { return nil, err } + _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) @@ -214,6 +218,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport if returnOnTargetError { return nil, err } + _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) @@ -234,6 +239,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport if returnOnTargetError { return nil, err } + _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) @@ -253,6 +259,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport if returnOnTargetError { return nil, err } + _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) @@ -272,6 +279,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport if returnOnTargetError { return nil, err } + _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) @@ -291,6 +299,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport if returnOnTargetError { return nil, err } + _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) @@ -310,6 +319,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport if returnOnTargetError { return nil, err } + _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) @@ -329,6 +339,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport if returnOnTargetError { return nil, err } + _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) @@ -348,6 +359,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport if returnOnTargetError { return nil, err } + _ = newTarget.Close() } if err = targetList.Add(newTarget); err != nil { logger.LogIf(context.Background(), err) diff --git a/cmd/notification.go b/cmd/notification.go index 1194dccf8..d1bd6764d 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -56,18 +56,23 @@ type NotificationSys struct { } // GetARNList - returns available ARNs. -func (sys *NotificationSys) GetARNList() []string { +func (sys *NotificationSys) GetARNList(onlyActive bool) []string { arns := []string{} if sys == nil { return arns } region := globalServerRegion - for _, targetID := range sys.targetList.List() { + for targetID, target := range sys.targetList.TargetMap() { // httpclient target is part of ListenBucketNotification // which doesn't need to be listed as part of the ARN list // This list is only meant for external targets, filter // this out pro-actively. if !strings.HasPrefix(targetID.ID, "httpclient+") { + if onlyActive && !target.HasQueueStore() { + if _, err := target.IsActive(); err != nil { + continue + } + } arns = append(arns, targetID.ToARN(region).String()) } } diff --git a/cmd/peer-rest-client-target.go b/cmd/peer-rest-client-target.go index 7db0827ae..05a5780b3 100644 --- a/cmd/peer-rest-client-target.go +++ b/cmd/peer-rest-client-target.go @@ -36,6 +36,11 @@ func (target *PeerRESTClientTarget) IsActive() (bool, error) { return true, nil } +// HasQueueStore - No-Op. Added for interface compatibility +func (target PeerRESTClientTarget) HasQueueStore() bool { + return false +} + // Save - Sends event directly without persisting. func (target *PeerRESTClientTarget) Save(eventData event.Event) error { return target.send(eventData) diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index cb97bc588..5e0df29dc 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -59,7 +59,7 @@ func getServerInfo() (*ServerInfoData, error) { Version: Version, CommitID: CommitID, DeploymentID: globalDeploymentID, - SQSARN: globalNotificationSys.GetARNList(), + SQSARN: globalNotificationSys.GetARNList(false), Region: globalServerRegion, }, }, nil diff --git a/cmd/server-startup-msg.go b/cmd/server-startup-msg.go index 5a9d92eca..ef18b56c2 100644 --- a/cmd/server-startup-msg.go +++ b/cmd/server-startup-msg.go @@ -202,7 +202,7 @@ func printEventNotifiers() { return } - arns := globalNotificationSys.GetARNList() + arns := globalNotificationSys.GetARNList(true) if len(arns) == 0 { return } diff --git a/pkg/event/target/amqp.go b/pkg/event/target/amqp.go index 320e0211d..3a9b0a9ee 100644 --- a/pkg/event/target/amqp.go +++ b/pkg/event/target/amqp.go @@ -132,6 +132,11 @@ func (target *AMQPTarget) IsActive() (bool, error) { return true, nil } +// HasQueueStore - Checks if the queueStore has been configured for the target +func (target *AMQPTarget) HasQueueStore() bool { + return target.store != nil +} + func (target *AMQPTarget) channel() (*amqp.Channel, error) { var err error var conn *amqp.Connection diff --git a/pkg/event/target/elasticsearch.go b/pkg/event/target/elasticsearch.go index 2316d4b6c..072fd52a3 100644 --- a/pkg/event/target/elasticsearch.go +++ b/pkg/event/target/elasticsearch.go @@ -91,6 +91,11 @@ func (target *ElasticsearchTarget) ID() event.TargetID { return target.id } +// HasQueueStore - Checks if the queueStore has been configured for the target +func (target *ElasticsearchTarget) HasQueueStore() bool { + return target.store != nil +} + // IsActive - Return true if target is up and active func (target *ElasticsearchTarget) IsActive() (bool, error) { if dErr := target.args.URL.DialHTTP(nil); dErr != nil { diff --git a/pkg/event/target/httpclient.go b/pkg/event/target/httpclient.go index 67cf97fd9..85af07077 100644 --- a/pkg/event/target/httpclient.go +++ b/pkg/event/target/httpclient.go @@ -49,6 +49,11 @@ func (target *HTTPClientTarget) IsActive() (bool, error) { return true, nil } +// HasQueueStore - No-Op. Added for interface compatibility +func (target *HTTPClientTarget) HasQueueStore() bool { + return false +} + func (target *HTTPClientTarget) start() { go func() { defer func() { diff --git a/pkg/event/target/kafka.go b/pkg/event/target/kafka.go index be2d8a6b0..a900a26de 100644 --- a/pkg/event/target/kafka.go +++ b/pkg/event/target/kafka.go @@ -133,6 +133,11 @@ func (target *KafkaTarget) ID() event.TargetID { return target.id } +// HasQueueStore - Checks if the queueStore has been configured for the target +func (target *KafkaTarget) HasQueueStore() bool { + return target.store != nil +} + // IsActive - Return true if target is up and active func (target *KafkaTarget) IsActive() (bool, error) { if !target.args.pingBrokers() { diff --git a/pkg/event/target/mqtt.go b/pkg/event/target/mqtt.go index 204b7078e..866fed156 100644 --- a/pkg/event/target/mqtt.go +++ b/pkg/event/target/mqtt.go @@ -118,6 +118,11 @@ func (target *MQTTTarget) ID() event.TargetID { return target.id } +// HasQueueStore - Checks if the queueStore has been configured for the target +func (target *MQTTTarget) HasQueueStore() bool { + return target.store != nil +} + // IsActive - Return true if target is up and active func (target *MQTTTarget) IsActive() (bool, error) { if !target.client.IsConnectionOpen() { diff --git a/pkg/event/target/mysql.go b/pkg/event/target/mysql.go index 3f41b70bf..e8bd22c8a 100644 --- a/pkg/event/target/mysql.go +++ b/pkg/event/target/mysql.go @@ -183,6 +183,11 @@ func (target *MySQLTarget) ID() event.TargetID { return target.id } +// HasQueueStore - Checks if the queueStore has been configured for the target +func (target *MySQLTarget) HasQueueStore() bool { + return target.store != nil +} + // IsActive - Return true if target is up and active func (target *MySQLTarget) IsActive() (bool, error) { if target.db == nil { diff --git a/pkg/event/target/nats.go b/pkg/event/target/nats.go index eced9464f..93e8a9339 100644 --- a/pkg/event/target/nats.go +++ b/pkg/event/target/nats.go @@ -207,6 +207,11 @@ func (target *NATSTarget) ID() event.TargetID { return target.id } +// HasQueueStore - Checks if the queueStore has been configured for the target +func (target *NATSTarget) HasQueueStore() bool { + return target.store != nil +} + // IsActive - Return true if target is up and active func (target *NATSTarget) IsActive() (bool, error) { var connErr error diff --git a/pkg/event/target/nsq.go b/pkg/event/target/nsq.go index e643ac2c0..39789059b 100644 --- a/pkg/event/target/nsq.go +++ b/pkg/event/target/nsq.go @@ -99,6 +99,11 @@ func (target *NSQTarget) ID() event.TargetID { return target.id } +// HasQueueStore - Checks if the queueStore has been configured for the target +func (target *NSQTarget) HasQueueStore() bool { + return target.store != nil +} + // IsActive - Return true if target is up and active func (target *NSQTarget) IsActive() (bool, error) { if target.producer != nil { diff --git a/pkg/event/target/postgresql.go b/pkg/event/target/postgresql.go index a1663c851..409b04f45 100644 --- a/pkg/event/target/postgresql.go +++ b/pkg/event/target/postgresql.go @@ -182,6 +182,11 @@ func (target *PostgreSQLTarget) ID() event.TargetID { return target.id } +// HasQueueStore - Checks if the queueStore has been configured for the target +func (target *PostgreSQLTarget) HasQueueStore() bool { + return target.store != nil +} + // IsActive - Return true if target is up and active func (target *PostgreSQLTarget) IsActive() (bool, error) { if target.db == nil { diff --git a/pkg/event/target/redis.go b/pkg/event/target/redis.go index d92d8a3c5..26d75869d 100644 --- a/pkg/event/target/redis.go +++ b/pkg/event/target/redis.go @@ -128,6 +128,11 @@ func (target *RedisTarget) ID() event.TargetID { return target.id } +// HasQueueStore - Checks if the queueStore has been configured for the target +func (target *RedisTarget) HasQueueStore() bool { + return target.store != nil +} + // IsActive - Return true if target is up and active func (target *RedisTarget) IsActive() (bool, error) { conn := target.pool.Get() diff --git a/pkg/event/target/webhook.go b/pkg/event/target/webhook.go index 0152014f1..b1c43f1fa 100644 --- a/pkg/event/target/webhook.go +++ b/pkg/event/target/webhook.go @@ -87,6 +87,11 @@ func (target WebhookTarget) ID() event.TargetID { return target.id } +// HasQueueStore - Checks if the queueStore has been configured for the target +func (target *WebhookTarget) HasQueueStore() bool { + return target.store != nil +} + // IsActive - Return true if target is up and active func (target *WebhookTarget) IsActive() (bool, error) { u, pErr := xnet.ParseHTTPURL(target.args.Endpoint.String()) diff --git a/pkg/event/targetlist.go b/pkg/event/targetlist.go index aea9d10a8..333b3ec6d 100644 --- a/pkg/event/targetlist.go +++ b/pkg/event/targetlist.go @@ -28,6 +28,7 @@ type Target interface { Save(Event) error Send(string) error Close() error + HasQueueStore() bool } // TargetList - holds list of targets indexed by target ID. diff --git a/pkg/event/targetlist_test.go b/pkg/event/targetlist_test.go index aa3815db3..13a88cd9e 100644 --- a/pkg/event/targetlist_test.go +++ b/pkg/event/targetlist_test.go @@ -71,6 +71,11 @@ func (target ExampleTarget) IsActive() (bool, error) { return false, errors.New("not connected to target server/service") } +// HasQueueStore - No-Op. Added for interface compatibility +func (target ExampleTarget) HasQueueStore() bool { + return false +} + func TestTargetListAdd(t *testing.T) { targetListCase1 := NewTargetList()