fix: only show active/available ARNs in server startup banner (#9392)

master
Praveen raj Mani 5 years ago committed by GitHub
parent 1b38aed05f
commit 322385f1b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      cmd/admin-handlers.go
  2. 16
      cmd/config/notify/parse.go
  3. 9
      cmd/notification.go
  4. 5
      cmd/peer-rest-client-target.go
  5. 2
      cmd/peer-rest-server.go
  6. 2
      cmd/server-startup-msg.go
  7. 5
      pkg/event/target/amqp.go
  8. 5
      pkg/event/target/elasticsearch.go
  9. 5
      pkg/event/target/httpclient.go
  10. 5
      pkg/event/target/kafka.go
  11. 5
      pkg/event/target/mqtt.go
  12. 5
      pkg/event/target/mysql.go
  13. 5
      pkg/event/target/nats.go
  14. 5
      pkg/event/target/nsq.go
  15. 5
      pkg/event/target/postgresql.go
  16. 5
      pkg/event/target/redis.go
  17. 5
      pkg/event/target/webhook.go
  18. 1
      pkg/event/targetlist.go
  19. 5
      pkg/event/targetlist_test.go

@ -1457,7 +1457,7 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque
Mode: mode,
Domain: domain,
Region: globalServerRegion,
SQSARN: globalNotificationSys.GetARNList(),
SQSARN: globalNotificationSys.GetARNList(false),
DeploymentID: globalDeploymentID,
Buckets: buckets,
Objects: objects,

@ -102,8 +102,10 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport
var targetsOffline bool
defer func() {
// Automatically close all connections to targets when an error occur
if err != nil {
// Automatically close all connections to targets when an error occur.
// Close all the targets if returnOnTargetError is set
// Else, close only the failed targets
if err != nil && returnOnTargetError {
for _, t := range targetList.TargetMap() {
_ = t.Close()
}
@ -174,6 +176,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
@ -194,6 +197,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
@ -214,6 +218,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
@ -234,6 +239,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
@ -253,6 +259,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
@ -272,6 +279,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
@ -291,6 +299,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
@ -310,6 +319,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
@ -329,6 +339,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
@ -348,6 +359,7 @@ func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport
if returnOnTargetError {
return nil, err
}
_ = newTarget.Close()
}
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)

@ -56,18 +56,23 @@ type NotificationSys struct {
}
// GetARNList - returns available ARNs.
func (sys *NotificationSys) GetARNList() []string {
func (sys *NotificationSys) GetARNList(onlyActive bool) []string {
arns := []string{}
if sys == nil {
return arns
}
region := globalServerRegion
for _, targetID := range sys.targetList.List() {
for targetID, target := range sys.targetList.TargetMap() {
// httpclient target is part of ListenBucketNotification
// which doesn't need to be listed as part of the ARN list
// This list is only meant for external targets, filter
// this out pro-actively.
if !strings.HasPrefix(targetID.ID, "httpclient+") {
if onlyActive && !target.HasQueueStore() {
if _, err := target.IsActive(); err != nil {
continue
}
}
arns = append(arns, targetID.ToARN(region).String())
}
}

@ -36,6 +36,11 @@ func (target *PeerRESTClientTarget) IsActive() (bool, error) {
return true, nil
}
// HasQueueStore - No-Op. Added for interface compatibility
func (target PeerRESTClientTarget) HasQueueStore() bool {
return false
}
// Save - Sends event directly without persisting.
func (target *PeerRESTClientTarget) Save(eventData event.Event) error {
return target.send(eventData)

@ -59,7 +59,7 @@ func getServerInfo() (*ServerInfoData, error) {
Version: Version,
CommitID: CommitID,
DeploymentID: globalDeploymentID,
SQSARN: globalNotificationSys.GetARNList(),
SQSARN: globalNotificationSys.GetARNList(false),
Region: globalServerRegion,
},
}, nil

@ -202,7 +202,7 @@ func printEventNotifiers() {
return
}
arns := globalNotificationSys.GetARNList()
arns := globalNotificationSys.GetARNList(true)
if len(arns) == 0 {
return
}

@ -132,6 +132,11 @@ func (target *AMQPTarget) IsActive() (bool, error) {
return true, nil
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *AMQPTarget) HasQueueStore() bool {
return target.store != nil
}
func (target *AMQPTarget) channel() (*amqp.Channel, error) {
var err error
var conn *amqp.Connection

@ -91,6 +91,11 @@ func (target *ElasticsearchTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *ElasticsearchTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *ElasticsearchTarget) IsActive() (bool, error) {
if dErr := target.args.URL.DialHTTP(nil); dErr != nil {

@ -49,6 +49,11 @@ func (target *HTTPClientTarget) IsActive() (bool, error) {
return true, nil
}
// HasQueueStore - No-Op. Added for interface compatibility
func (target *HTTPClientTarget) HasQueueStore() bool {
return false
}
func (target *HTTPClientTarget) start() {
go func() {
defer func() {

@ -133,6 +133,11 @@ func (target *KafkaTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *KafkaTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *KafkaTarget) IsActive() (bool, error) {
if !target.args.pingBrokers() {

@ -118,6 +118,11 @@ func (target *MQTTTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *MQTTTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *MQTTTarget) IsActive() (bool, error) {
if !target.client.IsConnectionOpen() {

@ -183,6 +183,11 @@ func (target *MySQLTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *MySQLTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *MySQLTarget) IsActive() (bool, error) {
if target.db == nil {

@ -207,6 +207,11 @@ func (target *NATSTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *NATSTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *NATSTarget) IsActive() (bool, error) {
var connErr error

@ -99,6 +99,11 @@ func (target *NSQTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *NSQTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *NSQTarget) IsActive() (bool, error) {
if target.producer != nil {

@ -182,6 +182,11 @@ func (target *PostgreSQLTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *PostgreSQLTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *PostgreSQLTarget) IsActive() (bool, error) {
if target.db == nil {

@ -128,6 +128,11 @@ func (target *RedisTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *RedisTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *RedisTarget) IsActive() (bool, error) {
conn := target.pool.Get()

@ -87,6 +87,11 @@ func (target WebhookTarget) ID() event.TargetID {
return target.id
}
// HasQueueStore - Checks if the queueStore has been configured for the target
func (target *WebhookTarget) HasQueueStore() bool {
return target.store != nil
}
// IsActive - Return true if target is up and active
func (target *WebhookTarget) IsActive() (bool, error) {
u, pErr := xnet.ParseHTTPURL(target.args.Endpoint.String())

@ -28,6 +28,7 @@ type Target interface {
Save(Event) error
Send(string) error
Close() error
HasQueueStore() bool
}
// TargetList - holds list of targets indexed by target ID.

@ -71,6 +71,11 @@ func (target ExampleTarget) IsActive() (bool, error) {
return false, errors.New("not connected to target server/service")
}
// HasQueueStore - No-Op. Added for interface compatibility
func (target ExampleTarget) HasQueueStore() bool {
return false
}
func TestTargetListAdd(t *testing.T) {
targetListCase1 := NewTargetList()

Loading…
Cancel
Save