fix: fetchLambdaInfo should return consistent results (#9332)

- Introduced a function `FetchRegisteredTargets` which will return
  a complete set of registered targets irrespective to their states,
  if the `returnOnTargetError` flag is set to `False`
- Refactor NewTarget functions to return non-nil targets
- Refactor GetARNList() to return a complete list of configured targets
master
Praveen raj Mani 5 years ago committed by GitHub
parent 525287f4b6
commit bfec5fe200
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      cmd/admin-handlers.go
  2. 155
      cmd/config/notify/parse.go
  3. 26
      pkg/event/target/amqp.go
  4. 37
      pkg/event/target/elasticsearch.go
  5. 45
      pkg/event/target/kafka.go
  6. 9
      pkg/event/target/mqtt.go
  7. 42
      pkg/event/target/mysql.go
  8. 89
      pkg/event/target/nats.go
  9. 43
      pkg/event/target/nsq.go
  10. 44
      pkg/event/target/postgresql.go
  11. 29
      pkg/event/target/redis.go
  12. 30
      pkg/event/target/webhook.go

@ -1453,11 +1453,13 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque
func fetchLambdaInfo(cfg config.Config) []map[string][]madmin.TargetIDStatus {
// Fetch the targets
targetList, err := notify.RegisterNotificationTargets(cfg, GlobalServiceDoneCh, NewGatewayHTTPTransport(), nil, true)
if err != nil {
// Fetch the configured targets
targetList, err := notify.FetchRegisteredTargets(cfg, GlobalServiceDoneCh, NewGatewayHTTPTransport(), true, false)
if err != nil && err != notify.ErrTargetsOffline {
logger.LogIf(GlobalContext, err)
return nil
}
lambdaMap := make(map[string][]madmin.TargetIDStatus)
for targetID, target := range targetList.TargetMap() {

@ -17,8 +17,10 @@
package notify
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"net/http"
"strconv"
"strings"
@ -37,12 +39,16 @@ const (
formatAccess = "access"
)
// ErrTargetsOffline - Indicates single/multiple target failures.
var ErrTargetsOffline = errors.New("one or more targets are offline. Please use `mc admin info --json` to check the offline targets")
// TestNotificationTargets is similar to GetNotificationTargets()
// avoids explicit registration.
func TestNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport,
targetIDs []event.TargetID) error {
test := true
targets, err := RegisterNotificationTargets(cfg, doneCh, transport, targetIDs, test)
returnOnTargetError := true
targets, err := RegisterNotificationTargets(cfg, doneCh, transport, targetIDs, test, returnOnTargetError)
if err == nil {
// Close all targets since we are only testing connections.
for _, t := range targets.TargetMap() {
@ -57,7 +63,8 @@ func TestNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transpor
// targets, returns error if any.
func GetNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport) (*event.TargetList, error) {
test := false
return RegisterNotificationTargets(cfg, doneCh, transport, nil, test)
returnOnTargetError := false
return RegisterNotificationTargets(cfg, doneCh, transport, nil, test, returnOnTargetError)
}
// RegisterNotificationTargets - returns TargetList which contains enabled targets in serverConfig.
@ -65,8 +72,34 @@ func GetNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transport
// * Add a new target in pkg/event/target package.
// * Add newly added target configuration to serverConfig.Notify.<TARGET_NAME>.
// * Handle the configuration in this function to create/add into TargetList.
func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport, targetIDs []event.TargetID, test bool) (_ *event.TargetList, err error) {
func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport, targetIDs []event.TargetID, test bool, returnOnTargetError bool) (*event.TargetList, error) {
targetList, err := FetchRegisteredTargets(cfg, doneCh, transport, test, returnOnTargetError)
if err != nil {
return targetList, err
}
if test {
// Verify if user is trying to disable already configured
// notification targets, based on their target IDs
for _, targetID := range targetIDs {
if !targetList.Exists(targetID) {
return nil, config.Errorf(
"Unable to disable configured targets '%v'",
targetID)
}
}
}
return targetList, nil
}
// FetchRegisteredTargets - Returns a set of configured TargetList
// If `returnOnTargetError` is set to true, The function returns when a target initialization fails
// Else, the function will return a complete TargetList irrespective of errors
func FetchRegisteredTargets(cfg config.Config, doneCh <-chan struct{}, transport *http.Transport, test bool, returnOnTargetError bool) (_ *event.TargetList, err error) {
targetList := event.NewTargetList()
var targetsOffline bool
defer func() {
// Automatically close all connections to targets when an error occur
@ -137,10 +170,17 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
}
newTarget, err := target.NewAMQPTarget(id, args, doneCh, logger.LogOnceIf, test)
if err != nil {
return nil, err
targetsOffline = true
if returnOnTargetError {
return nil, err
}
}
if err = targetList.Add(newTarget); err != nil {
return nil, err
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
@ -150,11 +190,16 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
}
newTarget, err := target.NewElasticsearchTarget(id, args, doneCh, logger.LogOnceIf, test)
if err != nil {
return nil, err
targetsOffline = true
if returnOnTargetError {
return nil, err
}
}
if err = targetList.Add(newTarget); err != nil {
return nil, err
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
@ -165,10 +210,16 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
args.TLS.RootCAs = transport.TLSClientConfig.RootCAs
newTarget, err := target.NewKafkaTarget(id, args, doneCh, logger.LogOnceIf, test)
if err != nil {
return nil, err
targetsOffline = true
if returnOnTargetError {
return nil, err
}
}
if err = targetList.Add(newTarget); err != nil {
return nil, err
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
@ -179,10 +230,16 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
args.RootCAs = transport.TLSClientConfig.RootCAs
newTarget, err := target.NewMQTTTarget(id, args, doneCh, logger.LogOnceIf, test)
if err != nil {
return nil, err
targetsOffline = true
if returnOnTargetError {
return nil, err
}
}
if err = targetList.Add(newTarget); err != nil {
return nil, err
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
@ -192,10 +249,16 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
}
newTarget, err := target.NewMySQLTarget(id, args, doneCh, logger.LogOnceIf, test)
if err != nil {
return nil, err
targetsOffline = true
if returnOnTargetError {
return nil, err
}
}
if err = targetList.Add(newTarget); err != nil {
return nil, err
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
@ -205,10 +268,16 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
}
newTarget, err := target.NewNATSTarget(id, args, doneCh, logger.LogOnceIf, test)
if err != nil {
return nil, err
targetsOffline = true
if returnOnTargetError {
return nil, err
}
}
if err = targetList.Add(newTarget); err != nil {
return nil, err
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
@ -218,10 +287,16 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
}
newTarget, err := target.NewNSQTarget(id, args, doneCh, logger.LogOnceIf, test)
if err != nil {
return nil, err
targetsOffline = true
if returnOnTargetError {
return nil, err
}
}
if err = targetList.Add(newTarget); err != nil {
return nil, err
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
@ -231,10 +306,16 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
}
newTarget, err := target.NewPostgreSQLTarget(id, args, doneCh, logger.LogOnceIf, test)
if err != nil {
return nil, err
targetsOffline = true
if returnOnTargetError {
return nil, err
}
}
if err = targetList.Add(newTarget); err != nil {
return nil, err
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
@ -244,10 +325,16 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
}
newTarget, err := target.NewRedisTarget(id, args, doneCh, logger.LogOnceIf, test)
if err != nil {
return nil, err
targetsOffline = true
if returnOnTargetError {
return nil, err
}
}
if err = targetList.Add(newTarget); err != nil {
return nil, err
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
@ -257,23 +344,21 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
}
newTarget, err := target.NewWebhookTarget(id, args, doneCh, logger.LogOnceIf, transport, test)
if err != nil {
return nil, err
targetsOffline = true
if returnOnTargetError {
return nil, err
}
}
if err := targetList.Add(newTarget); err != nil {
return nil, err
if err = targetList.Add(newTarget); err != nil {
logger.LogIf(context.Background(), err)
if returnOnTargetError {
return nil, err
}
}
}
if test {
// Verify if user is trying to disable already configured
// notification targets, based on their target IDs
for _, targetID := range targetIDs {
if !targetList.Exists(targetID) {
return nil, config.Errorf(
"Unable to disable configured targets '%v'",
targetID)
}
}
if targetsOffline {
return targetList, ErrTargetsOffline
}
return targetList, nil

@ -275,35 +275,37 @@ func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce
var store Store
target := &AMQPTarget{
id: event.TargetID{ID: id, Name: "amqp"},
args: args,
loggerOnce: loggerOnce,
}
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-amqp-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
return nil, oErr
target.loggerOnce(context.Background(), oErr, target.ID())
return target, oErr
}
target.store = store
}
conn, err = amqp.Dial(args.URL.String())
if err != nil {
if store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
return nil, err
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
}
target := &AMQPTarget{
id: event.TargetID{ID: id, Name: "amqp"},
args: args,
conn: conn,
store: store,
loggerOnce: loggerOnce,
}
target.conn = conn
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil

@ -82,10 +82,11 @@ func (a ElasticsearchArgs) Validate() error {
// ElasticsearchTarget - Elasticsearch target.
type ElasticsearchTarget struct {
id event.TargetID
args ElasticsearchArgs
client *elastic.Client
store Store
id event.TargetID
args ElasticsearchArgs
client *elastic.Client
store Store
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}
// ID - returns target ID.
@ -252,38 +253,42 @@ func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan str
var store Store
target := &ElasticsearchTarget{
id: event.TargetID{ID: id, Name: "elasticsearch"},
args: args,
loggerOnce: loggerOnce,
}
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-elasticsearch-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
return nil, oErr
target.loggerOnce(context.Background(), oErr, target.ID())
return target, oErr
}
target.store = store
}
dErr := args.URL.DialHTTP(nil)
if dErr != nil {
if store == nil {
return nil, dErr
target.loggerOnce(context.Background(), dErr, target.ID())
return target, dErr
}
} else {
client, err = newClient(args)
if err != nil {
return nil, err
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
}
target := &ElasticsearchTarget{
id: event.TargetID{ID: id, Name: "elasticsearch"},
args: args,
client: client,
store: store,
target.client = client
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil

@ -123,11 +123,12 @@ func (k KafkaArgs) Validate() error {
// KafkaTarget - Kafka target.
type KafkaTarget struct {
id event.TargetID
args KafkaArgs
producer sarama.SyncProducer
config *sarama.Config
store Store
id event.TargetID
args KafkaArgs
producer sarama.SyncProducer
config *sarama.Config
store Store
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}
// ID - returns target ID.
@ -248,10 +249,17 @@ func (k KafkaArgs) pingBrokers() bool {
func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*KafkaTarget, error) {
config := sarama.NewConfig()
target := &KafkaTarget{
id: event.TargetID{ID: id, Name: "kafka"},
args: args,
loggerOnce: loggerOnce,
}
if args.Version != "" {
kafkaVersion, err := sarama.ParseKafkaVersion(args.Version)
if err != nil {
return nil, err
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
config.Version = kafkaVersion
}
@ -273,7 +281,8 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc
tlsConfig, err := saramatls.NewConfig(args.TLS.ClientTLSCert, args.TLS.ClientTLSKey)
if err != nil {
return nil, err
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
config.Net.TLS.Enable = args.TLS.Enable
@ -286,6 +295,8 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc
config.Producer.Retry.Max = 10
config.Producer.Return.Successes = true
target.config = config
brokers := []string{}
for _, broker := range args.Brokers {
brokers = append(brokers, broker.String())
@ -297,30 +308,26 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc
queueDir := filepath.Join(args.QueueDir, storePrefix+"-kafka-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
return nil, oErr
target.loggerOnce(context.Background(), oErr, target.ID())
return target, oErr
}
target.store = store
}
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
if store == nil || err != sarama.ErrOutOfBrokers {
return nil, err
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
}
target := &KafkaTarget{
id: event.TargetID{ID: id, Name: "kafka"},
args: args,
producer: producer,
config: config,
store: store,
}
target.producer = producer
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil

@ -258,19 +258,20 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mqtt-"+id)
target.store = NewQueueStore(queueDir, args.QueueLimit)
if err := target.store.Open(); err != nil {
return nil, err
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
if !test {
go retryRegister()
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
} else {
if token.Wait() && token.Error() != nil {
return nil, token.Error()
return target, token.Error()
}
}
return target, nil

@ -178,6 +178,7 @@ type MySQLTarget struct {
db *sql.DB
store Store
firstPing bool
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}
// ID - returns target ID.
@ -187,6 +188,13 @@ func (target *MySQLTarget) ID() event.TargetID {
// IsActive - Return true if target is up and active
func (target *MySQLTarget) IsActive() (bool, error) {
if target.db == nil {
db, sErr := sql.Open("mysql", target.args.DSN)
if sErr != nil {
return false, sErr
}
target.db = db
}
if err := target.db.Ping(); err != nil {
if IsConnErr(err) {
return false, errNotConnected
@ -346,7 +354,6 @@ func (target *MySQLTarget) executeStmts() error {
// NewMySQLTarget - creates new MySQL target.
func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*MySQLTarget, error) {
var firstPing bool
if args.DSN == "" {
config := mysql.Config{
User: args.User,
@ -360,10 +367,19 @@ func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnc
args.DSN = config.FormatDSN()
}
target := &MySQLTarget{
id: event.TargetID{ID: id, Name: "mysql"},
args: args,
firstPing: false,
loggerOnce: loggerOnce,
}
db, err := sql.Open("mysql", args.DSN)
if err != nil {
return nil, err
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
target.db = db
var store Store
@ -371,35 +387,31 @@ func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnc
queueDir := filepath.Join(args.QueueDir, storePrefix+"-mysql-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
return nil, oErr
target.loggerOnce(context.Background(), oErr, target.ID())
return target, oErr
}
}
target := &MySQLTarget{
id: event.TargetID{ID: id, Name: "mysql"},
args: args,
db: db,
store: store,
firstPing: firstPing,
target.store = store
}
err = target.db.Ping()
if err != nil {
if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
return nil, err
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
} else {
if err = target.executeStmts(); err != nil {
return nil, err
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
target.firstPing = true
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil

@ -197,11 +197,12 @@ func (n NATSArgs) connectStan() (stan.Conn, error) {
// NATSTarget - NATS target.
type NATSTarget struct {
id event.TargetID
args NATSArgs
natsConn *nats.Conn
stanConn stan.Conn
store Store
id event.TargetID
args NATSArgs
natsConn *nats.Conn
stanConn stan.Conn
store Store
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}
// ID - returns target ID.
@ -211,15 +212,32 @@ func (target *NATSTarget) ID() event.TargetID {
// IsActive - Return true if target is up and active
func (target *NATSTarget) IsActive() (bool, error) {
var connErr error
if target.args.Streaming.Enable {
if !target.stanConn.NatsConn().IsConnected() {
return false, errNotConnected
if target.stanConn == nil || target.stanConn.NatsConn() == nil {
target.stanConn, connErr = target.args.connectStan()
} else {
if !target.stanConn.NatsConn().IsConnected() {
return false, errNotConnected
}
}
} else {
if !target.natsConn.IsConnected() {
if target.natsConn == nil {
target.natsConn, connErr = target.args.connectNats()
} else {
if !target.natsConn.IsConnected() {
return false, errNotConnected
}
}
}
if connErr != nil {
if connErr.Error() == nats.ErrNoServers.Error() {
return false, errNotConnected
}
return false, connErr
}
return true, nil
}
@ -262,31 +280,9 @@ func (target *NATSTarget) send(eventData event.Event) error {
// Send - sends event to Nats.
func (target *NATSTarget) Send(eventKey string) error {
var connErr error
if target.args.Streaming.Enable {
if target.stanConn == nil || target.stanConn.NatsConn() == nil {
target.stanConn, connErr = target.args.connectStan()
} else {
if !target.stanConn.NatsConn().IsConnected() {
return errNotConnected
}
}
} else {
if target.natsConn == nil {
target.natsConn, connErr = target.args.connectNats()
} else {
if !target.natsConn.IsConnected() {
return errNotConnected
}
}
}
if connErr != nil {
if connErr.Error() == nats.ErrNoServers.Error() {
return errNotConnected
}
return connErr
_, err := target.IsActive()
if err != nil {
return err
}
eventData, eErr := target.store.Get(eventKey)
@ -332,39 +328,42 @@ func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce
var store Store
target := &NATSTarget{
id: event.TargetID{ID: id, Name: "nats"},
args: args,
loggerOnce: loggerOnce,
}
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-nats-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
return nil, oErr
target.loggerOnce(context.Background(), oErr, target.ID())
return target, oErr
}
target.store = store
}
if args.Streaming.Enable {
stanConn, err = args.connectStan()
target.stanConn = stanConn
} else {
natsConn, err = args.connectNats()
target.natsConn = natsConn
}
if err != nil {
if store == nil || err.Error() != nats.ErrNoServers.Error() {
return nil, err
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
}
target := &NATSTarget{
id: event.TargetID{ID: id, Name: "nats"},
args: args,
stanConn: stanConn,
natsConn: natsConn,
store: store,
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil

@ -89,10 +89,12 @@ func (n NSQArgs) Validate() error {
// NSQTarget - NSQ target.
type NSQTarget struct {
id event.TargetID
args NSQArgs
producer *nsq.Producer
store Store
id event.TargetID
args NSQArgs
producer *nsq.Producer
store Store
config *nsq.Config
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}
// ID - returns target ID.
@ -102,6 +104,14 @@ func (target *NSQTarget) ID() event.TargetID {
// IsActive - Return true if target is up and active
func (target *NSQTarget) IsActive() (bool, error) {
if target.producer != nil {
producer, err := nsq.NewProducer(target.args.NSQDAddress.String(), target.config)
if err != nil {
return false, err
}
target.producer = producer
}
if err := target.producer.Ping(); err != nil {
// To treat "connection refused" errors as errNotConnected.
if IsConnRefusedErr(err) {
@ -186,38 +196,43 @@ func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce fu
var store Store
target := &NSQTarget{
id: event.TargetID{ID: id, Name: "nsq"},
args: args,
config: config,
loggerOnce: loggerOnce,
}
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-nsq-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
target.loggerOnce(context.Background(), oErr, target.ID())
return nil, oErr
}
target.store = store
}
producer, err := nsq.NewProducer(args.NSQDAddress.String(), config)
if err != nil {
return nil, err
}
target := &NSQTarget{
id: event.TargetID{ID: id, Name: "nsq"},
args: args,
producer: producer,
store: store,
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
target.producer = producer
if err := target.producer.Ping(); err != nil {
// To treat "connection refused" errors as errNotConnected.
if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
target.loggerOnce(context.Background(), err, target.ID())
return nil, err
}
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil

@ -176,6 +176,8 @@ type PostgreSQLTarget struct {
db *sql.DB
store Store
firstPing bool
connString string
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}
// ID - returns target ID.
@ -185,6 +187,13 @@ func (target *PostgreSQLTarget) ID() event.TargetID {
// IsActive - Return true if target is up and active
func (target *PostgreSQLTarget) IsActive() (bool, error) {
if target.db == nil {
db, err := sql.Open("postgres", target.connString)
if err != nil {
return false, err
}
target.db = db
}
if err := target.db.Ping(); err != nil {
if IsConnErr(err) {
return false, errNotConnected
@ -345,8 +354,6 @@ func (target *PostgreSQLTarget) executeStmts() error {
// NewPostgreSQLTarget - creates new PostgreSQL target.
func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*PostgreSQLTarget, error) {
var firstPing bool
params := []string{args.ConnectionString}
if !args.Host.IsEmpty() {
params = append(params, "host="+args.Host.String())
@ -365,10 +372,19 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{},
}
connStr := strings.Join(params, " ")
target := &PostgreSQLTarget{
id: event.TargetID{ID: id, Name: "postgresql"},
args: args,
firstPing: false,
connString: connStr,
loggerOnce: loggerOnce,
}
db, err := sql.Open("postgres", connStr)
if err != nil {
return nil, err
return target, err
}
target.db = db
var store Store
@ -376,35 +392,31 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{},
queueDir := filepath.Join(args.QueueDir, storePrefix+"-postgresql-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
return nil, oErr
target.loggerOnce(context.Background(), oErr, target.ID())
return target, oErr
}
}
target := &PostgreSQLTarget{
id: event.TargetID{ID: id, Name: "postgresql"},
args: args,
db: db,
store: store,
firstPing: firstPing,
target.store = store
}
err = target.db.Ping()
if err != nil {
if target.store == nil || !(IsConnRefusedErr(err) || IsConnResetErr(err)) {
return nil, err
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
} else {
if err = target.executeStmts(); err != nil {
return nil, err
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
target.firstPing = true
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil

@ -286,22 +286,23 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnc
var store Store
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-redis-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
return nil, oErr
}
}
target := &RedisTarget{
id: event.TargetID{ID: id, Name: "redis"},
args: args,
pool: pool,
store: store,
loggerOnce: loggerOnce,
}
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-redis-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if oErr := store.Open(); oErr != nil {
target.loggerOnce(context.Background(), oErr, target.ID())
return target, oErr
}
target.store = store
}
conn := target.pool.Get()
defer func() {
cErr := conn.Close()
@ -311,20 +312,22 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnc
_, pingErr := conn.Do("PING")
if pingErr != nil {
if target.store == nil || !(IsConnRefusedErr(pingErr) || IsConnResetErr(pingErr)) {
return nil, pingErr
target.loggerOnce(context.Background(), pingErr, target.ID())
return target, pingErr
}
} else {
if err := target.args.validateFormat(conn); err != nil {
return nil, err
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
target.firstPing = true
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil

@ -82,6 +82,7 @@ type WebhookTarget struct {
args WebhookArgs
httpClient *http.Client
store Store
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
}
// ID - returns target ID.
@ -195,28 +196,37 @@ func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, logge
var store Store
target := &WebhookTarget{
id: event.TargetID{ID: id, Name: "webhook"},
args: args,
httpClient: &http.Client{
Transport: transport,
},
loggerOnce: loggerOnce,
}
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-webhook-"+id)
store = NewQueueStore(queueDir, args.QueueLimit)
if err := store.Open(); err != nil {
return nil, err
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
target.store = store
}
target := &WebhookTarget{
id: event.TargetID{ID: id, Name: "webhook"},
args: args,
httpClient: &http.Client{
Transport: transport,
},
store: store,
if _, err := target.IsActive(); err != nil {
if target.store == nil || err != errNotConnected {
target.loggerOnce(context.Background(), err, target.ID())
return target, err
}
}
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
go sendEvents(target, eventKeyCh, doneCh, target.loggerOnce)
}
return target, nil

Loading…
Cancel
Save