|
|
|
@ -17,6 +17,7 @@ |
|
|
|
|
package target |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"crypto/tls" |
|
|
|
|
"encoding/json" |
|
|
|
|
"errors" |
|
|
|
|
"net/url" |
|
|
|
@ -32,6 +33,16 @@ type KafkaArgs struct { |
|
|
|
|
Enable bool `json:"enable"` |
|
|
|
|
Brokers []xnet.Host `json:"brokers"` |
|
|
|
|
Topic string `json:"topic"` |
|
|
|
|
TLS struct { |
|
|
|
|
Enable bool `json:"enable"` |
|
|
|
|
SkipVerify bool `json:"skipVerify"` |
|
|
|
|
ClientAuth tls.ClientAuthType `json:"clientAuth"` |
|
|
|
|
} `json:"tls"` |
|
|
|
|
SASL struct { |
|
|
|
|
Enable bool `json:"enable"` |
|
|
|
|
User string `json:"username"` |
|
|
|
|
Password string `json:"password"` |
|
|
|
|
} `json:"sasl"` |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Validate KafkaArgs fields
|
|
|
|
@ -90,9 +101,20 @@ func (target *KafkaTarget) Close() error { |
|
|
|
|
return target.producer.Close() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// NewKafkaTarget - creates new Kafka target.
|
|
|
|
|
// NewKafkaTarget - creates new Kafka target with auth credentials.
|
|
|
|
|
func NewKafkaTarget(id string, args KafkaArgs) (*KafkaTarget, error) { |
|
|
|
|
config := sarama.NewConfig() |
|
|
|
|
|
|
|
|
|
config.Net.SASL.User = args.SASL.User |
|
|
|
|
config.Net.SASL.Password = args.SASL.Password |
|
|
|
|
config.Net.SASL.Enable = args.SASL.Enable |
|
|
|
|
|
|
|
|
|
config.Net.TLS.Enable = args.TLS.Enable |
|
|
|
|
tlsConfig := &tls.Config{ |
|
|
|
|
ClientAuth: args.TLS.ClientAuth, |
|
|
|
|
} |
|
|
|
|
config.Net.TLS.Config = tlsConfig |
|
|
|
|
|
|
|
|
|
config.Producer.RequiredAcks = sarama.WaitForAll |
|
|
|
|
config.Producer.Retry.Max = 10 |
|
|
|
|
config.Producer.Return.Successes = true |
|
|
|
|