Apache Kafka and messaging queues are critical components in building scalable, real-time systems. Below is a breakdown of concepts, use cases, approaches, and examples using Apache Kafka with Spring Boot REST APIs, including integration techniques.
What is Apache Kafka?
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission critical applications.
Apache Kafka Core Concepts
We will discuss the following Apache Kafka core concepts:
1. Kafka Cluster
2. Kafka Broker
3. Kafka Producer
4. Kafka Consumer
5. Kafka Topic
6. Kafka Partitions
7. Kafka Offsets
8. Kafka Consumer Group
Let's begin with the Kafka cluster.
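1. Kafka Cluster
Since Kafka is a distributed system, it runs as a cluster. A Kafka cluster consists of a set of brokers. A cluster can run with a single broker, which is common for local development, but production clusters typically use at least three brokers so that data can be replicated.
The following diagram shows a Kafka cluster with three Kafka brokers: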
2. Kafka Broker
The broker is the Kafka server. The name fits because Kafka acts as a message broker between producer and consumer. The producer and consumer don't interact directly; they use the Kafka server as an agent, or broker, to exchange messages.
The following diagram shows a Kafka broker acting as an agent that exchanges messages between the Producer and the Consumer:
3. Kafka Producer
A producer is an application that sends messages. It does not send messages directly to the recipient; it sends them only to the Kafka server.
The following diagram shows a Producer sending messages directly to the Kafka broker:
4. Kafka Consumer
Consumer is an application that reads messages from the Kafka server.
If producers are sending data, they must be sending it to someone, right?
The consumers are the recipients. But remember that the producers don't send data to a recipient address.
They just send it to the Kafka server. Anyone who is interested in that data can come forward and take it from the Kafka server.
So, any application that requests data from a Kafka server is a consumer, and it can ask for data sent by any producer, provided it has permission to read it.
The following diagram shows a Producer sending messages directly to the Kafka broker and a Consumer reading messages from the Kafka broker:
5. Kafka Topic
We learned that a producer sends data to the Kafka broker, and that a consumer can then ask the Kafka broker for data.
But the question is: which data? We need some identification mechanism to request data from a broker.
This is where the Kafka topic comes in.
• Topic is like a table in a database or folder in a file system.
• Topic is identified by a name.
• You can have any number of topics.
The following diagram shows two topics created in a Kafka broker:
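To make the role of the topic name concrete, here is a minimal in-memory sketch. It is not Kafka code: MiniBroker and the topic names "orders" and "payments" are hypothetical, and creating a topic on first use is a simplification made only for this illustration. It only shows that producers and consumers never reference each other, just a shared topic name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory stand-in for a broker (hypothetical, not the Kafka API). */
class MiniBroker {
    // Each topic name maps to the messages stored under it, in arrival order.
    private final Map<String, List<String>> topics = new HashMap<>();

    /** Producer side: append a message to the named topic, creating it on first use. */
    void publish(String topic, String message) {
        topics.computeIfAbsent(topic, name -> new ArrayList<>()).add(message);
    }

    /** Consumer side: return a copy of every message stored under the named topic. */
    List<String> read(String topic) {
        return List.copyOf(topics.getOrDefault(topic, List.of()));
    }

    public static void main(String[] args) {
        MiniBroker broker = new MiniBroker();
        broker.publish("orders", "order-1 created");
        broker.publish("payments", "payment-1 received");
        broker.publish("orders", "order-2 created");

        System.out.println(broker.read("orders"));   // [order-1 created, order-2 created]
        System.out.println(broker.read("payments")); // [payment-1 received]
    }
}
```

A consumer that asks for "orders" never sees the payment message, which is the same separation that the table or folder analogy describes.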
6. Kafka Partitions
Kafka topics are divided into a number of partitions, each of which contains records in an immutable, ordered sequence.
Kafka brokers store the messages for a topic. But the volume of data can be enormous, and it may not be possible to store it all on a single computer.
Therefore, because Kafka is a distributed system, a topic is partitioned into multiple parts that are distributed among multiple computers.
The following diagram shows a Kafka topic divided into a number of partitions:
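One way to picture how records are spread over partitions is to hash a record key and take the result modulo the partition count, so that records with the same key always land in the same partition. The sketch below is an illustration under that assumption. PartitionSketch and the sample keys are hypothetical, and the hash used here is a stand-in: Kafka's built-in partitioner uses a different hash function, so the partition numbers printed here will not match a real cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Simplified key-to-partition mapping (illustrative hash, not Kafka's own). */
class PartitionSketch {
    /** Maps a record key to a partition index in the range [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        for (String key : new String[] {"user-1", "user-2", "user-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Both "user-1" lines print the same partition, which is what preserves per-key ordering when a topic is split across machines.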
7. Kafka Offsets
An offset is a sequential id given to each message as it arrives at a partition. Once an offset is assigned, it never changes. The first message gets offset zero.
The next message receives offset one, and so on.
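The offset rule can be sketched as an append-only list in which the offset is simply the position at which a record was appended. PartitionLog is a hypothetical name used only for this illustration, and the sketch ignores retention and replication.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a single partition: an append-only list of records. */
class PartitionLog {
    private final List<String> records = new ArrayList<>();

    /** Appends a record and returns the offset assigned to it. */
    long append(String record) {
        records.add(record);
        return records.size() - 1L; // first record gets 0, the next 1, and so on
    }

    /** Reads the record stored at the given offset. */
    String read(long offset) {
        return records.get((int) offset);
    }

    public static void main(String[] args) {
        PartitionLog log = new PartitionLog();
        System.out.println(log.append("first"));  // 0
        System.out.println(log.append("second")); // 1
        System.out.println(log.read(0));          // first
    }
}
```

Because records are only ever appended, an offset handed out once keeps pointing at the same record.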
8. Kafka Consumer Group
A consumer group contains one or more consumers working together to process the messages.
Example topics: game-events (for instance, a "Goal scored by Player X" message) and transaction-events.
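Consumers in a group work together by dividing a topic's partitions among themselves, so that each partition is read by exactly one consumer of that group. The sketch below illustrates this with a plain round-robin split. GroupAssignmentSketch and the consumer names are hypothetical, and Kafka's real assignment strategies are pluggable and more involved than this.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified round-robin split of partitions over the consumers of one group. */
class GroupAssignmentSketch {
    /** Assigns partitions 0..numPartitions-1 so that each has exactly one owner. */
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("A group needs at least one consumer");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Prints {consumer-a=[0, 2], consumer-b=[1]}
        System.out.println(assign(List.of("consumer-a", "consumer-b"), 3));
    }
}
```

With more consumers than partitions, the extra consumers in this sketch end up with an empty list, which mirrors why adding consumers beyond the partition count does not add throughput.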
The Spring team provides the Spring for Apache Kafka dependency for developing Kafka-based messaging solutions.
Add dependencies (groupId:artifactId):
org.springframework.boot:spring-boot-starter-web
org.springframework.kafka:spring-kafka
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer
Let's go through the Kafka properties that Spring Boot provides:
spring.kafka.consumer.group-id - specifies a unique string that identifies the consumer group this consumer belongs to.
spring.kafka.consumer.auto-offset-reset - specifies what to do when there is no initial offset in Kafka or the current offset no longer exists on the server (e.g. because that data has been deleted). The value earliest used here resets to the oldest available offset; latest resets to the newest offset, and none raises an error instead.
spring.kafka.consumer.key-deserializer - specifies the deserializer class for keys.
spring.kafka.consumer.value-deserializer - specifies the deserializer class for values.
spring.kafka.producer.key-serializer - specifies the serializer class for keys.
spring.kafka.producer.value-serializer - specifies the serializer class for values.
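For orientation, each spring.kafka.consumer.* property corresponds to a plain Kafka client setting with a dotted name. The sketch below lists those client keys next to the values used in this tutorial. ConsumerPropsSketch is a hypothetical helper that only builds a java.util.Properties object and does not connect to Kafka; Spring Boot performs this mapping for you.

```java
import java.util.Properties;

/** Shows the Kafka client keys behind the Spring Boot consumer properties. */
class ConsumerPropsSketch {
    static Properties consumerProps() {
        Properties props = new Properties();
        // spring.kafka.consumer.bootstrap-servers
        props.setProperty("bootstrap.servers", "localhost:9092");
        // spring.kafka.consumer.group-id
        props.setProperty("group.id", "group-id");
        // spring.kafka.consumer.auto-offset-reset
        props.setProperty("auto.offset.reset", "earliest");
        // spring.kafka.consumer.key-deserializer
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // spring.kafka.consumer.value-deserializer
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps();
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + " = " + props.getProperty(name));
        }
    }
}
```

The producer side follows the same pattern, with key.serializer and value.serializer in place of the deserializer keys.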
4. Create Kafka Topic
To create a topic on startup, add a bean of type NewTopic. If the topic already exists, the bean is ignored.
package net.niteshsynergy.springbootkafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Declares the "niteshsynergy" topic so that it is created on startup
    @Bean
    public NewTopic niteshSynergyTopic() {
        return TopicBuilder.name("niteshsynergy")
                .build();
    }
}
5. Create Kafka Producer
The producer writes our messages to the topic.
Spring Boot auto-configures Spring’s KafkaTemplate, so you can autowire it directly into your own beans.
package net.niteshsynergy.springbootkafka.kafka;

import net.niteshsynergy.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    // Auto-configured by Spring Boot from the spring.kafka.producer.* properties
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        LOGGER.info(String.format("Message sent -> %s", message));
        // Publish the message to the configured topic
        kafkaTemplate.send(AppConstants.TOPIC_NAME, message);
    }
}
Create a utils package and within this package create AppConstants with the following content:
package net.niteshsynergy.springbootkafka.utils;

public class AppConstants {
    public static final String TOPIC_NAME = "niteshsynergy";
    public static final String GROUP_ID = "group_id";
}
The KafkaProducer class uses KafkaTemplate to send messages to the configured topic name.
6. Create REST API to Send Message
Create a controller package and, within it, create KafkaProducerController with the following content:
package net.niteshsynergy.springbootkafka.controller;

import net.niteshsynergy.springbootkafka.kafka.KafkaProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaProducerController {

    private final KafkaProducer kafkaProducer;

    // Constructor injection of KafkaProducer
    public KafkaProducerController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    // GET /api/v1/kafka/publish?message=... publishes the message to the topic
    @GetMapping("/publish")
    public ResponseEntity<String> publish(@RequestParam("message") String message) {
        kafkaProducer.sendMessage(message);
        return ResponseEntity.ok("Message sent to Kafka topic");
    }
}
See Topic Messages via Command Line:
bin/kafka-console-consumer.sh --topic niteshsynergy --from-beginning --bootstrap-server localhost:9092
Make sure to change the topic name. In our case "niteshsynergy" is the topic name.
7. Create Kafka Consumer
The Kafka consumer is the service responsible for reading messages and processing them according to your business logic. To set it up, add the following:
package net.niteshsynergy.springbootkafka.kafka;

import net.niteshsynergy.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    // Called for every message that arrives on the topic
    @KafkaListener(topics = AppConstants.TOPIC_NAME, groupId = AppConstants.GROUP_ID)
    public void consume(String message) {
        LOGGER.info(String.format("Message received -> %s", message));
    }
}
Here, the @KafkaListener annotation subscribes the consume(String message) method to the niteshsynergy topic, and the method simply writes every message to the application log. In a real application, you can handle messages however your business logic requires.
KafkaListener endpoint:
@KafkaListener(topics = AppConstants.TOPIC_NAME, groupId = AppConstants.GROUP_ID)
public void consume(String message) {
    LOGGER.info(String.format("Message received -> %s", message));
}
Let's run the Spring Boot application and try the demo. Make sure that the ZooKeeper and Kafka services are up and running.
Open a browser and hit the below link to call a REST API: http://localhost:8080/api/v1/kafka/publish?message=hello%20world
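The same call can also be made from Java rather than a browser. The sketch below only builds the request with the JDK's java.net.http API (Java 11 or later) and prints it. PublishRequestSketch is a hypothetical helper, and the host and port are taken from the URL above.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

/** Builds the GET request for the publish endpoint without sending it. */
class PublishRequestSketch {
    static HttpRequest publishRequest(String message) {
        // URLEncoder emits '+' for spaces; replace it with %20 to match the URL above.
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8).replace("+", "%20");
        URI uri = URI.create("http://localhost:8080/api/v1/kafka/publish?message=" + encoded);
        return HttpRequest.newBuilder(uri).GET().build();
    }

    public static void main(String[] args) {
        HttpRequest request = publishRequest("hello world");
        // Prints: GET http://localhost:8080/api/v1/kafka/publish?message=hello%20world
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To actually send it while the application is running, pass the request to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).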
_______________________________________________________________________________________________________________________________
Spring Boot Kafka JsonSerializer and JsonDeserializer Example
In this tutorial, we will learn how to use the JsonSerializer and JsonDeserializer classes provided by the Spring Kafka library to store JSON in Apache Kafka topics, retrieve it, and return Java model objects.
If you are new to Apache Kafka, check out my article, Apache Kafka Core Concepts. In short, you will learn how to send and receive a Java object as a JSON byte[] to and from Apache Kafka.
Apache Kafka stores and transports byte[]. It ships with a number of built-in serializers and deserializers, but none for JSON. Spring Kafka provides a JsonSerializer and JsonDeserializer, which we can use to convert Java objects to and from JSON.
We’ll send a Java Object as JSON byte[] to a Kafka Topic using a JsonSerializer. Afterward, we’ll configure how to receive a JSON byte[] and automatically convert it to a Java Object using a JsonDeserializer.
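To see what "a Java object as a JSON byte[]" means in practice, the toy sketch below performs the round trip by hand: object to JSON text to UTF-8 bytes, and back. It is not how Spring Kafka implements it; JsonBytesSketch and Person are hypothetical, the JSON handling covers only this one fixed shape with no escaping, and a real application would rely on JsonSerializer and JsonDeserializer instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hand-rolled object -> JSON bytes -> object round trip for one fixed shape. */
class JsonBytesSketch {
    /** Hypothetical value type used only for this illustration. */
    static final class Person {
        final int id;
        final String name;

        Person(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Matches exactly the JSON text produced by serialize(); no escaping is handled.
    private static final Pattern SHAPE =
            Pattern.compile("\\{\"id\":(-?\\d+),\"name\":\"([^\"]*)\"\\}");

    /** Object -> JSON text -> UTF-8 bytes (what a JSON serializer does before sending). */
    static byte[] serialize(Person person) {
        String json = "{\"id\":" + person.id + ",\"name\":\"" + person.name + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    /** UTF-8 bytes -> JSON text -> object (what a JSON deserializer does on receipt). */
    static Person deserialize(byte[] bytes) {
        String json = new String(bytes, StandardCharsets.UTF_8);
        Matcher matcher = SHAPE.matcher(json);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Unexpected JSON: " + json);
        }
        return new Person(Integer.parseInt(matcher.group(1)), matcher.group(2));
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(new Person(1, "Ada"));
        System.out.println(new String(bytes, StandardCharsets.UTF_8)); // {"id":1,"name":"Ada"}
        Person back = deserialize(bytes);
        System.out.println(back.id + " " + back.name); // 1 Ada
    }
}
```

Kafka itself only ever sees the byte array in the middle of this round trip, which is why the serializer and deserializer are configured on the client side.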
Add dependencies (groupId:artifactId):
org.springframework.boot:spring-boot-starter-web
org.springframework.kafka:spring-kafka
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages: '*'
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
We are using the following Consumer property to convert JSON into Java object:
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
We are using the following Producer property to convert Java object into JSON:
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
Let's understand the meaning of the above properties.
spring.kafka.consumer.bootstrap-servers - Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for consumers.
spring.kafka.consumer.group-id - A unique string that identifies the consumer group to which this consumer belongs.
spring.kafka.consumer.auto-offset-reset - What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server.
spring.kafka.consumer.key-deserializer - Deserializer class for keys.
spring.kafka.consumer.value-deserializer - Deserializer class for values.
spring.kafka.producer.key-serializer - Serializer class for keys.
spring.kafka.producer.value-serializer - Serializer class for values.
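The auto-offset-reset rule is easier to follow as a small decision function. The sketch below models it under simple assumptions: OffsetResetSketch is hypothetical, logStart is the oldest offset still available, and logEnd is the offset the next record would receive. It does not call Kafka.

```java
/** Models where a consumer starts reading, depending on auto-offset-reset. */
class OffsetResetSketch {
    /**
     * @param committedOffset the group's stored position, or null if there is none
     * @param autoOffsetReset "earliest", "latest" or "none"
     */
    static long startingOffset(Long committedOffset, String autoOffsetReset,
                               long logStart, long logEnd) {
        // A stored position that still exists on the server always wins.
        if (committedOffset != null && committedOffset >= logStart && committedOffset <= logEnd) {
            return committedOffset;
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStart; // replay everything that is still available
            case "latest":
                return logEnd;   // only records written from now on
            case "none":
                throw new IllegalStateException("No valid offset and auto-offset-reset is none");
            default:
                throw new IllegalArgumentException("Unknown value: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(null, "earliest", 0, 42)); // 0
        System.out.println(startingOffset(null, "latest", 0, 42));   // 42
        System.out.println(startingOffset(7L, "latest", 0, 42));     // 7
    }
}
```

With earliest, as configured in this tutorial, a brand-new consumer group reads the topic from its oldest available record.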
4. Create Kafka Topic
To create a topic on startup, add a bean of type NewTopic. If the topic already exists, the bean is ignored. We will use the topic name "niteshsynergy" in this example. Let's create a KafkaTopicConfig file and add the following content:
package net.niteshsynergy.springbootkafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Declares the "niteshsynergy" topic so that it is created on startup
    @Bean
    public NewTopic niteshsynergyTopic() {
        return TopicBuilder.name("niteshsynergy")
                .build();
    }
}
5. Create Simple POJO to Serialize / Deserialize
Let's create a User class so that we can send a User object to a Kafka topic and receive it back. The User instance is serialized to a byte array by JsonSerializer.
Kafka then stores this byte array in a partition of the given topic. During deserialization, JsonDeserializer receives the JSON from Kafka as a byte array, converts it to a User object, and returns it to the application.
package net.niteshsynergy.springbootkafka.payload;

public class User {

    private int id;
    private String firstName;
    private String lastName;

    // Getters and Setters
    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    // toString Method for easy representation
    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                '}';
    }
}
6. Create Kafka Producer to Produce JSON Message
Let's create a Kafka producer that produces JSON messages using Spring Kafka.
KafkaTemplate
Spring Boot auto-configures Spring’s KafkaTemplate, so you can autowire it directly into your own beans.
package net.niteshsynergy.springbootkafka.kafka;

import net.niteshsynergy.springbootkafka.payload.User;
import net.niteshsynergy.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void sendMessage(User data) {
        LOGGER.info(String.format("Message sent -> %s", data.toString()));

        // Create message with payload and topic header
        Message<User> message = MessageBuilder
                .withPayload(data)
                .setHeader(KafkaHeaders.TOPIC, AppConstants.TOPIC_NAME)
                .build();

        // Send the message to Kafka topic
        kafkaTemplate.send(message);
    }
}
Let’s start by sending a User object to a Kafka topic.
Notice that we use a KafkaTemplate<String, User>, since we are sending Java objects to the Kafka topic, and they are automatically transformed into a JSON byte[]. In this example, we created a Message using the MessageBuilder.
It is also important to set the topic to which the message will be sent.
7. Create REST API to Send JSON Object
Let's create a simple POST REST API to send User information as a JSON object.
Instead of sending a message string, the API accepts a complete User object as JSON so that the Kafka producer can write the User object to the Kafka topic.
package net.niteshsynergy.springbootkafka.controller;

import net.niteshsynergy.springbootkafka.kafka.KafkaProducer;
import net.niteshsynergy.springbootkafka.payload.User;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaProducerController {

    private final KafkaProducer kafkaProducer;

    // Constructor injection of KafkaProducer
    public KafkaProducerController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    // Endpoint to publish a User message to Kafka
    @PostMapping("/publish")
    public ResponseEntity<String> publish(@RequestBody User user) {
        // Send the User object to Kafka
        kafkaProducer.sendMessage(user);
        return ResponseEntity.ok("Message sent to kafka topic");
    }
}
8. Create Kafka Consumer to Consume JSON Message
Let's create a Kafka consumer to receive JSON messages from the topic. In KafkaConsumer, we simply need to add the User Java object as a parameter of our listener method.
package net.niteshsynergy.springbootkafka.kafka;

import net.niteshsynergy.springbootkafka.payload.User;
import net.niteshsynergy.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    // Kafka listener to consume messages from the topic
    @KafkaListener(topics = AppConstants.TOPIC_NAME, groupId = AppConstants.GROUP_ID)
    public void consume(User data) {
        LOGGER.info(String.format("Message received -> %s", data.toString()));
    }
}
9. Demo
Let's run the Spring Boot application and try the demo.
Make sure that the ZooKeeper and Kafka services are up and running.
Let's use the Postman client to make a POST REST API call:
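As an alternative to Postman, the same POST can be sketched with the JDK's java.net.http API (Java 11 or later). The block only builds and prints the request. PublishUserRequestSketch and the sample field values are hypothetical, and the host and port assume the application runs on localhost:8080.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Builds the POST request that carries a User as JSON, without sending it. */
class PublishUserRequestSketch {
    static HttpRequest publishUserRequest(String userJson) {
        return HttpRequest.newBuilder(URI.create("http://localhost:8080/api/v1/kafka/publish"))
                // @RequestBody needs the JSON content type to map the body onto User
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(userJson))
                .build();
    }

    public static void main(String[] args) {
        // Field names follow the User class; the values are made up for illustration.
        String body = "{\"id\":1,\"firstName\":\"Jane\",\"lastName\":\"Doe\"}";
        HttpRequest request = publishUserRequest(body);
        // Prints: POST http://localhost:8080/api/v1/kafka/publish
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To send it while the application is running, pass the request to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); the consumer should then log the received User.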