event: use common initialization logic (#3798)

Previously creating and adding targets for each notification type was
repeated.  This patch fixes it.
master
Bala FA 8 years ago committed by Harshavardhana
parent 70d2cb5f4d
commit 69777b654e
  1. 144
      cmd/event-notifier.go

@ -537,6 +537,28 @@ func loadAllBucketNotifications(objAPI ObjectLayer) (map[string]*notificationCon
return nConfigs, lConfigs, nil
}
// addQueueTarget - calls newTargetFunc function and adds its returned value to queueTargets
func addQueueTarget(queueTargets map[string]*logrus.Logger,
accountID, queueType string,
newTargetFunc func(string) (*logrus.Logger, error)) (string, error) {
// Construct the queue ARN for AMQP.
queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueType
// Queue target if already initialized we move to the next ARN.
if _, ok := queueTargets[queueARN]; ok {
return queueARN, nil
}
// Using accountID we can now initialize a new AMQP logrus instance.
logger, err := newTargetFunc(accountID)
if err == nil {
queueTargets[queueARN] = logger
}
return queueARN, err
}
// Loads all queue targets, initializes each queueARNs depending on their config.
// Each instance of queueARN registers its own logrus to communicate with the
// queue service. QueueARN once initialized is not initialized again for the
@ -548,54 +570,37 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
if !amqpN.Enable {
continue
}
// Construct the queue ARN for AMQP.
queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeAMQP
// Queue target if already initialized we move to the next ARN.
_, ok := queueTargets[queueARN]
if ok {
continue
}
// Using accountID we can now initialize a new AMQP logrus instance.
amqpLog, err := newAMQPNotify(accountID)
if err != nil {
// Encapsulate network error to be more informative.
if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeAMQP, newAMQPNotify); err != nil {
if _, ok := err.(net.Error); ok {
return nil, &net.OpError{
err = &net.OpError{
Op: "Connecting to " + queueARN,
Net: "tcp",
Err: err,
}
}
return nil, err
}
queueTargets[queueARN] = amqpLog
}
// Load all nats targets, initialize their respective loggers.
for accountID, natsN := range serverConfig.Notify.GetNATS() {
if !natsN.Enable {
continue
}
// Construct the queue ARN for NATS.
queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeNATS
// Queue target if already initialized we move to the next ARN.
_, ok := queueTargets[queueARN]
if ok {
continue
}
// Using accountID we can now initialize a new NATS logrus instance.
natsLog, err := newNATSNotify(accountID)
if err != nil {
// Encapsulate network error to be more informative.
if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeNATS, newNATSNotify); err != nil {
if _, ok := err.(net.Error); ok {
return nil, &net.OpError{
err = &net.OpError{
Op: "Connecting to " + queueARN,
Net: "tcp",
Err: err,
}
}
return nil, err
}
queueTargets[queueARN] = natsLog
}
// Load redis targets, initialize their respective loggers.
@ -603,27 +608,18 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
if !redisN.Enable {
continue
}
// Construct the queue ARN for Redis.
queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeRedis
// Queue target if already initialized we move to the next ARN.
_, ok := queueTargets[queueARN]
if ok {
continue
}
// Using accountID we can now initialize a new Redis logrus instance.
redisLog, err := newRedisNotify(accountID)
if err != nil {
// Encapsulate network error to be more informative.
if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeRedis, newRedisNotify); err != nil {
if _, ok := err.(net.Error); ok {
return nil, &net.OpError{
err = &net.OpError{
Op: "Connecting to " + queueARN,
Net: "tcp",
Err: err,
}
}
return nil, err
}
queueTargets[queueARN] = redisLog
}
// Load Webhook targets, initialize their respective loggers.
@ -631,20 +627,10 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
if !webhookN.Enable {
continue
}
// Construct the queue ARN for Webhook.
queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeWebhook
_, ok := queueTargets[queueARN]
if ok {
continue
}
// Using accountID we can now initialize a new Webhook logrus instance.
webhookLog, err := newWebhookNotify(accountID)
if err != nil {
if _, err := addQueueTarget(queueTargets, accountID, queueTypeWebhook, newWebhookNotify); err != nil {
return nil, err
}
queueTargets[queueARN] = webhookLog
}
// Load elastic targets, initialize their respective loggers.
@ -652,25 +638,18 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
if !elasticN.Enable {
continue
}
// Construct the queue ARN for Elastic.
queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeElastic
_, ok := queueTargets[queueARN]
if ok {
continue
}
// Using accountID we can now initialize a new ElasticSearch logrus instance.
elasticLog, err := newElasticNotify(accountID)
if err != nil {
// Encapsulate network error to be more informative.
if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeElastic, newElasticNotify); err != nil {
if _, ok := err.(net.Error); ok {
return nil, &net.OpError{
Op: "Connecting to " + queueARN, Net: "tcp",
err = &net.OpError{
Op: "Connecting to " + queueARN,
Net: "tcp",
Err: err,
}
}
return nil, err
}
queueTargets[queueARN] = elasticLog
}
// Load PostgreSQL targets, initialize their respective loggers.
@ -678,50 +657,37 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
if !pgN.Enable {
continue
}
// Construct the queue ARN for Postgres.
queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypePostgreSQL
_, ok := queueTargets[queueARN]
if ok {
continue
}
// Using accountID initialize a new Postgresql logrus instance.
pgLog, err := newPostgreSQLNotify(accountID)
if err != nil {
// Encapsulate network error to be more informative.
if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypePostgreSQL, newPostgreSQLNotify); err != nil {
if _, ok := err.(net.Error); ok {
return nil, &net.OpError{
Op: "Connecting to " + queueARN, Net: "tcp",
err = &net.OpError{
Op: "Connecting to " + queueARN,
Net: "tcp",
Err: err,
}
}
return nil, err
}
queueTargets[queueARN] = pgLog
}
// Load Kafka targets, initialize their respective loggers.
for accountID, kafkaN := range serverConfig.Notify.GetKafka() {
if !kafkaN.Enable {
continue
}
// Construct the queue ARN for Kafka.
queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeKafka
_, ok := queueTargets[queueARN]
if ok {
continue
}
// Using accountID initialize a new Kafka logrus instance.
kafkaLog, err := newKafkaNotify(accountID)
if err != nil {
// Encapsulate network error to be more informative.
if queueARN, err := addQueueTarget(queueTargets, accountID, queueTypeKafka, newKafkaNotify); err != nil {
if _, ok := err.(net.Error); ok {
return nil, &net.OpError{
Op: "Connecting to " + queueARN, Net: "tcp",
err = &net.OpError{
Op: "Connecting to " + queueARN,
Net: "tcp",
Err: err,
}
}
return nil, err
}
queueTargets[queueARN] = kafkaLog
}
// Successfully initialized queue targets.

Loading…
Cancel
Save