diff --git a/cmd/admin-handlers_test.go b/cmd/admin-handlers_test.go index 4c98244cc..aedd9cf70 100644 --- a/cmd/admin-handlers_test.go +++ b/cmd/admin-handlers_test.go @@ -101,7 +101,17 @@ var ( "1": { "enable": false, "brokers": null, - "topic": "" + "topic": "", + "tls" : { + "enable" : false, + "skipVerify" : false, + "clientAuth" : 0 + }, + "sasl" : { + "enable" : false, + "username" : "", + "password" : "" + } } }, "mqtt": { diff --git a/docs/config/config.sample.json b/docs/config/config.sample.json index a45634bc5..94d96a763 100644 --- a/docs/config/config.sample.json +++ b/docs/config/config.sample.json @@ -91,7 +91,17 @@ "1": { "enable": true, "brokers": ["localhost:9092"], - "topic": "bucketevents" + "topic": "bucketevents", + "tls" : { + "enable" : true, + "skipVerify" : false, + "clientAuth" : 0 + }, + "sasl" : { + "enable" : true, + "username" : "kafka", + "password" : "kafkapasswd" + } } }, "webhook": { diff --git a/pkg/event/target/kafka.go b/pkg/event/target/kafka.go index 11ebdedc6..e7fefb79a 100644 --- a/pkg/event/target/kafka.go +++ b/pkg/event/target/kafka.go @@ -17,6 +17,7 @@ package target import ( + "crypto/tls" "encoding/json" "errors" "net/url" @@ -32,6 +33,16 @@ type KafkaArgs struct { Enable bool `json:"enable"` Brokers []xnet.Host `json:"brokers"` Topic string `json:"topic"` + TLS struct { + Enable bool `json:"enable"` + SkipVerify bool `json:"skipVerify"` + ClientAuth tls.ClientAuthType `json:"clientAuth"` + } `json:"tls"` + SASL struct { + Enable bool `json:"enable"` + User string `json:"username"` + Password string `json:"password"` + } `json:"sasl"` } // Validate KafkaArgs fields @@ -90,9 +101,20 @@ func (target *KafkaTarget) Close() error { return target.producer.Close() } -// NewKafkaTarget - creates new Kafka target. +// NewKafkaTarget - creates new Kafka target with auth credentials. func NewKafkaTarget(id string, args KafkaArgs) (*KafkaTarget, error) { config := sarama.NewConfig() + + config.Net.SASL.User = args.SASL.User + config.Net.SASL.Password = args.SASL.Password + config.Net.SASL.Enable = args.SASL.Enable + + config.Net.TLS.Enable = args.TLS.Enable + tlsConfig := &tls.Config{ + ClientAuth: args.TLS.ClientAuth, + } + config.Net.TLS.Config = tlsConfig + config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max = 10 config.Producer.Return.Successes = true