/* * MinIO Cloud Storage, (C) 2018 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package target import ( "crypto/tls" "encoding/json" "errors" "net/url" "github.com/minio/minio/pkg/event" xnet "github.com/minio/minio/pkg/net" sarama "gopkg.in/Shopify/sarama.v1" ) // KafkaArgs - Kafka target arguments. type KafkaArgs struct { Enable bool `json:"enable"` Brokers []xnet.Host `json:"brokers"` Topic string `json:"topic"` TLS struct { Enable bool `json:"enable"` SkipVerify bool `json:"skipVerify"` ClientAuth tls.ClientAuthType `json:"clientAuth"` } `json:"tls"` SASL struct { Enable bool `json:"enable"` User string `json:"username"` Password string `json:"password"` } `json:"sasl"` } // Validate KafkaArgs fields func (k KafkaArgs) Validate() error { if !k.Enable { return nil } if len(k.Brokers) == 0 { return errors.New("no broker address found") } for _, b := range k.Brokers { if _, err := xnet.ParseHost(b.String()); err != nil { return err } } return nil } // KafkaTarget - Kafka target. type KafkaTarget struct { id event.TargetID args KafkaArgs producer sarama.SyncProducer } // ID - returns target ID. func (target *KafkaTarget) ID() event.TargetID { return target.id } // Save - Sends event directly without persisting. func (target *KafkaTarget) Save(eventData event.Event) error { return target.send(eventData) } func (target *KafkaTarget) send(eventData event.Event) error { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err != nil { return err } key := eventData.S3.Bucket.Name + "/" + objectName data, err := json.Marshal(event.Log{EventName: eventData.EventName, Key: key, Records: []event.Event{eventData}}) if err != nil { return err } msg := sarama.ProducerMessage{ Topic: target.args.Topic, Key: sarama.StringEncoder(key), Value: sarama.ByteEncoder(data), } _, _, err = target.producer.SendMessage(&msg) return err } // Send - interface compatible method does no-op. func (target *KafkaTarget) Send(eventKey string) error { return nil } // Close - closes underneath kafka connection. func (target *KafkaTarget) Close() error { return target.producer.Close() } // NewKafkaTarget - creates new Kafka target with auth credentials. func NewKafkaTarget(id string, args KafkaArgs) (*KafkaTarget, error) { config := sarama.NewConfig() config.Net.SASL.User = args.SASL.User config.Net.SASL.Password = args.SASL.Password config.Net.SASL.Enable = args.SASL.Enable config.Net.TLS.Enable = args.TLS.Enable tlsConfig := &tls.Config{ ClientAuth: args.TLS.ClientAuth, } config.Net.TLS.Config = tlsConfig config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max = 10 config.Producer.Return.Successes = true brokers := []string{} for _, broker := range args.Brokers { brokers = append(brokers, broker.String()) } producer, err := sarama.NewSyncProducer(brokers, config) if err != nil { return nil, err } return &KafkaTarget{ id: event.TargetID{ID: id, Name: "kafka"}, args: args, producer: producer, }, nil }