In this lesson, you will learn how to publish events to a Kafka broker from a Spring Boot microservice. Instead of using the terminal window, we will create a new Spring Boot microservice that acts as a Kafka producer, that is, a microservice that publishes events to the Kafka broker. In the following lessons, you will learn how to create this Spring Boot microservice and how to configure it to act as a Kafka producer.
So the primary role of a Kafka producer is to publish messages or events to one or more Kafka topics. In a Spring Boot application, this is typically achieved using the Spring for Apache Kafka library, which simplifies integrating Kafka functionality into your Spring application.
Before sending a message, a Kafka producer serializes it into a byte array. So another responsibility of the Kafka producer is to serialize messages into a binary format that can be transmitted over the network to Kafka brokers.
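As a minimal illustration of this step (using only the JDK; the real producer delegates this work to configured serializer classes such as StringSerializer or JsonSerializer), a message payload can be turned into the byte array that travels over the network like this:

```java
import java.nio.charset.StandardCharsets;

public class SerializationSketch {

    // Simplified stand-in for what a Kafka value serializer does:
    // turn the message into bytes before it is handed to the broker.
    static byte[] serialize(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // The consumer side applies the matching deserializer.
    static String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String event = "{\"title\":\"iPhone14\",\"price\":140000}";
        byte[] wire = serialize(event);
        System.out.println("bytes on the wire: " + wire.length);
        System.out.println("round-trip: " + deserialize(wire));
    }
}
```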
Another responsibility of the Kafka producer is to specify the name of the Kafka topic where these messages should be sent. In this particular use case, I will call the Kafka topic product-created-events-topic, and it will store product-created events.
When sending a new message to a Kafka topic, we can also specify the partition where we want to store it. This parameter is optional: if we do not specify a topic partition, the Kafka producer will make this decision for us. If neither a partition nor a key is provided, the producer distributes messages across partitions in a round-robin fashion, ensuring a balanced load.
If a partition is not specified but a message key is provided, Kafka hashes the key and uses the result to determine the partition. This ensures that all messages with the same key go to the same partition. So another responsibility of the Kafka producer is to choose which partition to write event data to.
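The key-based selection can be sketched as follows (a simplified illustration only: the real default partitioner uses murmur2 hashing over the serialized key, not Java's hashCode):

```java
public class PartitionerSketch {

    // Simplified version of key-based partition selection:
    // hash the key and map the result onto the available partitions.
    static int choosePartition(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3;
        String productId = "product-123"; // hypothetical message key
        // The same key always lands on the same partition.
        System.out.println(choosePartition(productId, partitions)
                == choosePartition(productId, partitions)); // prints true
    }
}
```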
There are two communication styles a Kafka producer can use to send messages to the Kafka broker: synchronous and asynchronous.
Synchronous communication style: the producer sends a message and blocks until it receives an acknowledgement from the broker, so it knows immediately whether the send succeeded.
Asynchronous communication style: the producer sends a message and continues processing without waiting; the result is delivered later to a callback.
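The two styles can be sketched with CompletableFuture, the same type that KafkaTemplate.send() returns. The stubbed send below stands in for a real broker call, so the sketch compiles and runs without a Kafka cluster:

```java
import java.util.concurrent.CompletableFuture;

public class SendStylesSketch {

    // Stand-in for kafkaTemplate.send(topic, key, value); a real call
    // would return a CompletableFuture<SendResult<String, V>>.
    static CompletableFuture<String> send(String message) {
        return CompletableFuture.supplyAsync(() -> "ack:" + message);
    }

    public static void main(String[] args) throws Exception {
        // Synchronous style: block until the broker acknowledges.
        String ack = send("event-1").get();
        System.out.println("sync result: " + ack);

        // Asynchronous style: register a callback and keep going.
        send("event-2").whenComplete((result, exception) -> {
            if (exception != null) {
                System.out.println("send failed: " + exception.getMessage());
            } else {
                System.out.println("async result: " + result);
            }
        }).get(); // joined here only so the demo prints before exiting
    }
}
```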
To create a new Spring Boot microservice project, first navigate to start.spring.io. Select the project type as Maven and the language as Java, then select the Spring Boot version (I am selecting 3.4.1). Next, fill in the Project Metadata, then choose the packaging as Jar and the Java version as 17.
Next, add the following dependencies (Spring Web, Spring for Apache Kafka, and Lombok) and click the Generate button.
Once the project has been downloaded, import it into IntelliJ IDEA. It looks like this in the IDE:
To create a Kafka topic, first create a Kafka configuration class under the root package of the Maven project. Then add the code snippet below to create the Kafka topic.
package com.navabitsolutions.product.ms;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaConfig {

    @Bean
    NewTopic createTopic() {
        // The topic name must match the one passed to kafkaTemplate.send(...)
        return TopicBuilder.name("product-created-events-topic")
                .partitions(3)
                .replicas(1)
                .build();
    }
}
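For the producer to connect and serialize events, it also needs a few settings in application.properties (a minimal sketch; adjust the bootstrap server address to your environment):

```properties
# Kafka broker address (assumed to be the local broker started later in this lesson)
spring.kafka.producer.bootstrap-servers=localhost:9092
# Keys are plain strings (the product id)
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# Values are ProductCreatedEvent objects serialized as JSON
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
```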
Next, follow the steps below.
Under the com.navabitsolutions.product.ms.request package, create a ProductRequest class.
package com.navabitsolutions.product.ms.request;

import java.math.BigDecimal;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Getter
@ToString
@NoArgsConstructor
@EqualsAndHashCode
@AllArgsConstructor
@Builder(builderClassName = "Builder", setterPrefix = "with")
public class ProductRequest {

    private String title;
    private BigDecimal price;
    private Integer quality;
}
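With setterPrefix = "with", Lombok generates a builder roughly equivalent to the hand-written sketch below (written without Lombok so it compiles on its own; the real annotated class additionally gets getters, equals/hashCode, toString, and constructors generated for it):

```java
import java.math.BigDecimal;

public class ProductRequestSketch {

    private final String title;
    private final BigDecimal price;
    private final Integer quality;

    private ProductRequestSketch(Builder b) {
        this.title = b.title;
        this.price = b.price;
        this.quality = b.quality;
    }

    public String getTitle() { return title; }
    public BigDecimal getPrice() { return price; }
    public Integer getQuality() { return quality; }

    public static Builder builder() { return new Builder(); }

    // Roughly what @Builder(builderClassName = "Builder", setterPrefix = "with") produces
    public static class Builder {
        private String title;
        private BigDecimal price;
        private Integer quality;

        public Builder withTitle(String title) { this.title = title; return this; }
        public Builder withPrice(BigDecimal price) { this.price = price; return this; }
        public Builder withQuality(Integer quality) { this.quality = quality; return this; }

        public ProductRequestSketch build() { return new ProductRequestSketch(this); }
    }

    public static void main(String[] args) {
        ProductRequestSketch request = ProductRequestSketch.builder()
                .withTitle("iPhone14")
                .withPrice(new BigDecimal("140000"))
                .withQuality(10)
                .build();
        System.out.println(request.getTitle()); // prints iPhone14
    }
}
```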
Under the com.navabitsolutions.product.ms.controller package, create a ProductController class. Start with a skeleton that accepts the request and returns 201 Created:
package com.navabitsolutions.product.ms.controller;

import com.navabitsolutions.product.ms.request.ProductRequest;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1/product")
public class ProductController {

    @PostMapping("/create")
    public ResponseEntity<String> createProduct(@RequestBody ProductRequest productRequest) {
        return ResponseEntity.status(HttpStatus.CREATED).body("");
    }
}
Next, update the ProductController class so that it delegates the request to the service layer:
package com.navabitsolutions.product.ms.controller;

import com.navabitsolutions.product.ms.request.ProductRequest;
import com.navabitsolutions.product.ms.service.ProductService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1/product")
@RequiredArgsConstructor
public class ProductController {

    private final ProductService productService;

    @PostMapping("/create")
    public ResponseEntity<String> createProduct(@RequestBody ProductRequest productRequest) {
        String productId = productService.createProduct(productRequest);
        return ResponseEntity.status(HttpStatus.CREATED).body(productId);
    }
}
In the service layer, we first need to create an interface, ProductService.java, with one abstract method called createProduct. Once it has been created, we need to create a service implementation class to hold the business logic.
package com.navabitsolutions.product.ms.service;

import com.navabitsolutions.product.ms.request.ProductRequest;

public interface ProductService {

    String createProduct(ProductRequest productRequest);
}
package com.navabitsolutions.product.ms.service;

import com.navabitsolutions.product.ms.events.ProductCreatedEvent;
import com.navabitsolutions.product.ms.request.ProductRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

@Service
public class ProductServiceImpl implements ProductService {

    private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());

    private final KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate;

    public ProductServiceImpl(KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public String createProduct(ProductRequest productRequest) {
        String productId = UUID.randomUUID().toString();
        ProductCreatedEvent productCreatedEvent = ProductCreatedEvent.builder()
                .withProductId(productId)
                .withTitle(productRequest.getTitle())
                .withPrice(productRequest.getPrice())
                .withQuality(productRequest.getQuality())
                .build();
        // Asynchronous send: the returned future completes once the broker acknowledges
        CompletableFuture<SendResult<String, ProductCreatedEvent>> future =
                kafkaTemplate.send("product-created-events-topic", productId, productCreatedEvent);
        future.whenComplete((result, exception) -> {
            if (exception != null) {
                LOGGER.error("Failed to send message: {}", exception.getMessage());
            } else {
                LOGGER.info("Message sent successfully: {}", result.getRecordMetadata());
            }
        });
        return productId;
    }
}
In the class above, we added the @Service annotation. It marks this class as a special component, indicates that it contains business logic, and causes Spring to create an instance of the class at application startup and add it to the Spring application context. Once the object is available in the application context, we can use Spring dependency injection to inject this class into other classes.
Create a ProductCreatedEvent class under the com.navabitsolutions.product.ms.events package.
package com.navabitsolutions.product.ms.events;

import java.math.BigDecimal;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Getter
@ToString
@NoArgsConstructor
@EqualsAndHashCode
@AllArgsConstructor
@Builder(builderClassName = "Builder", setterPrefix = "with")
public class ProductCreatedEvent {

    private String productId;
    private String title;
    private BigDecimal price;
    private Integer quality;
}
After implementing the Spring Boot product microservice, we need to run the application to validate whether the producer microservice is producing messages and publishing them to the Kafka topic. To validate this, first bring up the Kafka server (broker). Then, using a Kafka CLI command, check whether a consumer is able to consume the messages.
Run the following command to bring up the Kafka server: ./bin/kafka-server-start.sh config/kraft/server.properties
Run the following command to read the Kafka messages using the Kafka CLI consumer: ./bin/kafka-console-consumer.sh --topic product-created-events-topic --bootstrap-server localhost:9092 --property print.key=true
Once the product microservice is up and running, use Postman to fire the REST API call with the endpoint and request payload below.
Endpoint: POST http://localhost:52713/api/v1/product/create
{
    "title": "iPhone14",
    "price": 140000,
    "quality": 10
}
Once we receive a successful response, we can check the Kafka CLI console consumer to verify whether the message has been consumed from the topic. For that, open the terminal window in which we started the consumer before firing the request.
There are two different ways to send a message synchronously to a Kafka topic.