As shown in the image above, when a Kafka producer sends a message, the message goes to a Kafka broker and is stored in one of the partitions of a Kafka topic. If your Kafka cluster has more than one broker and the topic has a suitable replication factor, each topic partition is also copied to other brokers. For each partition, one broker acts as the leader and is the first to receive the message. Once the leader has the message, the same message is copied to the other brokers, which act as followers.
Once the leader broker has stored the message successfully, it sends an acknowledgement to the Kafka producer. In this mode (acks=1, which was the producer default before Kafka 3.0; newer clients default to acks=all) the acknowledgement comes from one broker only, the leader. This is a useful configuration because it confirms that the leader received and stored the message.
With a leader-only acknowledgement, the message can still be lost if the leader broker goes down before the followers have copied it. To make the system more reliable, we can configure the Kafka producer to wait for acknowledgements from other brokers as well. In the following lessons we will learn how to receive acknowledgements from multiple Kafka brokers, both the leader and the followers. The drawback is that the system works a little slower.
In real-world systems there are situations where a message must not get lost. In those cases, configure your Kafka producer to wait for acknowledgements from all in-sync replicas.
If the message is not critical and it is acceptable to lose some messages, you can configure your Kafka producer not to wait for any acknowledgement at all.
To configure your Kafka producer to wait for acknowledgements from all in-sync replicas, set the producer's acks configuration property to all.
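The three acknowledgement levels described above can be sketched as plain producer properties. This is only an illustration: the class name `AckLevels`, its method names, and the broker address `localhost:9092` are assumptions made for the example, while `acks` and `bootstrap.servers` are the standard Kafka producer property names.

```java
import java.util.Properties;

// Sketch only: builds plain producer properties for each acknowledgement level.
final class AckLevels {

    // Builds a minimal property set; the broker address is an assumed local broker.
    static Properties producerProps(String acks) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("acks", acks);
        return props;
    }

    // acks=0: do not wait for any acknowledgement (fastest, messages may be lost).
    static Properties fireAndForget() {
        return producerProps("0");
    }

    // acks=1: wait for the leader broker only.
    static Properties leaderOnly() {
        return producerProps("1");
    }

    // acks=all: wait for all in-sync replicas (slowest, most reliable).
    static Properties allInSyncReplicas() {
        return producerProps("all");
    }

    public static void main(String[] args) {
        System.out.println(allInSyncReplicas());
    }
}
```

A property set like this could be passed to a Kafka producer constructor. In a Spring Boot application the same value is normally set through spring.kafka.producer.acks instead.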
Now suppose we configure the Kafka producer to wait for acknowledgements from all followers. This slows the system down, and the more brokers hold replicas, the slower the producer becomes. We can limit this cost when creating a topic. The replication factor determines how many replicas each topic partition has, and the min.insync.replicas setting determines the minimum number of in-sync replicas that must acknowledge a write. Together, these two settings give us this behaviour.
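To make the interaction between acks and min.insync.replicas concrete, here is a deliberately simplified model of the check a broker applies to a write. It is not Kafka code: the class `MinIsrModel` and the method `writeAccepted` are hypothetical names, and the model ignores everything except the in-sync replica count (which includes the leader).

```java
// Simplified model, not Kafka source code: decides whether a write would be accepted.
final class MinIsrModel {

    /**
     * @param acks              producer setting: "0", "1", or "all" ("-1" is an alias for "all")
     * @param inSyncReplicas    replicas currently in sync, including the leader
     * @param minInsyncReplicas topic-level min.insync.replicas value
     */
    static boolean writeAccepted(String acks, int inSyncReplicas, int minInsyncReplicas) {
        if (inSyncReplicas < 1) {
            return false; // no leader available, nothing can accept the write
        }
        if (acks.equals("all") || acks.equals("-1")) {
            // The minimum is enforced only when the producer asks for acks=all.
            return inSyncReplicas >= minInsyncReplicas;
        }
        return true; // acks=0 or acks=1 only needs a live leader
    }

    public static void main(String[] args) {
        // Assumed scenario: replication factor 3, min.insync.replicas=2.
        System.out.println("3 in sync, acks=all: " + writeAccepted("all", 3, 2));
        System.out.println("2 in sync, acks=all: " + writeAccepted("all", 2, 2));
        System.out.println("1 in sync, acks=all: " + writeAccepted("all", 1, 2));
        System.out.println("1 in sync, acks=1:   " + writeAccepted("1", 1, 2));
    }
}
```

With a replication factor of 3 and min.insync.replicas set to 2, the producer waits only for the replicas that are currently in sync, and writes keep succeeding even when one follower is down.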
Suppose the Kafka producer must receive an acknowledgement from the leader and the followers. If one of the followers goes down and the number of in-sync replicas falls below the required minimum, the default behaviour of the Kafka producer is to retry the send operation a very large number of times, or until the delivery timeout is reached. The default delivery timeout is two minutes.
When a Kafka producer sends a message to a broker, there may be no response at all, which is the case when the producer is configured not to wait for any acknowledgement. Otherwise it receives a response, and that response can be an acknowledgement of successful storage. If an error takes place, the producer receives either a retryable error or a non-retryable error. A retryable error is temporary and may be resolved by sending the message again. A non-retryable error is permanent and is unlikely to be resolved by retries.
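The difference between the two error types can be sketched as a small retry loop. The exception classes below are hypothetical stand-ins written for this example. In the Kafka client library, retryable errors extend `RetriableException` (for example `NotEnoughReplicasException`), while errors such as `RecordTooLargeException` are not retryable. The producer performs this kind of retry internally, so the loop is only meant to show the idea.

```java
import java.util.function.Supplier;

// Sketch only: retries temporary failures and fails fast on permanent ones.
final class RetrySketch {

    // Hypothetical stand-in for a temporary, retryable failure.
    static class RetryableError extends RuntimeException {
        RetryableError(String message) { super(message); }
    }

    // Hypothetical stand-in for a permanent failure that retries cannot fix.
    static class NonRetryableError extends RuntimeException {
        NonRetryableError(String message) { super(message); }
    }

    static <T> T sendWithRetry(Supplier<T> send, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RetryableError last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return send.get();
            } catch (RetryableError e) {
                last = e; // temporary problem: wait, then try again
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMs);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("interrupted while backing off", ie);
                    }
                }
            }
            // NonRetryableError is not caught here, so it propagates immediately.
        }
        throw last; // every attempt failed with a retryable error
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = sendWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new RetryableError("not enough replicas");
            }
            return "stored";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```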
Retry configurations
First, to configure acknowledgements in the Kafka producer, open application.properties and add the following configuration property: spring.kafka.producer.acks=all
Next, we need to set min.insync.replicas on the topic. This can be done for both existing and new topics.
To test these changes, first start three Kafka brokers. Then start a consumer with the Kafka CLI, and change the topic name in the Spring Boot microservice. Once all the changes are in place, send a request with Postman and check whether the consumer received the message. Next, stop Kafka brokers 2 and 3 and send the request again. You will see an error after about two minutes. As we learned in previous lessons, if we do not configure the retry options, the producer keeps retrying the send for two minutes and then throws an error.
The producer retries property specifies how many times the Kafka producer tries to send a message to the Kafka broker again if the first attempt fails. If you do not set this property, the default value is very large. The delay between retry attempts is controlled by the retry backoff property, retry.backoff.ms, which specifies how long the producer waits before retrying a failed request.
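The retry settings discussed here can be sketched as plain producer properties together with one sanity check. The class and method names are hypothetical and the values in `main` are example values, not recommendations. The property names are the standard producer ones, and the check mirrors the documented rule that delivery.timeout.ms should be at least linger.ms plus request.timeout.ms.

```java
import java.util.Properties;

// Sketch only: collects retry-related producer settings and validates the timeout rule.
final class RetryConfigSketch {

    static Properties retryProps(int retries, long retryBackoffMs,
                                 long deliveryTimeoutMs, long lingerMs, long requestTimeoutMs) {
        // The delivery timeout is the upper bound on the whole send, retries included.
        if (deliveryTimeoutMs < lingerMs + requestTimeoutMs) {
            throw new IllegalArgumentException(
                "delivery.timeout.ms must be >= linger.ms + request.timeout.ms");
        }
        Properties props = new Properties();
        props.setProperty("retries", String.valueOf(retries));
        props.setProperty("retry.backoff.ms", String.valueOf(retryBackoffMs));
        props.setProperty("delivery.timeout.ms", String.valueOf(deliveryTimeoutMs));
        props.setProperty("linger.ms", String.valueOf(lingerMs));
        props.setProperty("request.timeout.ms", String.valueOf(requestTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // Example values: 10 retries, 1 s backoff, 2 min delivery timeout,
        // no lingering, 30 s per-request timeout.
        Properties props = retryProps(10, 1000L, 120000L, 0L, 30000L);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In practice many applications leave retries at its default and tune delivery.timeout.ms instead, because the delivery timeout caps the total time spent on a send regardless of how many retries remain.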
Instead of depending entirely on application.properties, we can configure the producer programmatically. This lets us write more complex business logic, apply conditions, integrate with other Java code, or fetch configuration values from a database if needed. To implement this, add the following changes to KafkaConfig.java.
package com.navabitsolutions.product.ms;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import com.navabitsolutions.product.ms.events.ProductCreatedEvent;

import java.util.Map;
import java.util.HashMap;

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    @Value("${spring.kafka.producer.acks}")
    private String acks;

    @Value("${spring.kafka.producer.properties.delivery.timeout.ms}")
    private String deliveryTimeout;

    @Value("${spring.kafka.producer.properties.linger.ms}")
    private String linger;

    @Value("${spring.kafka.producer.properties.request.timeout.ms}")
    private String requestTimeout;

    Map<String, Object> producerConfigs() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        config.put(ProducerConfig.ACKS_CONFIG, acks);
        config.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeout);
        config.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
        return config;
    }

    @Bean
    ProducerFactory<String, ProductCreatedEvent> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate() {
        return new KafkaTemplate<String, ProductCreatedEvent>(producerFactory());
    }

    @Bean
    NewTopic createTopic() {
        return TopicBuilder.name("product-created-events-topic")
                .partitions(3)
                .replicas(1)
                .build();
    }
}
The full code base is available in the GitHub repository Kafka Producer: Product Microservice.