Fix build issues with event target logger

master
Harshavardhana 5 years ago
parent d7060c4c32
commit c33bae057f
  1. 2
      pkg/event/target/elasticsearch.go
  2. 2
      pkg/event/target/kafka.go
  3. 4
      pkg/event/target/mqtt.go
  4. 2
      pkg/event/target/mysql.go
  5. 2
      pkg/event/target/nats.go
  6. 2
      pkg/event/target/nsq.go
  7. 2
      pkg/event/target/postgresql.go
  8. 4
      pkg/event/target/store.go
  9. 2
      pkg/event/target/webhook.go

@ -221,7 +221,7 @@ func newClient(args ElasticsearchArgs) (*elastic.Client, error) {
} }
// NewElasticsearchTarget - creates new Elasticsearch target. // NewElasticsearchTarget - creates new Elasticsearch target.
func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*ElasticsearchTarget, error) { func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*ElasticsearchTarget, error) {
var client *elastic.Client var client *elastic.Client
var err error var err error

@ -192,7 +192,7 @@ func (k KafkaArgs) pingBrokers() bool {
} }
// NewKafkaTarget - creates new Kafka target with auth credentials. // NewKafkaTarget - creates new Kafka target with auth credentials.
func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*KafkaTarget, error) { func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*KafkaTarget, error) {
config := sarama.NewConfig() config := sarama.NewConfig()
config.Net.SASL.User = args.SASL.User config.Net.SASL.User = args.SASL.User

@ -88,7 +88,7 @@ type MQTTTarget struct {
args MQTTArgs args MQTTArgs
client mqtt.Client client mqtt.Client
store Store store Store
loggerOnce func(ctx context.Context, err error, id interface{}) loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})
} }
// ID - returns target ID. // ID - returns target ID.
@ -162,7 +162,7 @@ func (target *MQTTTarget) Close() error {
} }
// NewMQTTTarget - creates new MQTT target. // NewMQTTTarget - creates new MQTT target.
func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*MQTTTarget, error) { func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*MQTTTarget, error) {
options := mqtt.NewClientOptions(). options := mqtt.NewClientOptions().
SetClientID(""). SetClientID("").
SetCleanSession(true). SetCleanSession(true).

@ -312,7 +312,7 @@ func (target *MySQLTarget) executeStmts() error {
} }
// NewMySQLTarget - creates new MySQL target. // NewMySQLTarget - creates new MySQL target.
func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*MySQLTarget, error) { func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*MySQLTarget, error) {
var firstPing bool var firstPing bool
if args.DSN == "" { if args.DSN == "" {
config := mysql.Config{ config := mysql.Config{

@ -234,7 +234,7 @@ func (target *NATSTarget) Close() (err error) {
} }
// NewNATSTarget - creates new NATS target. // NewNATSTarget - creates new NATS target.
func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*NATSTarget, error) { func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*NATSTarget, error) {
var natsConn *nats.Conn var natsConn *nats.Conn
var stanConn stan.Conn var stanConn stan.Conn

@ -150,7 +150,7 @@ func (target *NSQTarget) Close() (err error) {
} }
// NewNSQTarget - creates new NSQ target. // NewNSQTarget - creates new NSQ target.
func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*NSQTarget, error) { func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*NSQTarget, error) {
config := nsq.NewConfig() config := nsq.NewConfig()
if args.TLS.Enable { if args.TLS.Enable {
config.TlsV1 = true config.TlsV1 = true

@ -313,7 +313,7 @@ func (target *PostgreSQLTarget) executeStmts() error {
} }
// NewPostgreSQLTarget - creates new PostgreSQL target. // NewPostgreSQLTarget - creates new PostgreSQL target.
func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) (*PostgreSQLTarget, error) { func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*PostgreSQLTarget, error) {
var firstPing bool var firstPing bool
params := []string{args.ConnectionString} params := []string{args.ConnectionString}

@ -45,7 +45,7 @@ type Store interface {
} }
// replayEvents - Reads the events from the store and replays. // replayEvents - Reads the events from the store and replays.
func replayEvents(store Store, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}), id event.TargetID) <-chan string { func replayEvents(store Store, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), id event.TargetID) <-chan string {
eventKeyCh := make(chan string) eventKeyCh := make(chan string)
go func() { go func() {
@ -97,7 +97,7 @@ func IsConnResetErr(err error) bool {
} }
// sendEvents - Reads events from the store and re-plays. // sendEvents - Reads events from the store and re-plays.
func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) { func sendEvents(target event.Target, eventKeyCh <-chan string, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) {
retryTicker := time.NewTicker(retryInterval) retryTicker := time.NewTicker(retryInterval)
defer retryTicker.Stop() defer retryTicker.Stop()

@ -172,7 +172,7 @@ func (target *WebhookTarget) Close() error {
} }
// NewWebhookTarget - creates new Webhook target. // NewWebhookTarget - creates new Webhook target.
func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{})) *WebhookTarget { func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) *WebhookTarget {
var store Store var store Store

Loading…
Cancel
Save