Apache Kafka Producer: Acknowledgement and Retries

Acknowledgement Introduction

As shown in the image above, when a Kafka producer sends a message, the message goes to a Kafka broker and is stored in one of the partitions of a Kafka topic. If your Kafka cluster has more than one broker and a proper replication factor is configured, the topic partitions are copied to the other brokers as well. In this case one broker acts as the leader and is the first to receive the message; once the leader has it, the message is copied to the other brokers, which act as followers.

Once the leader broker stores the message successfully, it sends an acknowledgement back to the Kafka producer. This is the default behaviour: the acknowledgement comes from one broker only, the leader. It is a useful configuration because it confirms that the message has been received and stored.

However, if the leader broker goes down before the message has been replicated to the followers, the message can be lost. To make the system more reliable, we can configure the Kafka producer to wait for acknowledgements from the other brokers as well. In the following lessons we will learn how to receive acknowledgements from multiple Kafka brokers, both the leader and the followers. The drawback is that the system becomes a little bit slower.

In real-world scenarios there may be situations where you must ensure that a message does not get lost. In those cases you need to configure your Kafka producer to wait for acknowledgements from all in-sync replicas.

If the messages are not critical and it is acceptable to lose some of them, you can configure your Kafka producer not to wait for any acknowledgement at all.

The acknowledgement behaviour is controlled with the spring.kafka.producer.acks configuration property, which supports the following values:

  1. spring.kafka.producer.acks=all: The Kafka producer waits for the full set of in-sync replicas to acknowledge the record. This option provides the strongest durability guarantee, because no data is lost as long as at least one in-sync replica remains alive.
  2. spring.kafka.producer.acks=1: This option provides a trade-off between latency and durability. It ensures that the message has been received and stored by the leader broker, but it does not guarantee that the message has been replicated to the follower brokers. Some data may be lost if the leader broker fails before the followers catch up, so this option is faster than waiting for all brokers to acknowledge, but it is less reliable.
  3. spring.kafka.producer.acks=0: This option is helpful when your Kafka producer sends a lot of messages and none of them is critical. Because the producer does not wait for any acknowledgement, the system works faster, but there is no guarantee that every message will be stored.

Now, let's assume we decided to configure our Kafka producer to wait for acknowledgements from all followers; this slows the system down, and the more brokers we start, the slower our Kafka producer becomes. To keep this under control we can use topic-level configuration: the replication factor determines how many replicas each topic partition has, and we can also configure the minimum number of in-sync replicas that must acknowledge a write. The following set of configuration values achieves this behaviour (a programmatic equivalent is sketched after the list):

  1. spring.kafka.producer.acks=all
  2. --replication-factor=5
  3. --config min.insync.replicas=2
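
For reference, the same topic-level setting can also be applied when a topic is declared programmatically with Spring Kafka's TopicBuilder. This is a minimal sketch; the class name, bean name, and topic name are illustrative assumptions, not part of this lesson's code.

  import org.apache.kafka.clients.admin.NewTopic;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;
  import org.springframework.kafka.config.TopicBuilder;

  // Minimal sketch: a topic with 5 replicas where at least 2 in-sync replicas
  // must confirm each write before a producer using acks=all gets an acknowledgement.
  @Configuration
  public class InsyncTopicConfig {   // hypothetical class name

      @Bean
      NewTopic insyncDemoTopic() {   // hypothetical bean and topic name
          return TopicBuilder.name("insync-demo-topic")
                  .partitions(3)
                  .replicas(5)
                  .config("min.insync.replicas", "2")
                  .build();
      }
  }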

Retries Introduction

Let's take a case where the Kafka producer must receive acknowledgements from the leader and the followers. If one of the followers goes down, the default behaviour of the Kafka producer is to retry the send operation a very large number of times, or until the delivery timeout is reached; the default timeout value is two minutes.


When the Kafka producer sends a message to a broker, there may be no response at all (when the producer is configured not to wait for any acknowledgement), or the producer receives a response. In the response it can receive an acknowledgement of successful storage, or, if an error takes place, either a retriable error or a non-retriable error. A non-retriable error is permanent and is unlikely to be resolved by retrying, while a retriable error is transient and the producer will retry the send automatically.
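
To make this concrete, here is a minimal sketch of how a send result and its failure could be inspected, assuming Spring Kafka 3.x where KafkaTemplate.send() returns a CompletableFuture. The class name, topic name, key, and String payload are illustrative assumptions.

  import java.util.concurrent.CompletableFuture;
  import org.apache.kafka.common.errors.RetriableException;
  import org.springframework.kafka.core.KafkaTemplate;
  import org.springframework.kafka.support.SendResult;

  // Minimal sketch: distinguishing a successful acknowledgement from retriable
  // and non-retriable failures after a send.
  public class SendOutcomeLogger {   // hypothetical helper class

      void sendAndInspect(KafkaTemplate<String, String> kafkaTemplate) {
          CompletableFuture<SendResult<String, String>> future =
                  kafkaTemplate.send("demo-topic", "demo-key", "demo-message");

          future.whenComplete((result, exception) -> {
              if (exception == null) {
                  // Acknowledgement received: the broker stored the record.
                  System.out.println("Stored at offset " + result.getRecordMetadata().offset());
                  return;
              }
              // Unwrap the cause, since the failure may arrive wrapped in another exception.
              Throwable cause = (exception.getCause() != null) ? exception.getCause() : exception;
              if (cause instanceof RetriableException) {
                  // Retriable (transient) error: the producer retried until the delivery timeout expired.
                  System.err.println("Transient failure, retries exhausted: " + cause);
              } else {
                  // Non-retriable error: permanent, retrying will not help.
                  System.err.println("Permanent failure: " + cause);
              }
          });
      }
  }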

Retry configurations

  1. spring.kafka.producer.retries=10: How many times the Kafka producer will try to send a message before marking it as failed. The default value is 2147483647.
  2. spring.kafka.producer.properties.retry.backoff.ms=100: How long the producer waits before attempting to retry a failed request. The default value is 100 ms.
  3. spring.kafka.producer.properties.delivery.timeout.ms=120000: The maximum time the producer can spend trying to deliver the message. The default value is 120000 ms (2 minutes).
  4. spring.kafka.producer.properties.linger.ms=0: The maximum time in milliseconds that the producer will wait and buffer data before sending a batch of messages. The default value is 0.
  5. spring.kafka.producer.properties.request.timeout.ms=30000: The maximum time to wait for a response from the broker after sending a request. The default value is 30000 ms.
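
The same values can also be set programmatically. This is a minimal sketch showing which ProducerConfig constants the Spring properties above map to; the values simply mirror the list, and the class name is hypothetical.

  import java.util.HashMap;
  import java.util.Map;
  import org.apache.kafka.clients.producer.ProducerConfig;

  // Minimal sketch: the retry-related Spring properties listed above map to
  // these ProducerConfig keys when the producer configuration is built by hand.
  public class RetryConfigSketch {   // hypothetical class name

      Map<String, Object> retryConfigs() {
          Map<String, Object> config = new HashMap<>();
          config.put(ProducerConfig.RETRIES_CONFIG, 10);                 // spring.kafka.producer.retries
          config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);       // retry.backoff.ms
          config.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // delivery.timeout.ms
          config.put(ProducerConfig.LINGER_MS_CONFIG, 0);                // linger.ms
          config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);   // request.timeout.ms
          return config;
      }
  }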

Acknowledgement and min.insync.replicas configuration in Producer

First, to configure the acknowledgement mode in the Kafka producer, open application.properties and add the following configuration property: spring.kafka.producer.acks=all

Next we need to add the min.insync.replicas configuration to the topic. It can be added to an existing topic or set when a new topic is created.

  1. To add min.insync.replicas to an existing topic, first check the available topics in the Kafka broker by running the following command in the Kafka CLI: ./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092, which displays the available topics. Choose the one to which you want to add the min.insync.replicas configuration. To update the existing topic, run the following command in the Kafka CLI: ./bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name product-created-events-topic --add-config min.insync.replicas=2. After executing this command the topic has been successfully altered, as shown in the screenshot below. A programmatic alternative using the Kafka AdminClient is sketched after this list.

  2. To add min.insync.replicas while creating a new topic, run a command like the following: ./bin/kafka-topics.sh --create --topic insync-topic-example --partitions 3 --replication-factor 3 --bootstrap-server localhost:9092 --config min.insync.replicas=2
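
As an alternative to the kafka-configs.sh command in step 1, the same change could be made from Java with the Kafka AdminClient. This is a minimal sketch; it reuses the bootstrap server and topic name from the commands above, and the class and method names are hypothetical.

  import java.util.List;
  import java.util.Map;
  import java.util.Properties;
  import org.apache.kafka.clients.admin.AdminClient;
  import org.apache.kafka.clients.admin.AdminClientConfig;
  import org.apache.kafka.clients.admin.AlterConfigOp;
  import org.apache.kafka.clients.admin.ConfigEntry;
  import org.apache.kafka.common.config.ConfigResource;

  // Minimal sketch: set min.insync.replicas=2 on an existing topic with the AdminClient.
  public class MinInsyncReplicasUpdater {   // hypothetical class name

      void setMinInsyncReplicas() throws Exception {
          Properties props = new Properties();
          props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

          try (AdminClient admin = AdminClient.create(props)) {
              ConfigResource topic =
                      new ConfigResource(ConfigResource.Type.TOPIC, "product-created-events-topic");
              AlterConfigOp setMinIsr = new AlterConfigOp(
                      new ConfigEntry("min.insync.replicas", "2"), AlterConfigOp.OpType.SET);
              // Apply the configuration change and wait for the broker to confirm it.
              admin.incrementalAlterConfigs(Map.of(topic, List.of(setMinIsr))).all().get();
          }
      }
  }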

To test these changes, first start 3 Kafka brokers. Then start a consumer using the Kafka CLI, and change the topic name in the Spring Boot microservice. Once all the changes are in place, send a request with Postman and check whether the consumer received the message. Next, stop Kafka brokers 2 and 3 and send the request again. You will see an error after 2 minutes: as we learned in previous lessons, if we do not configure the retry options the producer keeps retrying the send for 2 minutes (the delivery timeout) and only then throws the error.
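
If you want to see this failure in code rather than only in the logs, a blocking send surfaces the error as soon as the delivery timeout expires. This is a minimal sketch, assuming a String-valued KafkaTemplate; the class name, key, and payload are illustrative, and the real microservice sends a ProductCreatedEvent instead.

  import java.util.concurrent.ExecutionException;
  import org.springframework.kafka.core.KafkaTemplate;

  // Minimal sketch: a blocking send that fails once the delivery timeout
  // (2 minutes by default) expires, for example when fewer brokers than
  // min.insync.replicas are available and the producer uses acks=all.
  public class BlockingSendExample {   // hypothetical class name

      void sendAndWait(KafkaTemplate<String, String> kafkaTemplate) throws InterruptedException {
          try {
              kafkaTemplate
                      .send("product-created-events-topic", "product-id", "product created") // illustrative key/payload
                      .get();
              System.out.println("Message acknowledged by the required in-sync replicas");
          } catch (ExecutionException e) {
              // With brokers 2 and 3 stopped, the producer retries until the delivery
              // timeout expires and the send completes exceptionally.
              System.err.println("Send failed after retries: " + e.getCause());
          }
      }
  }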

Producer Retries

The producer retries property specifies the number of times the Kafka producer will try to send a message to the Kafka broker if the first attempt fails. If you do not set this property, the default value is very large. Retry attempts are made at regular intervals, which are controlled by the retry backoff property, retry.backoff.ms; it specifies the amount of time the producer waits before attempting to retry a failed request.

  1. spring.kafka.producer.retries=10: This property configures the producer to retry the send 10 times.
  2. spring.kafka.producer.properties.retry.backoff.ms=1000: This property configures the retry interval to 1 second.
Instead of the two properties above, the Kafka documentation recommends configuring the following properties; note that delivery.timeout.ms should be greater than or equal to linger.ms plus request.timeout.ms.
  1. spring.kafka.producer.properties.delivery.timeout.ms=120000
  2. spring.kafka.producer.properties.linger.ms=0
  3. spring.kafka.producer.properties.request.timeout.ms=30000

Kafka Producer Spring Bean Configuration

Instead of relying entirely on the application.properties configuration, we can configure the producer programmatically. This allows us to write more complex logic, apply conditions, integrate with other Java code, or fetch configuration values from a database if needed. To implement this, add the following to KafkaConfig.java.


  package com.navabitsolutions.product.ms;

  import org.apache.kafka.clients.admin.NewTopic;
  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.springframework.beans.factory.annotation.Value;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;
  import org.springframework.kafka.config.TopicBuilder;
  import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  import org.springframework.kafka.core.KafkaTemplate;
  import org.springframework.kafka.core.ProducerFactory;
  import com.navabitsolutions.product.ms.events.ProductCreatedEvent;
  
  import java.util.Map;
  import java.util.HashMap;
  
  @Configuration
  public class KafkaConfig {
  
      @Value("${spring.kafka.producer.bootstrap-servers}")
      private String bootstrapServers;
  
      @Value("${spring.kafka.producer.key-serializer}")
      private String keySerializer;
  
      @Value("${spring.kafka.producer.value-serializer}")
      private String valueSerializer;
  
      @Value("${spring.kafka.producer.acks}")
      private String acks;
  
      @Value("${spring.kafka.producer.properties.delivery.timeout.ms}")
      private String deliveryTimeout;
  
      @Value("${spring.kafka.producer.properties.linger.ms}")
      private String linger;
  
      @Value("${spring.kafka.producer.properties.request.timeout.ms}")
      private String requestTimeout;
  
      Map<String, Object> producerConfigs() {
          Map<String, Object> config = new HashMap<>();
  
          config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
          config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
          config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
          config.put(ProducerConfig.ACKS_CONFIG, acks);
          config.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeout);
          config.put(ProducerConfig.LINGER_MS_CONFIG, linger);
          config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
  
          return config;
      }
  
      @Bean
      ProducerFactory<String, ProductCreatedEvent> producerFactory() {
          return new DefaultKafkaProducerFactory<>(producerConfigs());
      }
  
      @Bean
      KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate() {
          return new KafkaTemplate<String, ProductCreatedEvent>(producerFactory());
      }
  
      @Bean
      NewTopic createTopic() {
          return TopicBuilder.name("product-created-events-topic")
                  .partitions(3)
                  .replicas(1)
                  .build();
      }
  }
  

The full code base is available in the GitHub repository Kafka Producer: Product Microservice.