You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
904 lines
25 KiB
904 lines
25 KiB
8 years ago
package sarama
import (
// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
// leaks: it will not be garbage-collected automatically when it passes out of
// scope.
type AsyncProducer interface {
// AsyncClose triggers a shutdown of the producer, flushing any messages it may
// have buffered. The shutdown has completed when both the Errors and Successes
// channels have been closed. When calling AsyncClose, you *must* continue to
// read from those channels in order to drain the results of any messages in
// flight.
// Close shuts down the producer and flushes any messages it may have buffered.
// You must call this function before a producer object passes out of scope, as
// it may otherwise leak memory. You must call this before calling Close on the
// underlying client.
Close() error
// Input is the input channel for the user to write messages to that they
// wish to send.
Input() chan<- *ProducerMessage
// Successes is the success output channel back to the user when AckSuccesses is
// enabled. If Return.Successes is true, you MUST read from this channel or the
// Producer will deadlock. It is suggested that you send and read messages
// together in a single select statement.
Successes() <-chan *ProducerMessage
// Errors is the error output channel back to the user. You MUST read from this
// channel or the Producer will deadlock when the channel is full. Alternatively,
// you can set Producer.Return.Errors in your config to false, which prevents
// errors to be returned.
Errors() <-chan *ProducerError
type asyncProducer struct {
client Client
conf *Config
ownClient bool
errors chan *ProducerError
input, successes, retries chan *ProducerMessage
inFlight sync.WaitGroup
brokers map[*Broker]chan<- *ProducerMessage
brokerRefs map[chan<- *ProducerMessage]int
brokerLock sync.Mutex
// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
client, err := NewClient(addrs, conf)
if err != nil {
return nil, err
p, err := NewAsyncProducerFromClient(client)
if err != nil {
return nil, err
p.(*asyncProducer).ownClient = true
return p, nil
// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
// Check that we are not dealing with a closed Client before processing any other arguments
if client.Closed() {
return nil, ErrClosedClient
p := &asyncProducer{
client: client,
conf: client.Config(),
errors: make(chan *ProducerError),
input: make(chan *ProducerMessage),
successes: make(chan *ProducerMessage),
retries: make(chan *ProducerMessage),
brokers: make(map[*Broker]chan<- *ProducerMessage),
brokerRefs: make(map[chan<- *ProducerMessage]int),
// launch our singleton dispatchers
go withRecover(p.dispatcher)
go withRecover(p.retryHandler)
return p, nil
type flagSet int8
const (
syn flagSet = 1 << iota // first message from partitionProducer to brokerProducer
fin // final message from partitionProducer to brokerProducer and back
shutdown // start the shutdown process
// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
Topic string // The Kafka topic for this message.
// The partitioning key for this message. Pre-existing Encoders include
// StringEncoder and ByteEncoder.
Key Encoder
// The actual message to store in Kafka. Pre-existing Encoders include
// StringEncoder and ByteEncoder.
Value Encoder
// This field is used to hold arbitrary data you wish to include so it
// will be available when receiving on the Successes and Errors channels.
// Sarama completely ignores this field and is only to be used for
// pass-through data.
Metadata interface{}
// Below this point are filled in by the producer as the message is processed
// Offset is the offset of the message stored on the broker. This is only
// guaranteed to be defined if the message was successfully delivered and
// RequiredAcks is not NoResponse.
Offset int64
// Partition is the partition that the message was sent to. This is only
// guaranteed to be defined if the message was successfully delivered.
Partition int32
// Timestamp is the timestamp assigned to the message by the broker. This
// is only guaranteed to be defined if the message was successfully
// delivered, RequiredAcks is not NoResponse, and the Kafka broker is at
// least version 0.10.0.
Timestamp time.Time
retries int
flags flagSet
const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
func (m *ProducerMessage) byteSize() int {
size := producerMessageOverhead
if m.Key != nil {
size += m.Key.Length()
if m.Value != nil {
size += m.Value.Length()
return size
func (m *ProducerMessage) clear() {
m.flags = 0
m.retries = 0
// ProducerError is the type of error generated when the producer fails to deliver a message.
// It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct {
Msg *ProducerMessage
Err error
func (pe ProducerError) Error() string {
return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
type ProducerErrors []*ProducerError
func (pe ProducerErrors) Error() string {
return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
func (p *asyncProducer) Errors() <-chan *ProducerError {
return p.errors
func (p *asyncProducer) Successes() <-chan *ProducerMessage {
return p.successes
func (p *asyncProducer) Input() chan<- *ProducerMessage {
return p.input
func (p *asyncProducer) Close() error {
if p.conf.Producer.Return.Successes {
go withRecover(func() {
for _ = range p.successes {
var errors ProducerErrors
if p.conf.Producer.Return.Errors {
for event := range p.errors {
errors = append(errors, event)
if len(errors) > 0 {
return errors
return nil
func (p *asyncProducer) AsyncClose() {
go withRecover(p.shutdown)
// singleton
// dispatches messages by topic
func (p *asyncProducer) dispatcher() {
handlers := make(map[string]chan<- *ProducerMessage)
shuttingDown := false
for msg := range p.input {
if msg == nil {
Logger.Println("Something tried to send a nil message, it was ignored.")
if msg.flags&shutdown != 0 {
shuttingDown = true
} else if msg.retries == 0 {
if shuttingDown {
// we can't just call returnError here because that decrements the wait group,
// which hasn't been incremented yet for this message, and shouldn't be
pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
if p.conf.Producer.Return.Errors {
p.errors <- pErr
} else {
if msg.byteSize() > p.conf.Producer.MaxMessageBytes {
p.returnError(msg, ErrMessageSizeTooLarge)
handler := handlers[msg.Topic]
if handler == nil {
handler = p.newTopicProducer(msg.Topic)
handlers[msg.Topic] = handler
handler <- msg
for _, handler := range handlers {
// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
parent *asyncProducer
topic string
input <-chan *ProducerMessage
breaker *breaker.Breaker
handlers map[int32]chan<- *ProducerMessage
partitioner Partitioner
func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
tp := &topicProducer{
parent: p,
topic: topic,
input: input,
breaker: breaker.New(3, 1, 10*time.Second),
handlers: make(map[int32]chan<- *ProducerMessage),
partitioner: p.conf.Producer.Partitioner(topic),
go withRecover(tp.dispatch)
return input
func (tp *topicProducer) dispatch() {
for msg := range tp.input {
if msg.retries == 0 {
if err := tp.partitionMessage(msg); err != nil {
tp.parent.returnError(msg, err)
handler := tp.handlers[msg.Partition]
if handler == nil {
handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
tp.handlers[msg.Partition] = handler
handler <- msg
for _, handler := range tp.handlers {
func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
var partitions []int32
err := tp.breaker.Run(func() (err error) {
if tp.partitioner.RequiresConsistency() {
partitions, err = tp.parent.client.Partitions(msg.Topic)
} else {
partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
if err != nil {
return err
numPartitions := int32(len(partitions))
if numPartitions == 0 {
return ErrLeaderNotAvailable
choice, err := tp.partitioner.Partition(msg, numPartitions)
if err != nil {
return err
} else if choice < 0 || choice >= numPartitions {
return ErrInvalidPartition
msg.Partition = partitions[choice]
return nil
// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
parent *asyncProducer
topic string
partition int32
input <-chan *ProducerMessage
leader *Broker
breaker *breaker.Breaker
output chan<- *ProducerMessage
// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
// therefore whether our buffer is complete and safe to flush)
highWatermark int
retryState []partitionRetryState
type partitionRetryState struct {
buf []*ProducerMessage
expectChaser bool
func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
pp := &partitionProducer{
parent: p,
topic: topic,
partition: partition,
input: input,
breaker: breaker.New(3, 1, 10*time.Second),
retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
go withRecover(pp.dispatch)
return input
func (pp *partitionProducer) dispatch() {
// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
// on the first message
pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
if pp.leader != nil {
pp.output = pp.parent.getBrokerProducer(pp.leader)
pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
for msg := range pp.input {
if msg.retries > pp.highWatermark {
// a new, higher, retry level; handle it and then back off
} else if pp.highWatermark > 0 {
// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
if msg.retries < pp.highWatermark {
// in fact this message is not even the current retry level, so buffer it for now (unless it's a just a fin)
if msg.flags&fin == fin {
pp.retryState[msg.retries].expectChaser = false
pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
} else {
pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
} else if msg.flags&fin == fin {
// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
// meaning this retry level is done and we can go down (at least) one level and flush that
pp.retryState[pp.highWatermark].expectChaser = false
pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
// without breaking any of our ordering guarantees
if pp.output == nil {
if err := pp.updateLeader(); err != nil {
pp.parent.returnError(msg, err)
Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
pp.output <- msg
if pp.output != nil {
pp.parent.unrefBrokerProducer(pp.leader, pp.output)
func (pp *partitionProducer) newHighWatermark(hwm int) {
Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
pp.highWatermark = hwm
// send off a fin so that we know when everything "in between" has made it
// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
pp.retryState[pp.highWatermark].expectChaser = true
pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}
// a new HWM means that our current broker selection is out of date
Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
pp.parent.unrefBrokerProducer(pp.leader, pp.output)
pp.output = nil
func (pp *partitionProducer) flushRetryBuffers() {
Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
for {
if pp.output == nil {
if err := pp.updateLeader(); err != nil {
pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
goto flushDone
Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
for _, msg := range pp.retryState[pp.highWatermark].buf {
pp.output <- msg
pp.retryState[pp.highWatermark].buf = nil
if pp.retryState[pp.highWatermark].expectChaser {
Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
} else if pp.highWatermark == 0 {
Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
func (pp *partitionProducer) updateLeader() error {
return pp.breaker.Run(func() (err error) {
if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
return err
if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
return err
pp.output = pp.parent.getBrokerProducer(pp.leader)
pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
return nil
// one per broker; also constructs an associated flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
var (
input = make(chan *ProducerMessage)
bridge = make(chan *produceSet)
responses = make(chan *brokerProducerResponse)
bp := &brokerProducer{
parent: p,
broker: broker,
input: input,
output: bridge,
responses: responses,
buffer: newProduceSet(p),
currentRetries: make(map[string]map[int32]error),
go withRecover(
// minimal bridge to make the network response `select`able
go withRecover(func() {
for set := range bridge {
request := set.buildRequest()
response, err := broker.Produce(request)
responses <- &brokerProducerResponse{
set: set,
err: err,
res: response,
return input
type brokerProducerResponse struct {
set *produceSet
err error
res *ProduceResponse
// groups messages together into appropriately-sized batches for sending to the broker
// handles state related to retries etc
type brokerProducer struct {
parent *asyncProducer
broker *Broker
input <-chan *ProducerMessage
output chan<- *produceSet
responses <-chan *brokerProducerResponse
buffer *produceSet
timer <-chan time.Time
timerFired bool
closing error
currentRetries map[string]map[int32]error
func (bp *brokerProducer) run() {
var output chan<- *produceSet
Logger.Printf("producer/broker/%d starting up\n",
for {
select {
case msg := <-bp.input:
if msg == nil {
if msg.flags&syn == syn {
Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
|, msg.Topic, msg.Partition)
if bp.currentRetries[msg.Topic] == nil {
bp.currentRetries[msg.Topic] = make(map[int32]error)
bp.currentRetries[msg.Topic][msg.Partition] = nil
if reason := bp.needsRetry(msg); reason != nil {
bp.parent.retryMessage(msg, reason)
if bp.closing == nil && msg.flags&fin == fin {
// we were retrying this partition but we can start processing again
delete(bp.currentRetries[msg.Topic], msg.Partition)
Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
|, msg.Topic, msg.Partition)
if bp.buffer.wouldOverflow(msg) {
if err := bp.waitForSpace(msg); err != nil {
bp.parent.retryMessage(msg, err)
if err := bp.buffer.add(msg); err != nil {
bp.parent.returnError(msg, err)
if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
case <-bp.timer:
bp.timerFired = true
case output <- bp.buffer:
case response := <-bp.responses:
if bp.timerFired || bp.buffer.readyToFlush() {
output = bp.output
} else {
output = nil
func (bp *brokerProducer) shutdown() {
for !bp.buffer.empty() {
select {
case response := <-bp.responses:
case bp.output <- bp.buffer:
for response := range bp.responses {
Logger.Printf("producer/broker/%d shut down\n",
func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
if bp.closing != nil {
return bp.closing
return bp.currentRetries[msg.Topic][msg.Partition]
func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n",
for {
select {
case response := <-bp.responses:
// handling a response can change our state, so re-check some things
if reason := bp.needsRetry(msg); reason != nil {
return reason
} else if !bp.buffer.wouldOverflow(msg) {
return nil
case bp.output <- bp.buffer:
return nil
func (bp *brokerProducer) rollOver() {
bp.timer = nil
bp.timerFired = false
bp.buffer = newProduceSet(bp.parent)
func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
if response.err != nil {
bp.handleError(response.set, response.err)
} else {
bp.handleSuccess(response.set, response.res)
if bp.buffer.empty() {
bp.rollOver() // this can happen if the response invalidated our buffer
func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
// we iterate through the blocks in the request set, not the response, so that we notice
// if the response is missing a block completely
sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
if response == nil {
// this only happens when RequiredAcks is NoResponse, so we have to assume success
block := response.GetBlock(topic, partition)
if block == nil {
bp.parent.returnErrors(msgs, ErrIncompleteResponse)
switch block.Err {
// Success
case ErrNoError:
if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
for _, msg := range msgs {
msg.Timestamp = block.Timestamp
for i, msg := range msgs {
msg.Offset = block.Offset + int64(i)
// Retriable errors
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
|, topic, partition, block.Err)
bp.currentRetries[topic][partition] = block.Err
bp.parent.retryMessages(msgs, block.Err)
bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
// Other non-retriable errors
bp.parent.returnErrors(msgs, block.Err)
func (bp *brokerProducer) handleError(sent *produceSet, err error) {
switch err.(type) {
case PacketEncodingError:
sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
bp.parent.returnErrors(msgs, err)
Logger.Printf("producer/broker/%d state change to [closing] because %s\n",, err)
_ =
bp.closing = err
sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
bp.parent.retryMessages(msgs, err)
bp.buffer.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
bp.parent.retryMessages(msgs, err)
// singleton
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
// based on
func (p *asyncProducer) retryHandler() {
var msg *ProducerMessage
buf := queue.New()
for {
if buf.Length() == 0 {
msg = <-p.retries
} else {
select {
case msg = <-p.retries:
case p.input <- buf.Peek().(*ProducerMessage):
if msg == nil {
// utility functions
func (p *asyncProducer) shutdown() {
Logger.Println("Producer shutting down.")
p.input <- &ProducerMessage{flags: shutdown}
if p.ownClient {
err := p.client.Close()
if err != nil {
Logger.Println("producer/shutdown failed to close the embedded client:", err)
func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
pErr := &ProducerError{Msg: msg, Err: err}
if p.conf.Producer.Return.Errors {
p.errors <- pErr
} else {
func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
for _, msg := range batch {
p.returnError(msg, err)
func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
for _, msg := range batch {
if p.conf.Producer.Return.Successes {
p.successes <- msg
func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
if msg.retries >= p.conf.Producer.Retry.Max {
p.returnError(msg, err)
} else {
p.retries <- msg
func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
for _, msg := range batch {
p.retryMessage(msg, err)
func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessage {
defer p.brokerLock.Unlock()
bp := p.brokers[broker]
if bp == nil {
bp = p.newBrokerProducer(broker)
p.brokers[broker] = bp
p.brokerRefs[bp] = 0
return bp
func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan<- *ProducerMessage) {
defer p.brokerLock.Unlock()
if p.brokerRefs[bp] == 0 {
delete(p.brokerRefs, bp)
if p.brokers[broker] == bp {
delete(p.brokers, broker)
func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
defer p.brokerLock.Unlock()
delete(p.brokers, broker)