package sarama import ( "fmt" "sync" "time" "github.com/eapache/go-resiliency/breaker" "github.com/eapache/queue" ) // AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages // to the correct broker for the provided topic-partition, refreshing metadata as appropriate, // and parses responses for errors. You must read from the Errors() channel or the // producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid // leaks: it will not be garbage-collected automatically when it passes out of // scope. type AsyncProducer interface { // AsyncClose triggers a shutdown of the producer, flushing any messages it may // have buffered. The shutdown has completed when both the Errors and Successes // channels have been closed. When calling AsyncClose, you *must* continue to // read from those channels in order to drain the results of any messages in // flight. AsyncClose() // Close shuts down the producer and flushes any messages it may have buffered. // You must call this function before a producer object passes out of scope, as // it may otherwise leak memory. You must call this before calling Close on the // underlying client. Close() error // Input is the input channel for the user to write messages to that they // wish to send. Input() chan<- *ProducerMessage // Successes is the success output channel back to the user when AckSuccesses is // enabled. If Return.Successes is true, you MUST read from this channel or the // Producer will deadlock. It is suggested that you send and read messages // together in a single select statement. Successes() <-chan *ProducerMessage // Errors is the error output channel back to the user. You MUST read from this // channel or the Producer will deadlock when the channel is full. Alternatively, // you can set Producer.Return.Errors in your config to false, which prevents // errors to be returned. Errors() <-chan *ProducerError } type asyncProducer struct { client Client conf *Config ownClient bool errors chan *ProducerError input, successes, retries chan *ProducerMessage inFlight sync.WaitGroup brokers map[*Broker]chan<- *ProducerMessage brokerRefs map[chan<- *ProducerMessage]int brokerLock sync.Mutex } // NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration. func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) { client, err := NewClient(addrs, conf) if err != nil { return nil, err } p, err := NewAsyncProducerFromClient(client) if err != nil { return nil, err } p.(*asyncProducer).ownClient = true return p, nil } // NewAsyncProducerFromClient creates a new Producer using the given client. It is still // necessary to call Close() on the underlying client when shutting down this producer. func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) { // Check that we are not dealing with a closed Client before processing any other arguments if client.Closed() { return nil, ErrClosedClient } p := &asyncProducer{ client: client, conf: client.Config(), errors: make(chan *ProducerError), input: make(chan *ProducerMessage), successes: make(chan *ProducerMessage), retries: make(chan *ProducerMessage), brokers: make(map[*Broker]chan<- *ProducerMessage), brokerRefs: make(map[chan<- *ProducerMessage]int), } // launch our singleton dispatchers go withRecover(p.dispatcher) go withRecover(p.retryHandler) return p, nil } type flagSet int8 const ( syn flagSet = 1 << iota // first message from partitionProducer to brokerProducer fin // final message from partitionProducer to brokerProducer and back shutdown // start the shutdown process ) // ProducerMessage is the collection of elements passed to the Producer in order to send a message. type ProducerMessage struct { Topic string // The Kafka topic for this message. // The partitioning key for this message. Pre-existing Encoders include // StringEncoder and ByteEncoder. Key Encoder // The actual message to store in Kafka. Pre-existing Encoders include // StringEncoder and ByteEncoder. Value Encoder // This field is used to hold arbitrary data you wish to include so it // will be available when receiving on the Successes and Errors channels. // Sarama completely ignores this field and is only to be used for // pass-through data. Metadata interface{} // Below this point are filled in by the producer as the message is processed // Offset is the offset of the message stored on the broker. This is only // guaranteed to be defined if the message was successfully delivered and // RequiredAcks is not NoResponse. Offset int64 // Partition is the partition that the message was sent to. This is only // guaranteed to be defined if the message was successfully delivered. Partition int32 // Timestamp is the timestamp assigned to the message by the broker. This // is only guaranteed to be defined if the message was successfully // delivered, RequiredAcks is not NoResponse, and the Kafka broker is at // least version 0.10.0. Timestamp time.Time retries int flags flagSet } const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc. func (m *ProducerMessage) byteSize() int { size := producerMessageOverhead if m.Key != nil { size += m.Key.Length() } if m.Value != nil { size += m.Value.Length() } return size } func (m *ProducerMessage) clear() { m.flags = 0 m.retries = 0 } // ProducerError is the type of error generated when the producer fails to deliver a message. // It contains the original ProducerMessage as well as the actual error value. type ProducerError struct { Msg *ProducerMessage Err error } func (pe ProducerError) Error() string { return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err) } // ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface. // It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel // when closing a producer. type ProducerErrors []*ProducerError func (pe ProducerErrors) Error() string { return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe)) } func (p *asyncProducer) Errors() <-chan *ProducerError { return p.errors } func (p *asyncProducer) Successes() <-chan *ProducerMessage { return p.successes } func (p *asyncProducer) Input() chan<- *ProducerMessage { return p.input } func (p *asyncProducer) Close() error { p.AsyncClose() if p.conf.Producer.Return.Successes { go withRecover(func() { for _ = range p.successes { } }) } var errors ProducerErrors if p.conf.Producer.Return.Errors { for event := range p.errors { errors = append(errors, event) } } if len(errors) > 0 { return errors } return nil } func (p *asyncProducer) AsyncClose() { go withRecover(p.shutdown) } // singleton // dispatches messages by topic func (p *asyncProducer) dispatcher() { handlers := make(map[string]chan<- *ProducerMessage) shuttingDown := false for msg := range p.input { if msg == nil { Logger.Println("Something tried to send a nil message, it was ignored.") continue } if msg.flags&shutdown != 0 { shuttingDown = true p.inFlight.Done() continue } else if msg.retries == 0 { if shuttingDown { // we can't just call returnError here because that decrements the wait group, // which hasn't been incremented yet for this message, and shouldn't be pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown} if p.conf.Producer.Return.Errors { p.errors <- pErr } else { Logger.Println(pErr) } continue } p.inFlight.Add(1) } if msg.byteSize() > p.conf.Producer.MaxMessageBytes { p.returnError(msg, ErrMessageSizeTooLarge) continue } handler := handlers[msg.Topic] if handler == nil { handler = p.newTopicProducer(msg.Topic) handlers[msg.Topic] = handler } handler <- msg } for _, handler := range handlers { close(handler) } } // one per topic // partitions messages, then dispatches them by partition type topicProducer struct { parent *asyncProducer topic string input <-chan *ProducerMessage breaker *breaker.Breaker handlers map[int32]chan<- *ProducerMessage partitioner Partitioner } func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage { input := make(chan *ProducerMessage, p.conf.ChannelBufferSize) tp := &topicProducer{ parent: p, topic: topic, input: input, breaker: breaker.New(3, 1, 10*time.Second), handlers: make(map[int32]chan<- *ProducerMessage), partitioner: p.conf.Producer.Partitioner(topic), } go withRecover(tp.dispatch) return input } func (tp *topicProducer) dispatch() { for msg := range tp.input { if msg.retries == 0 { if err := tp.partitionMessage(msg); err != nil { tp.parent.returnError(msg, err) continue } } handler := tp.handlers[msg.Partition] if handler == nil { handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition) tp.handlers[msg.Partition] = handler } handler <- msg } for _, handler := range tp.handlers { close(handler) } } func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error { var partitions []int32 err := tp.breaker.Run(func() (err error) { if tp.partitioner.RequiresConsistency() { partitions, err = tp.parent.client.Partitions(msg.Topic) } else { partitions, err = tp.parent.client.WritablePartitions(msg.Topic) } return }) if err != nil { return err } numPartitions := int32(len(partitions)) if numPartitions == 0 { return ErrLeaderNotAvailable } choice, err := tp.partitioner.Partition(msg, numPartitions) if err != nil { return err } else if choice < 0 || choice >= numPartitions { return ErrInvalidPartition } msg.Partition = partitions[choice] return nil } // one per partition per topic // dispatches messages to the appropriate broker // also responsible for maintaining message order during retries type partitionProducer struct { parent *asyncProducer topic string partition int32 input <-chan *ProducerMessage leader *Broker breaker *breaker.Breaker output chan<- *ProducerMessage // highWatermark tracks the "current" retry level, which is the only one where we actually let messages through, // all other messages get buffered in retryState[msg.retries].buf to preserve ordering // retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and // therefore whether our buffer is complete and safe to flush) highWatermark int retryState []partitionRetryState } type partitionRetryState struct { buf []*ProducerMessage expectChaser bool } func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage { input := make(chan *ProducerMessage, p.conf.ChannelBufferSize) pp := &partitionProducer{ parent: p, topic: topic, partition: partition, input: input, breaker: breaker.New(3, 1, 10*time.Second), retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1), } go withRecover(pp.dispatch) return input } func (pp *partitionProducer) dispatch() { // try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader` // on the first message pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition) if pp.leader != nil { pp.output = pp.parent.getBrokerProducer(pp.leader) pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn} } for msg := range pp.input { if msg.retries > pp.highWatermark { // a new, higher, retry level; handle it and then back off pp.newHighWatermark(msg.retries) time.Sleep(pp.parent.conf.Producer.Retry.Backoff) } else if pp.highWatermark > 0 { // we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level if msg.retries < pp.highWatermark { // in fact this message is not even the current retry level, so buffer it for now (unless it's a just a fin) if msg.flags&fin == fin { pp.retryState[msg.retries].expectChaser = false pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected } else { pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg) } continue } else if msg.flags&fin == fin { // this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set, // meaning this retry level is done and we can go down (at least) one level and flush that pp.retryState[pp.highWatermark].expectChaser = false pp.flushRetryBuffers() pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected continue } } // if we made it this far then the current msg contains real data, and can be sent to the next goroutine // without breaking any of our ordering guarantees if pp.output == nil { if err := pp.updateLeader(); err != nil { pp.parent.returnError(msg, err) time.Sleep(pp.parent.conf.Producer.Retry.Backoff) continue } Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID()) } pp.output <- msg } if pp.output != nil { pp.parent.unrefBrokerProducer(pp.leader, pp.output) } } func (pp *partitionProducer) newHighWatermark(hwm int) { Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm) pp.highWatermark = hwm // send off a fin so that we know when everything "in between" has made it // back to us and we can safely flush the backlog (otherwise we risk re-ordering messages) pp.retryState[pp.highWatermark].expectChaser = true pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1} // a new HWM means that our current broker selection is out of date Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID()) pp.parent.unrefBrokerProducer(pp.leader, pp.output) pp.output = nil } func (pp *partitionProducer) flushRetryBuffers() { Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark) for { pp.highWatermark-- if pp.output == nil { if err := pp.updateLeader(); err != nil { pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err) goto flushDone } Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID()) } for _, msg := range pp.retryState[pp.highWatermark].buf { pp.output <- msg } flushDone: pp.retryState[pp.highWatermark].buf = nil if pp.retryState[pp.highWatermark].expectChaser { Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark) break } else if pp.highWatermark == 0 { Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition) break } } } func (pp *partitionProducer) updateLeader() error { return pp.breaker.Run(func() (err error) { if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil { return err } if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil { return err } pp.output = pp.parent.getBrokerProducer(pp.leader) pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn} return nil }) } // one per broker; also constructs an associated flusher func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage { var ( input = make(chan *ProducerMessage) bridge = make(chan *produceSet) responses = make(chan *brokerProducerResponse) ) bp := &brokerProducer{ parent: p, broker: broker, input: input, output: bridge, responses: responses, buffer: newProduceSet(p), currentRetries: make(map[string]map[int32]error), } go withRecover(bp.run) // minimal bridge to make the network response `select`able go withRecover(func() { for set := range bridge { request := set.buildRequest() response, err := broker.Produce(request) responses <- &brokerProducerResponse{ set: set, err: err, res: response, } } close(responses) }) return input } type brokerProducerResponse struct { set *produceSet err error res *ProduceResponse } // groups messages together into appropriately-sized batches for sending to the broker // handles state related to retries etc type brokerProducer struct { parent *asyncProducer broker *Broker input <-chan *ProducerMessage output chan<- *produceSet responses <-chan *brokerProducerResponse buffer *produceSet timer <-chan time.Time timerFired bool closing error currentRetries map[string]map[int32]error } func (bp *brokerProducer) run() { var output chan<- *produceSet Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID()) for { select { case msg := <-bp.input: if msg == nil { bp.shutdown() return } if msg.flags&syn == syn { Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n", bp.broker.ID(), msg.Topic, msg.Partition) if bp.currentRetries[msg.Topic] == nil { bp.currentRetries[msg.Topic] = make(map[int32]error) } bp.currentRetries[msg.Topic][msg.Partition] = nil bp.parent.inFlight.Done() continue } if reason := bp.needsRetry(msg); reason != nil { bp.parent.retryMessage(msg, reason) if bp.closing == nil && msg.flags&fin == fin { // we were retrying this partition but we can start processing again delete(bp.currentRetries[msg.Topic], msg.Partition) Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n", bp.broker.ID(), msg.Topic, msg.Partition) } continue } if bp.buffer.wouldOverflow(msg) { if err := bp.waitForSpace(msg); err != nil { bp.parent.retryMessage(msg, err) continue } } if err := bp.buffer.add(msg); err != nil { bp.parent.returnError(msg, err) continue } if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil { bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency) } case <-bp.timer: bp.timerFired = true case output <- bp.buffer: bp.rollOver() case response := <-bp.responses: bp.handleResponse(response) } if bp.timerFired || bp.buffer.readyToFlush() { output = bp.output } else { output = nil } } } func (bp *brokerProducer) shutdown() { for !bp.buffer.empty() { select { case response := <-bp.responses: bp.handleResponse(response) case bp.output <- bp.buffer: bp.rollOver() } } close(bp.output) for response := range bp.responses { bp.handleResponse(response) } Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID()) } func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error { if bp.closing != nil { return bp.closing } return bp.currentRetries[msg.Topic][msg.Partition] } func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error { Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID()) for { select { case response := <-bp.responses: bp.handleResponse(response) // handling a response can change our state, so re-check some things if reason := bp.needsRetry(msg); reason != nil { return reason } else if !bp.buffer.wouldOverflow(msg) { return nil } case bp.output <- bp.buffer: bp.rollOver() return nil } } } func (bp *brokerProducer) rollOver() { bp.timer = nil bp.timerFired = false bp.buffer = newProduceSet(bp.parent) } func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) { if response.err != nil { bp.handleError(response.set, response.err) } else { bp.handleSuccess(response.set, response.res) } if bp.buffer.empty() { bp.rollOver() // this can happen if the response invalidated our buffer } } func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) { // we iterate through the blocks in the request set, not the response, so that we notice // if the response is missing a block completely sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { if response == nil { // this only happens when RequiredAcks is NoResponse, so we have to assume success bp.parent.returnSuccesses(msgs) return } block := response.GetBlock(topic, partition) if block == nil { bp.parent.returnErrors(msgs, ErrIncompleteResponse) return } switch block.Err { // Success case ErrNoError: if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() { for _, msg := range msgs { msg.Timestamp = block.Timestamp } } for i, msg := range msgs { msg.Offset = block.Offset + int64(i) } bp.parent.returnSuccesses(msgs) // Retriable errors case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition, ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend: Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n", bp.broker.ID(), topic, partition, block.Err) bp.currentRetries[topic][partition] = block.Err bp.parent.retryMessages(msgs, block.Err) bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err) // Other non-retriable errors default: bp.parent.returnErrors(msgs, block.Err) } }) } func (bp *brokerProducer) handleError(sent *produceSet, err error) { switch err.(type) { case PacketEncodingError: sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { bp.parent.returnErrors(msgs, err) }) default: Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err) bp.parent.abandonBrokerConnection(bp.broker) _ = bp.broker.Close() bp.closing = err sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { bp.parent.retryMessages(msgs, err) }) bp.buffer.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { bp.parent.retryMessages(msgs, err) }) bp.rollOver() } } // singleton // effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel func (p *asyncProducer) retryHandler() { var msg *ProducerMessage buf := queue.New() for { if buf.Length() == 0 { msg = <-p.retries } else { select { case msg = <-p.retries: case p.input <- buf.Peek().(*ProducerMessage): buf.Remove() continue } } if msg == nil { return } buf.Add(msg) } } // utility functions func (p *asyncProducer) shutdown() { Logger.Println("Producer shutting down.") p.inFlight.Add(1) p.input <- &ProducerMessage{flags: shutdown} p.inFlight.Wait() if p.ownClient { err := p.client.Close() if err != nil { Logger.Println("producer/shutdown failed to close the embedded client:", err) } } close(p.input) close(p.retries) close(p.errors) close(p.successes) } func (p *asyncProducer) returnError(msg *ProducerMessage, err error) { msg.clear() pErr := &ProducerError{Msg: msg, Err: err} if p.conf.Producer.Return.Errors { p.errors <- pErr } else { Logger.Println(pErr) } p.inFlight.Done() } func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) { for _, msg := range batch { p.returnError(msg, err) } } func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) { for _, msg := range batch { if p.conf.Producer.Return.Successes { msg.clear() p.successes <- msg } p.inFlight.Done() } } func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) { if msg.retries >= p.conf.Producer.Retry.Max { p.returnError(msg, err) } else { msg.retries++ p.retries <- msg } } func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) { for _, msg := range batch { p.retryMessage(msg, err) } } func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessage { p.brokerLock.Lock() defer p.brokerLock.Unlock() bp := p.brokers[broker] if bp == nil { bp = p.newBrokerProducer(broker) p.brokers[broker] = bp p.brokerRefs[bp] = 0 } p.brokerRefs[bp]++ return bp } func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan<- *ProducerMessage) { p.brokerLock.Lock() defer p.brokerLock.Unlock() p.brokerRefs[bp]-- if p.brokerRefs[bp] == 0 { close(bp) delete(p.brokerRefs, bp) if p.brokers[broker] == bp { delete(p.brokers, broker) } } } func (p *asyncProducer) abandonBrokerConnection(broker *Broker) { p.brokerLock.Lock() defer p.brokerLock.Unlock() delete(p.brokers, broker) }