Handle Deserializer Errors

In the following lessons, you will learn how to configure your Kafka consumer application so that it can recover from deserialization errors.

Error Handling in Kafka Consumer

If you see above diagram, on the left side, I have products microservice that acts as a producer. When a new product is created, products microservice publishes a new product created event. Before this product created event is stored in Kafka topic, it is serialized using Json serializer.

On the right side in above my diagram, I have email notification microservice that acts as a consumer. It listens for product created events and when it reads new message from Kafka topic, it uses Json deserializer to deserialize this message or convert it from Json into Java object.

Let's assume we have another new microservice called Admin Microservice. This microservice uses a string serializer and it published a message of completely different format to the same product created event topic. When email notification microservice pulls a message from Kafka topic, it will get error because Json serializer will not be able to convert this message into a product created event Java object. The message is not considered as successfully consumed.

Next time our email notification microservice consumes message from Kafka topic, it will again get this faulty message and it will again throw exception. So our email notification microservice will get stuck in endless loop, consuming the same faulty message that cannot be deserialized again and again, and it will keep on throwing exceptions.

Causing the Deserializer problem

To replicate the deserialization issue, we can create the admin microservice and hit the request to publish the string to the product created event topic or else we can also try sending the kafka message using kafka-cli.

First bring up the consumer microservice which is EmailNotificationMicroservice, post that open the command prompt and navigate to the kafka folder and run the kafka producer cli command
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic product-created-events-topic --property "parse.key=true" --property "key.separator=:"


As shown in above image if we run the above command, in EmailNotificationMicroservice will be able to see lot's of errors in loop, to handle This we need to use the ErrorHandlingDeserializer.

Error Handling Deserializer

To handle the deserializer error which executes in loop, we need to consider using the ErrorHandlingDeserializer class. In EmailNotificationMicroservice in KafkaConsumerConfiguration class instead of using this config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); we need to replace it with below one, check below class to understand more. Which helps to handle the error which struck in the loop. The Error message will print only once.


    package com.navabitsolutions.email.notification.ms.config;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.env.Environment;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.support.serializer.JsonDeserializer;

    import java.util.HashMap;
    import java.util.Map;

    @Configuration
    public class KafkaConsumerConfiguration {

        @Autowired
        private Environment environment;

        @Bean
        public ConsumerFactory<String, Object> consumerFactory() {
            Map<String, Object> config = new HashMap<>();
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("spring.kafka.consumer.bootstrap-servers"));
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            //config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
            
            // Below two lines of code to handle the deserializer error
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
            config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
            
            config.put(JsonDeserializer.TRUSTED_PACKAGES, environment.getProperty("spring.kafka.consumer.properties.spring.json.trusted.packages"));
            config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("spring.kafka.consumer.group-id"));
            return new DefaultKafkaConsumerFactory<>(config);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
                ConsumerFactory<String, Object> consumerFactory) {
            ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            return factory;
        }
    }

Updated full source code is available in GitHub repository Kafka Consumer: Email Notification Microservice