Dead Letter Topic


In the previous section, we looked at how to configure our Kafka consumer to recover from deserialization errors. Let's quickly recap that before we look at the dead letter topic.

In the diagram above, on the left side we have the Products microservice, which publishes a product created event to a Kafka topic. The Products microservice acts as a Kafka producer and uses JsonSerializer to serialize messages in JSON format. On the right side we have the Email Notification microservice, which acts as a Kafka consumer. It also expects messages in JSON format, and it uses JsonDeserializer to convert these messages from JSON into a Java object using the product created event Java class.

Now suppose we have another producer, the Admin microservice. This new microservice uses StringSerializer and publishes a message that is not compatible with the format the Email Notification microservice expects. The consumer is unable to deserialize the message because it is in an incompatible format (a plain String). Unless we handle this exception, the consumer will keep reading the same message from the Kafka topic, failing, and throwing the exception again.

In the previous section we learned how to configure our Kafka consumer to recover from this situation, so that it skips the faulty message and continues with the next message from the topic. Although that worked for us, it is only part of the solution.

We ignored the bad message and left it sitting in the topic, unprocessed. But ignoring messages that could not be deserialized is not the best solution, and this is where the dead letter topic helps. In the following lessons, we will configure our Kafka consumer so that a message that fails to be deserialized is sent to a dead letter topic, as shown in the image above.

Dead Letter Topic:

A dead letter topic is a place where we send messages that failed to be processed due to some error. Instead of silently dropping bad messages, we send them to a dead letter topic so that we can look at them later and decide what to do with them. You can inspect the messages in the dead letter topic, identify the root cause of the problem, take corrective action, and maybe even create a separate consumer microservice that consumes messages from the dead letter topic and processes them differently. By default, the name of the dead letter topic is the name of the original topic with the suffix .DLT appended.
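
To make the naming rule concrete, here is a minimal sketch in plain Java. The class and method names (DeadLetterTopicNames, forTopic) are made up for this illustration and are not part of Spring for Apache Kafka. It only shows the string rule described above, using the topic name from the commands later in this post.

```java
// Hypothetical helper, for illustration only: derives the default dead letter
// topic name described above by appending ".DLT" to the original topic name.
final class DeadLetterTopicNames {

    private static final String DLT_SUFFIX = ".DLT";

    private DeadLetterTopicNames() {
    }

    static String forTopic(String originalTopic) {
        return originalTopic + DLT_SUFFIX;
    }

    public static void main(String[] args) {
        // prints: product-created-events-topic.DLT
        System.out.println(forTopic("product-created-events-topic"));
    }
}
```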

Handle errors: The DefaultErrorHandler and DeadLetterPublishingRecoverer classes

We will create an error handler that helps us send messages to a dead letter topic. To configure it, go to the consumer (the Email Notification microservice), open the KafkaConsumerConfiguration class, and in the kafkaListenerContainerFactory method create a new instance of the DefaultErrorHandler class.

    DefaultErrorHandler errorHandler = new DefaultErrorHandler();

The DefaultErrorHandler class provides error handling capabilities for Kafka consumers. It is used to handle exceptions that occur while the Kafka listener consumes messages. Next we need to register the error handler with the Kafka listener container factory.

    factory.setCommonErrorHandler(errorHandler);

If an error occurs while the Kafka listener consumes a message, this DefaultErrorHandler instance is used to handle it. When an exception takes place, the handler rewinds the partition after the failed message so that the message can be replayed, and when it gives up on the message it passes it to a recoverer. That recoverer is what will send a message that could not be deserialized to a dead letter topic.
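
The idea of handing a failed message to a recoverer and then moving on can be sketched in plain Java. This is a toy model, not how Spring for Apache Kafka is implemented: it leaves out rewinding and replaying entirely, and every name in it (ErrorHandlingSketch, consume, pretendJsonDeserialize) is invented for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Toy model in plain Java, NOT Spring Kafka's implementation: records are
// processed in order; a record that fails is handed to a recoverer and the
// loop moves on to the next record instead of failing on it forever.
final class ErrorHandlingSketch {

    static <T> List<T> consume(List<String> rawRecords,
                               Function<String, T> deserializer,
                               BiConsumer<String, Exception> recoverer) {
        List<T> processed = new ArrayList<>();
        for (String raw : rawRecords) {
            try {
                processed.add(deserializer.apply(raw));
            } catch (RuntimeException e) {
                // In the real setup this is where the dead letter publishing happens.
                recoverer.accept(raw, e);
            }
        }
        return processed;
    }

    // Stand-in for a JSON deserializer: accepts only text that looks like a JSON object.
    static String pretendJsonDeserialize(String raw) {
        if (!raw.startsWith("{") || !raw.endsWith("}")) {
            throw new IllegalArgumentException("Not JSON: " + raw);
        }
        return raw;
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        List<String> ok = consume(
                List.of("{\"title\":\"iPhone\"}", "plain text from admin", "{\"title\":\"iPad\"}"),
                ErrorHandlingSketch::pretendJsonDeserialize,
                (raw, e) -> deadLetters.add(raw));
        // prints: processed=2, deadLetters=[plain text from admin]
        System.out.println("processed=" + ok.size() + ", deadLetters=" + deadLetters);
    }
}
```

The bad record ends up in the deadLetters list while the two valid records are still processed, which is the behavior we want from the real error handler.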

Next we need to create a new instance of the DeadLetterPublishingRecoverer class and pass it as a constructor argument to the error handler. DeadLetterPublishingRecoverer is a class in the Spring for Apache Kafka library that is used to send failed messages to a dead letter topic. It needs a KafkaTemplate, which we can inject into this method as a method argument.

The full code below uses DeadLetterPublishingRecoverer to publish failed messages to the dead letter topic when an error occurs while the Kafka listener consumes a message. By default, the dead letter topic has the same name as the topic the message was read from, with the suffix .DLT appended.


    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
        ConsumerFactory<String, Object> consumerFactory,
        KafkaTemplate<String, Object> kafkaTemplate // DLT code
    ) {
        // DLT code
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate));

        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setCommonErrorHandler(errorHandler); // DLT Code
        return factory;
    }
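
If you want the naming rule to be visible in your own code, DeadLetterPublishingRecoverer also has a constructor that takes a destination resolver. The sketch below spells out the same ".DLT on the same partition" rule by hand, on the assumption that this matches the default described above. The class name DltErrorHandlerSketch and the helper method are hypothetical, and the code needs the Spring for Apache Kafka library on the classpath.

```java
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;

// Hypothetical helper class, for illustration only.
final class DltErrorHandlerSketch {

    private DltErrorHandlerSketch() {
    }

    static DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
        // The resolver decides where a failed record is published. Here we
        // spell out the rule by hand: original topic name plus ".DLT",
        // same partition number as the failed record.
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
                kafkaTemplate,
                (failedRecord, exception) ->
                        new TopicPartition(failedRecord.topic() + ".DLT", failedRecord.partition()));
        return new DefaultErrorHandler(recoverer);
    }
}
```

Changing the string in the resolver is one way to send failed messages to a differently named topic, should you need that.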

Create and configure Kafka template object

We will create and configure a KafkaTemplate object so that DeadLetterPublishingRecoverer can use it to actually send failed messages to a dead letter topic. To do that, the KafkaTemplate must be available in the Spring application context, so that Spring can inject it into the kafkaListenerContainerFactory method. Once the KafkaTemplate is in the application context, we can also inject it into other components and methods in our application using Spring dependency injection.

The KafkaTemplate still needs a little configuration. Its bean method takes a ProducerFactory as a method argument, which is used to create the KafkaTemplate. As we know, a Kafka producer is responsible for sending messages to a Kafka topic, so KafkaTemplate needs this producer factory to create the producer that sends messages to the dead letter topic.

For Spring to be able to inject the ProducerFactory into this method as a method argument, we need to create it as a bean in a separate method, which we can call producerFactory. This method holds the producer configuration as key-value pairs. The Kafka producer is created with this basic configuration, and KafkaTemplate then uses that producer to send bad messages to the dead letter topic.


    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    @Bean
    ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("spring.kafka.consumer.bootstrap-servers"));
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

Dead Letter Topic: Check how it works

To check that the implementation above publishes bad messages to the dead letter topic, follow the steps below:

  1. Start all the Kafka servers.
  2. Start the producer, the Products microservice.
  3. Start the consumer, the Email Notification microservice.
  4. Start a Kafka console producer from the Kafka CLI so that you can send bad messages to product-created-events-topic. Run the script below:
    ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic product-created-events-topic --property parse.key=true --property key.separator=:
  5. First send a valid message to product-created-events-topic; the Email Notification microservice should be able to consume it.
  6. Next send a bad message from the console producer started in step 4.
  7. Start a Kafka console consumer from the Kafka CLI to display all the bad messages from the beginning. Run the script below:
    ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic product-created-events-topic.DLT --from-beginning --property print.key=true --property print.value=true
  8. The message is displayed in an encoded format. If you decode it with any online tool, you will see the actual message.
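
If you would rather decode the value locally than paste it into an online tool, a few lines of Java are enough. The sketch below assumes the encoded value is Base64, possibly wrapped in double quotes. That is an assumption about what your console consumer prints, so check it against your own output. The class name DltValueDecoder and the sample value are made up for the illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, for illustration only. ASSUMPTION: the value printed
// by the console consumer is Base64 text, optionally wrapped in double quotes.
final class DltValueDecoder {

    private DltValueDecoder() {
    }

    static String decode(String printedValue) {
        String trimmed = printedValue.trim();
        // Strip surrounding double quotes if the value was printed as a JSON string.
        if (trimmed.length() >= 2 && trimmed.startsWith("\"") && trimmed.endsWith("\"")) {
            trimmed = trimmed.substring(1, trimmed.length() - 1);
        }
        byte[] bytes = Base64.getDecoder().decode(trimmed);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Made-up sample value: the Base64 encoding of "hello world".
        System.out.println(decode("\"aGVsbG8gd29ybGQ=\"")); // prints: hello world
    }
}
```

If the value is not valid Base64, Base64.getDecoder().decode throws an IllegalArgumentException, which is a quick signal that the assumption does not hold for your setup.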

The updated full source code is available in the GitHub repository Kafka Consumer: Email Notification Microservice.