@ -20,7 +20,10 @@ import (
"crypto/tls"
"encoding/json"
"errors"
"net"
"net/url"
"os"
"path/filepath"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
@ -33,6 +36,8 @@ type KafkaArgs struct {
Enable bool ` json:"enable" `
Brokers [ ] xnet . Host ` json:"brokers" `
Topic string ` json:"topic" `
QueueDir string ` json:"queueDir" `
QueueLimit uint16 ` json:"queueLimit" `
TLS struct {
Enable bool ` json:"enable" `
SkipVerify bool ` json:"skipVerify" `
@ -58,6 +63,14 @@ func (k KafkaArgs) Validate() error {
return err
}
}
if k . QueueDir != "" {
if ! filepath . IsAbs ( k . QueueDir ) {
return errors . New ( "queueDir path should be absolute" )
}
}
if k . QueueLimit > 10000 {
return errors . New ( "queueLimit should not exceed 10000" )
}
return nil
}
@ -66,6 +79,8 @@ type KafkaTarget struct {
id event . TargetID
args KafkaArgs
producer sarama . SyncProducer
config * sarama . Config
store Store
}
// ID - returns target ID.
@ -73,11 +88,18 @@ func (target *KafkaTarget) ID() event.TargetID {
return target . id
}
// Save - Sends event directly without persisting .
// Save - saves the events to the store which will be replayed when the Kafka connection is active .
func ( target * KafkaTarget ) Save ( eventData event . Event ) error {
if target . store != nil {
return target . store . Put ( eventData )
}
if ! target . args . pingBrokers ( ) {
return errNotConnected
}
return target . send ( eventData )
}
// send - sends an event to the kafka.
func ( target * KafkaTarget ) send ( eventData event . Event ) error {
objectName , err := url . QueryUnescape ( eventData . S3 . Object . Key )
if err != nil {
@ -95,23 +117,79 @@ func (target *KafkaTarget) send(eventData event.Event) error {
Key : sarama . StringEncoder ( key ) ,
Value : sarama . ByteEncoder ( data ) ,
}
_ , _ , err = target . producer . SendMessage ( & msg )
return err
}
// Send - interface compatible method does no-op .
// Send - reads an event from store and sends it to Kafka .
func ( target * KafkaTarget ) Send ( eventKey string ) error {
var err error
if ! target . args . pingBrokers ( ) {
return errNotConnected
}
if target . producer == nil {
brokers := [ ] string { }
for _ , broker := range target . args . Brokers {
brokers = append ( brokers , broker . String ( ) )
}
target . producer , err = sarama . NewSyncProducer ( brokers , target . config )
if err != nil {
if err != sarama . ErrOutOfBrokers {
return err
}
return errNotConnected
}
}
eventData , eErr := target . store . Get ( eventKey )
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os . IsNotExist ( eErr ) {
return nil
}
return eErr
}
err = target . send ( eventData )
if err != nil {
// Sarama opens the ciruit breaker after 3 consecutive connection failures.
if err == sarama . ErrLeaderNotAvailable || err . Error ( ) == "circuit breaker is open" {
return errNotConnected
}
return err
}
// Delete the event from store.
return target . store . Del ( eventKey )
}
// Close - closes underneath kafka connection.
func ( target * KafkaTarget ) Close ( ) error {
if target . producer != nil {
return target . producer . Close ( )
}
return nil
}
// Check if atleast one broker in cluster is active
func ( k KafkaArgs ) pingBrokers ( ) bool {
for _ , broker := range k . Brokers {
_ , dErr := net . Dial ( "tcp" , broker . String ( ) )
if dErr == nil {
return true
}
}
return false
}
// NewKafkaTarget - creates new Kafka target with auth credentials.
func NewKafkaTarget ( id string , args KafkaArgs ) ( * KafkaTarget , error ) {
func NewKafkaTarget ( id string , args KafkaArgs , doneCh <- chan struct { } ) ( * KafkaTarget , error ) {
config := sarama . NewConfig ( )
config . Net . SASL . User = args . SASL . User
@ -132,14 +210,38 @@ func NewKafkaTarget(id string, args KafkaArgs) (*KafkaTarget, error) {
for _ , broker := range args . Brokers {
brokers = append ( brokers , broker . String ( ) )
}
var store Store
if args . QueueDir != "" {
queueDir := filepath . Join ( args . QueueDir , storePrefix + "-kafka-" + id )
store = NewQueueStore ( queueDir , args . QueueLimit )
if oErr := store . Open ( ) ; oErr != nil {
return nil , oErr
}
}
producer , err := sarama . NewSyncProducer ( brokers , config )
if err != nil {
if store == nil || err != sarama . ErrOutOfBrokers {
return nil , err
}
}
return & KafkaTarget {
target := & KafkaTarget {
id : event . TargetID { ID : id , Name : "kafka" } ,
args : args ,
producer : producer ,
} , nil
config : config ,
store : store ,
}
if target . store != nil {
// Replays the events from the store.
eventKeyCh := replayEvents ( target . store , doneCh )
// Start replaying events from the store.
go sendEvents ( target , eventKeyCh , doneCh )
}
return target , nil
}