RabbitMQ, Kafka, and IBM MQ are three popular messaging systems, but they differ in terms of their design, use cases, and key features.
What is Apache Kafka?
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission critical applications.
Apache Kafka Core Concepts
We will discuss the following Apache Kafka core concepts:
1. Kafka Cluster
2. Kafka Broker
3. Kafka Producer
4. Kafka Consumer
5. Kafka Topic
6. Kafka Partitions
7. Kafka Offsets
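8. Kafka Consumer Group
Let's begin with the Kafka cluster.
1. Kafka Cluster
Since Kafka is a distributed system, it runs as a cluster. A Kafka cluster consists of a set of brokers. A single broker is enough for local development, but production clusters typically run at least three brokers so that partitions can be replicated and the cluster can tolerate a broker failure.
The following diagram shows a Kafka cluster with three Kafka brokers: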
2. Kafka Broker
The broker is the Kafka server. The name fits because Kafka acts as a message broker between producers and consumers: they never interact directly and instead exchange messages through the Kafka server.
The following diagram shows a Kafka broker acting as the intermediary that exchanges messages between a producer and a consumer:
3. Kafka Producer
A producer is an application that sends messages. It does not send messages directly to the recipient; it sends them only to the Kafka server.
The following diagram shows a producer sending messages to the Kafka broker:
4. Kafka Consumer
A consumer is an application that reads messages from the Kafka server.
If producers are sending data, they must be sending it to someone, right?
The consumers are the recipients. But remember that producers don't send data to a recipient address.
They just send it to the Kafka server. Anyone who is interested in that data can come forward and take it from the Kafka server.
So any application that requests data from a Kafka server is a consumer, and it can ask for data sent by any producer, provided it has permission to read it.
The following diagram shows a producer sending messages to the Kafka broker and a consumer reading messages from it:
5. Kafka Topic
We learned that a producer sends data to the Kafka broker, and that a consumer can then ask the broker for data.
But the question is: which data? We need some identification mechanism to request data from a broker.
This is where the Kafka topic comes in.
• Topic is like a table in a database or folder in a file system.
• Topic is identified by a name.
• You can have any number of topics.
The following diagram shows two topics created in a Kafka broker:
6. Kafka Partitions
Kafka topics are divided into a number of partitions, each of which holds records in an immutable, ordered sequence.
Kafka brokers store the messages for a topic. But the volume of data can be enormous, and it may not be possible to store it all on a single machine.
Therefore a topic is split into multiple partitions that are distributed across the brokers in the cluster.
The following diagram shows a Kafka topic divided into a number of partitions:
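How a record ends up in a particular partition can be sketched in a few lines. The example below is a simplified illustration, not Kafka's actual partitioner: the class name `SimplePartitioner` is hypothetical, and a plain array hash stands in for the hash function used by real Kafka clients. The point it shows is that hashing the key and taking the remainder sends records with the same key to the same partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Simplified illustration of mapping a record key to a partition. */
class SimplePartitioner {

    /**
     * Returns a partition index in the range [0, numPartitions).
     * Assumption: Arrays.hashCode stands in for the hash a real client uses.
     */
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        // The repeated key "order-1" prints the same partition both times.
        for (String key : new String[] {"order-1", "order-2", "order-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

Because the mapping depends on the partition count, changing the number of partitions of an existing topic would change where keys land in this sketch.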
7. Kafka Offsets
An offset is a sequential id assigned to each message as it arrives at a partition. Once an offset is assigned, it never changes. The first message in a partition gets offset zero.
The next message receives offset one, and so on.
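The offset rule can be sketched with an in-memory stand-in for a single partition. This is only an illustration under stated assumptions: `PartitionLog` is a hypothetical class, and a real partition is a replicated log on disk rather than a list in memory.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory stand-in for one partition: an append-only list of messages. */
class PartitionLog {
    private final List<String> records = new ArrayList<>();

    /** Appends a message and returns the offset assigned to it. */
    long append(String message) {
        records.add(message);
        return records.size() - 1L; // the first message gets offset 0
    }

    /** Reads the message stored at the given offset. */
    String read(long offset) {
        return records.get((int) offset);
    }

    /** Offset that the next appended message will receive. */
    long endOffset() {
        return records.size();
    }

    public static void main(String[] args) {
        PartitionLog log = new PartitionLog();
        System.out.println(log.append("first"));  // 0
        System.out.println(log.append("second")); // 1
        System.out.println(log.read(0));          // first
    }
}
```

Reading does not remove anything from the log, which is why several consumers can read the same partition at different offsets.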
8. Kafka Consumer Group
A consumer group contains one or more consumers working together to process the messages.
For example, a game-events topic might carry a message such as "Goal scored by Player X", alongside a separate transaction-events topic.
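Within a consumer group, each partition of a topic is read by only one consumer at a time, so the group divides the partitions among its members. The sketch below illustrates that idea with a simple round-robin split. It is an assumption-laden simplification: `GroupAssignmentSketch` is a hypothetical class, and Kafka's real assignment strategies are more involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Round-robin split of partitions across the consumers of one group. */
class GroupAssignmentSketch {

    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        // Every partition gets exactly one owner within the group.
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // prints {consumer-1=[0, 2], consumer-2=[1]}
        System.out.println(assign(List.of("consumer-1", "consumer-2"), 3));
    }
}
```

With more consumers than partitions, the extra consumers in this sketch receive an empty list, which mirrors why adding consumers beyond the partition count does not increase parallelism.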
The Spring team provides the Spring for Apache Kafka dependency for developing Kafka-based messaging solutions.
Add dependencies:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer
Let's go through the Spring Boot Kafka properties above:
spring.kafka.consumer.group-id - specifies a unique string that identifies the consumer group this consumer belongs to.
spring.kafka.consumer.auto-offset-reset - specifies what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server (e.g. because that data has been deleted). Valid values are earliest (start from the oldest available offset), latest (start from the end of the partition), and none (raise an error to the consumer).
spring.kafka.consumer.key-deserializer - specifies the deserializer class for keys.
spring.kafka.consumer.value-deserializer - specifies the deserializer class for values.
spring.kafka.producer.key-serializer - specifies the serializer class for keys.
spring.kafka.producer.value-serializer - specifies the serializer class for values.
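The effect of auto-offset-reset can be sketched as a small decision function. This is a simplified model rather than Kafka client code: `OffsetResetDemo`, `ResetPolicy`, and the parameter names are hypothetical, and the policy is only consulted when the group has no usable committed offset.

```java
import java.util.OptionalLong;

/** Simplified model of how a consumer picks its starting offset. */
class OffsetResetDemo {

    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * @param committed the group's committed offset, empty if there is none
     * @param logStart  oldest offset still available in the partition
     * @param logEnd    offset the next produced message will receive
     */
    static long startingOffset(OptionalLong committed, long logStart, long logEnd, ResetPolicy policy) {
        if (committed.isPresent()) {
            long offset = committed.getAsLong();
            if (offset >= logStart && offset <= logEnd) {
                return offset; // a valid committed offset wins; the policy is ignored
            }
        }
        if (policy == ResetPolicy.EARLIEST) {
            return logStart;
        }
        if (policy == ResetPolicy.LATEST) {
            return logEnd;
        }
        throw new IllegalStateException("No valid offset and reset policy is NONE");
    }

    public static void main(String[] args) {
        // New group, partition currently holds offsets 5..19.
        System.out.println(startingOffset(OptionalLong.empty(), 5, 20, ResetPolicy.EARLIEST)); // 5
        System.out.println(startingOffset(OptionalLong.empty(), 5, 20, ResetPolicy.LATEST));   // 20
    }
}
```

In this model a consumer group that already committed offset 10 resumes at 10 regardless of the policy, which is why changing the property has no visible effect on an existing group.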
4. Create Kafka Topic
To create a topic on startup, add a bean of type NewTopic. If the topic already exists, the bean is ignored.
package net.niteshsynergy.springbootkafka;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic niteshSynergyTopic() {
return TopicBuilder.name("niteshsynergy")
.build();
}
}
5. Create Kafka Producer
A producer writes our messages to the topic.
Spring Boot auto-configures Spring’s KafkaTemplate, so you can autowire it directly into your own beans.
package net.niteshsynergy.springbootkafka.kafka;
import net.niteshsynergy.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
LOGGER.info(String.format("Message sent -> %s", message));
kafkaTemplate.send(AppConstants.TOPIC_NAME, message);
}
}
Create a utils package and within this package create AppConstants with the following content:
package net.niteshsynergy.springbootkafka.utils;
public class AppConstants {
public static final String TOPIC_NAME = "niteshsynergy";
public static final String GROUP_ID = "group_id";
}
The KafkaProducer class uses KafkaTemplate to send messages to the configured topic.
6. Create REST API to Send Message
Create a controller package and, within it, create KafkaProducerController with the following content:
package net.niteshsynergy.springbootkafka.controller;
import net.niteshsynergy.springbootkafka.kafka.KafkaProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaProducerController {
private KafkaProducer kafkaProducer;
public KafkaProducerController(KafkaProducer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
@GetMapping("/publish")
public ResponseEntity<String> publish(@RequestParam("message") String message) {
kafkaProducer.sendMessage(message);
return ResponseEntity.ok("Message sent to Kafka topic");
}
}
See Topic Messages via Command Line:
bin/kafka-console-consumer.sh --topic niteshsynergy --from-beginning --bootstrap-server localhost:9092
Make sure to use your own topic name. In our case "niteshsynergy" is the topic name.
7. Create Kafka Consumer
The Kafka consumer is the service responsible for reading messages and processing them according to your own business logic. To set it up, enter the following:
package net.niteshsynergy.springbootkafka.kafka;
import net.niteshsynergy.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = AppConstants.TOPIC_NAME, groupId = AppConstants.GROUP_ID)
public void consume(String message) {
LOGGER.info(String.format("Message received -> %s", message));
}
}
Here, the consume(String message) method subscribes to the niteshsynergy topic and simply writes every message to the application log. In a real application, you can handle messages however your business logic requires. KafkaListener endpoint:
@KafkaListener(topics = AppConstants.TOPIC_NAME, groupId = AppConstants.GROUP_ID)
public void consume(String message) {
LOGGER.info(String.format("Message received -> %s", message));
}
Let's run the Spring Boot application and try the demo. Make sure that the Kafka broker (and ZooKeeper, if your Kafka installation uses it) is up and running.
Open a browser and hit the below link to call a REST API: http://localhost:8080/api/v1/kafka/publish?message=hello%20world
_______________________________________________________________________________________________________________________________
Spring Boot Kafka JsonSerializer and JsonDeserializer Example
In this tutorial, we will learn how to use the JsonSerializer and JsonDeserializer classes provided by the Spring Kafka library to store JSON in Apache Kafka topics and read it back as Java model objects.
If you are new to Apache Kafka, you should first check out my article - Apache Kafka Core Concepts. In short, you will learn how to send a Java object to Apache Kafka as a JSON byte[] and receive it back.
Apache Kafka stores and transports byte[]. It ships with a number of built-in serializers and deserializers, but none for JSON. Spring Kafka provides a JsonSerializer and JsonDeserializer that we can use to convert Java objects to and from JSON.
We’ll send a Java object as a JSON byte[] to a Kafka topic using a JsonSerializer. Afterward, we’ll configure the consumer to receive the JSON byte[] and automatically convert it to a Java object using a JsonDeserializer.
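To make the object to JSON to byte[] round trip concrete, here is a hand-rolled sketch using only the JDK. It is not how Spring Kafka's JsonSerializer works internally (that relies on a JSON library); `JsonBytesSketch` and `SimpleUser` are hypothetical names, and the code handles only this one fixed shape, with no escaping of special characters.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hand-rolled JSON <-> byte[] round trip, for illustration only. */
class JsonBytesSketch {

    /** Hypothetical stand-in for a payload class such as User. */
    static final class SimpleUser {
        final int id;
        final String firstName;
        final String lastName;

        SimpleUser(int id, String firstName, String lastName) {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
        }
    }

    // Matches only the exact shape produced by serialize(); no escaping support.
    private static final Pattern SHAPE = Pattern.compile(
            "\\{\"id\":(-?\\d+),\"firstName\":\"([^\"]*)\",\"lastName\":\"([^\"]*)\"\\}");

    /** Object -> JSON text -> UTF-8 bytes (the form a broker would store). */
    static byte[] serialize(SimpleUser user) {
        String json = "{\"id\":" + user.id
                + ",\"firstName\":\"" + user.firstName
                + "\",\"lastName\":\"" + user.lastName + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    /** UTF-8 bytes -> JSON text -> object. */
    static SimpleUser deserialize(byte[] bytes) {
        String json = new String(bytes, StandardCharsets.UTF_8);
        Matcher m = SHAPE.matcher(json);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected JSON: " + json);
        }
        return new SimpleUser(Integer.parseInt(m.group(1)), m.group(2), m.group(3));
    }

    public static void main(String[] args) {
        byte[] wire = serialize(new SimpleUser(1, "Ada", "Lovelace"));
        System.out.println(new String(wire, StandardCharsets.UTF_8));
        System.out.println(deserialize(wire).firstName);
    }
}
```

In the real application the serializer and deserializer classes configured in the properties take over both directions, so none of this code has to be written by hand.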
Add dependencies:
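<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>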
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages: '*'
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
We are using the following Consumer property to convert JSON into Java object:
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
We are using the following Producer property to convert Java object into JSON:
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
Let's understand the meaning of the above properties.
spring.kafka.consumer.bootstrap-servers - Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for consumers.
spring.kafka.consumer.group-id - A unique string that identifies the consumer group to which this consumer belongs.
spring.kafka.consumer.auto-offset-reset - What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server.
spring.kafka.consumer.key-deserializer - Deserializer class for keys.
spring.kafka.consumer.value-deserializer - Deserializer class for values.
spring.kafka.producer.key-serializer - Serializer class for keys.
spring.kafka.producer.value-serializer - Serializer class for values.
4. Create Kafka Topic
To create a topic on startup, add a bean of type NewTopic. If the topic already exists, the bean is ignored. We will use the topic name "niteshsynergy" in this example. Let's create a KafkaTopicConfig file and add the following content:
package net.niteshsynergy.springbootkafka;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic niteshsynergyTopic() {
return TopicBuilder.name("niteshsynergy")
.build();
}
}
5. Create Simple POJO to Serialize / Deserialize
Let's create a User class so that we can send and receive a User object to and from a Kafka topic. The User instance will be serialized by JsonSerializer to a byte array.
Kafka then stores this byte array in a partition of the given topic. During deserialization, JsonDeserializer receives the JSON from Kafka as a byte array, converts it to a User object, and returns it to the application.
package net.niteshsynergy.springbootkafka.payload;
public class User {
private int id;
private String firstName;
private String lastName;
// Getters and Setters
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
// toString Method for easy representation
@Override
public String toString() {
return "User{" +
"id=" + id +
", firstName='" + firstName + '\'' +
", lastName='" + lastName + '\'' +
'}';
}
}
6. Create Kafka Producer to Produce JSON Message
Let's create a Kafka producer that produces JSON messages using Spring Kafka.
KafkaTemplate
Spring Boot auto-configures Spring’s KafkaTemplate, so you can autowire it directly into your own beans.
package net.niteshsynergy.springbootkafka.kafka;
import net.niteshsynergy.springbootkafka.payload.User;
import net.niteshsynergy.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired
private KafkaTemplate<String, User> kafkaTemplate;
public void sendMessage(User data) {
LOGGER.info(String.format("Message sent -> %s", data.toString()));
// Create message with payload and topic header
Message<User> message = MessageBuilder
.withPayload(data)
.setHeader(KafkaHeaders.TOPIC, AppConstants.TOPIC_NAME)
.build();
// Send the message to Kafka topic
kafkaTemplate.send(message);
}
}
Let’s start by sending a User object to a Kafka topic.
Notice: we declared a KafkaTemplate<String, User>, since we are sending Java objects to the Kafka topic that will automatically be transformed into a JSON byte[]. In this example, we created a Message using the MessageBuilder.
It’s important to set the topic header so the template knows which topic to send the message to.
7. Create REST API to Send JSON Object
Let's create a simple POST REST API to send User information as a JSON object.
Instead of sending a message string, we will post a complete User object as JSON so that the Kafka producer can write the User object to the Kafka topic.
package net.niteshsynergy.springbootkafka.controller;
import net.niteshsynergy.springbootkafka.kafka.KafkaProducer;
import net.niteshsynergy.springbootkafka.payload.User;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaProducerController {
private final KafkaProducer kafkaProducer;
// Constructor injection of KafkaProducer
public KafkaProducerController(KafkaProducer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
// Endpoint to publish a User message to Kafka
@PostMapping("/publish")
public ResponseEntity<String> publish(@RequestBody User user) {
// Send the User object to Kafka
kafkaProducer.sendMessage(user);
return ResponseEntity.ok("Message sent to kafka topic");
}
}
8. Create Kafka Consumer to Consume JSON Message
Let's create a Kafka consumer to receive JSON messages from the topic. In KafkaConsumer we simply need to add the User Java object as a parameter of our listener method.
package net.niteshsynergy.springbootkafka.kafka;
import net.niteshsynergy.springbootkafka.payload.User;
import net.niteshsynergy.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
// Kafka listener to consume messages from the topic
@KafkaListener(topics = AppConstants.TOPIC_NAME, groupId = AppConstants.GROUP_ID)
public void consume(User data) {
LOGGER.info(String.format("Message received -> %s", data.toString()));
}
}
9. Demo
Let's run the Spring Boot application and try the demo.
Make sure that the Kafka broker (and ZooKeeper, if your Kafka installation uses it) is up and running.
Let's use the Postman client to make a POST REST API call:
Kafka producers send messages to a Kafka topic.
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
private Map<String, Object> producerConfigs() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return configs;
}
}
Producer Code:
@Service
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
kafkaTemplate.send("topic_name", message);
}
}
Kafka consumers receive messages from a Kafka topic.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
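// Container factory that backs @KafkaListener methods
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}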
private Map<String, Object> consumerConfigs() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return configs;
}
}
Consumer Code:
@Service
public class KafkaConsumer {
@KafkaListener(topics = "topic_name", groupId = "group_id")
public void listen(String message) {
System.out.println("Received Message: " + message);
}
}
Use Case: A real-time event streaming system where events (e.g., user activity logs, sensor data, etc.) are produced and consumed by different services for processing and analysis.
Concepts:
Rules & Points:
Spring Boot Project (Kafka)
1. Dependencies (pom.xml):
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
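2. Configuration (KafkaConfig.java):
@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }
    // Container factory that backs @KafkaListener methods
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
    // Producer settings: broker address and String serializers
    private Map<String, Object> producerConfig() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return configs;
    }
    // Consumer settings: broker address, group id, and String deserializers
    private Map<String, Object> consumerConfig() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "event_group");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return configs;
    }
}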
3. Producer (REST Controller) (EventController.java):
@RestController
@RequestMapping("/event")
public class EventController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@PostMapping("/send")
public ResponseEntity<String> sendEvent(@RequestBody String eventMessage) {
kafkaTemplate.send("event_topic", eventMessage);
return ResponseEntity.ok("Event sent to Kafka");
}
}
4. Consumer (EventListener.java):
@Component
public class EventListener {
@KafkaListener(topics = "event_topic", groupId = "event_group")
public void listenEvent(String eventMessage) {
System.out.println("Received Event: " + eventMessage);
// Process event message asynchronously
}
}
5. Event Model (Event.java):
public class Event {
private String message;
private Date timestamp;
// Getters and Setters
}
application.properties
For Kafka, you'll need to specify the Kafka broker, topics, consumer group, and other configurations.
# Kafka Configuration
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=event_group
spring.kafka.consumer.auto-offset-reset=earliest
# Disabled so that offsets are committed according to the listener ack mode
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Additional configurations
# Number of consumer threads
spring.kafka.listener.concurrency=3
# Manual acknowledgment for fine-grained control; the listener must call Acknowledgment.acknowledge()
spring.kafka.listener.ack-mode=manual
# Max records pulled per poll
spring.kafka.consumer.max-poll-records=10
spring.kafka.bootstrap-servers: Comma-separated list of Kafka brokers to connect to.
spring.kafka.consumer.group-id: Kafka consumer group ID.
spring.kafka.consumer.auto-offset-reset: How Kafka should handle the offset when consuming a topic. earliest means starting from the earliest available message.
spring.kafka.consumer.key-deserializer / value-deserializer: Specifies the deserializers for the consumer to convert bytes into Java objects.
spring.kafka.producer.key-serializer / value-serializer: Specifies the serializers for the producer.
spring.kafka.listener.concurrency: Number of consumers that will handle partitions.
spring.kafka.listener.ack-mode: Acknowledgment mode for Kafka message consumption.
RabbitMQ is an open-source message broker that supports multiple messaging protocols. It is widely used in enterprise systems for message queuing.
RabbitMQ is used for asynchronous processing, task queues, and decoupling microservices.
First, add the dependencies for RabbitMQ in your pom.xml:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Producer Configuration:
@Configuration
public class RabbitConfig {
@Bean
public Queue queue() {
return new Queue("queue_name", false);
}
@Bean
public TopicExchange exchange() {
return new TopicExchange("exchange_name");
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("routing_key");
}
@Bean
public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
}
Producer Code:
@Service
public class RabbitProducer {
private final AmqpTemplate amqpTemplate;
@Autowired
public RabbitProducer(AmqpTemplate amqpTemplate) {
this.amqpTemplate = amqpTemplate;
}
public void sendMessage(String message) {
amqpTemplate.convertAndSend("exchange_name", "routing_key", message);
}
}
Consumer Code:
@Service
public class RabbitConsumer {
@RabbitListener(queues = "queue_name")
public void listen(String message) {
System.out.println("Received Message: " + message);
}
}
Use Case: A background task processing system where tasks are queued and processed by worker services. RabbitMQ is used to distribute the tasks among different worker services asynchronously.
Concepts:
Rules & Points:
Spring Boot Project (RabbitMQ)
1. Dependencies (pom.xml):
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
2. Configuration (RabbitMQConfig.java):
@Configuration
public class RabbitMQConfig {
@Bean
public Queue queue() {
return new Queue("task_queue", true);
}
@Bean
public TopicExchange exchange() {
return new TopicExchange("task_exchange");
}
@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("task.#");
}
// JSON converter so that Task objects can be sent and received; Spring Boot applies
// a MessageConverter bean to the auto-configured RabbitTemplate and @RabbitListener containers
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
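The binding key "task.#" used above relies on topic-exchange wildcards: `*` stands for exactly one dot-separated word and `#` for zero or more words. The sketch below illustrates that matching rule in plain Java. It is a simplified illustration written for this article, not RabbitMQ's implementation, and `TopicPatternMatcher` is a hypothetical name.

```java
/** Word-by-word matcher for topic-exchange style binding patterns. */
class TopicPatternMatcher {

    static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.split("\\.");
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(patternWords, 0, keyWords, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' either absorbs nothing (move past it) or absorbs one more word (stay on it).
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("task.#", "task.new"));        // true
        System.out.println(matches("task.#", "task.new.urgent")); // true
        System.out.println(matches("task.*", "task.new.urgent")); // false
    }
}
```

Under this rule the routing key "task.new" used by the controller reaches task_queue, while a key such as "order.new" would not.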
3. Producer (REST Controller) (TaskController.java):
@RestController
@RequestMapping("/task")
public class TaskController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("/send")
public ResponseEntity<String> sendTask(@RequestBody Task task) {
rabbitTemplate.convertAndSend("task_exchange", "task.new", task);
return ResponseEntity.ok("Task sent to the queue");
}
}
4. Consumer (TaskListener.java):
@Component
public class TaskListener {
@RabbitListener(queues = "task_queue")
public void receiveMessage(Task task) {
System.out.println("Received Task: " + task);
// Process task asynchronously
}
}
5. Task Model (Task.java):
public class Task {
private String name;
private String description;
// Getters and Setters
}
application.properties
For RabbitMQ, you need to configure the connection to the RabbitMQ server and set up certain properties for message queues.
# RabbitMQ Configuration
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# Number of consumer threads
spring.rabbitmq.listener.simple.concurrency=5
# Max number of consumer threads
spring.rabbitmq.listener.simple.max-concurrency=10
# Prefetch count (messages pulled at once)
spring.rabbitmq.listener.simple.prefetch=10
# Optional configurations for RabbitMQ exchange and queues
spring.rabbitmq.listener.simple.acknowledge-mode=auto
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.host: The host where RabbitMQ server is running.
spring.rabbitmq.port: The port (default is 5672).
spring.rabbitmq.username / password: Credentials for connecting to RabbitMQ (default is guest/guest).
spring.rabbitmq.listener.simple.concurrency: Number of consumers to start in parallel.
spring.rabbitmq.listener.simple.max-concurrency: Max number of consumers.
spring.rabbitmq.listener.simple.prefetch: Controls how many messages the consumer can fetch at once.
IBM MQ is an enterprise messaging system known for its reliability and high performance. It provides point-to-point and publish-subscribe messaging models.
IBM MQ is often used in financial services, healthcare, and large enterprises for secure, reliable messaging.
First, include the necessary dependency:
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>9.3.0.0</version>
</dependency>
Producer Configuration:
@Configuration
public class IBMQueueConfig {
@Bean
public MQQueueConnectionFactory connectionFactory() throws JMSException {
MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
factory.setHostName("localhost");
factory.setPort(1414);
factory.setQueueManager("QM1");
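factory.setChannel("DEV.APP.SVRCONN");
// Connect over TCP as a client; the default transport is local bindings mode
factory.setTransportType(WMQConstants.WMQ_CM_CLIENT);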
return factory;
}
@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
return new JmsTemplate(connectionFactory);
}
}
Producer Code:
@Service
public class IBMMQProducer {
private final JmsTemplate jmsTemplate;
@Autowired
public IBMMQProducer(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void sendMessage(String message) {
jmsTemplate.convertAndSend("queue_name", message);
}
}
Consumer Code:
@Service
public class IBMMQConsumer {
@JmsListener(destination = "queue_name")
public void listen(String message) {
System.out.println("Received Message: " + message);
}
}
application.properties
For IBM MQ, you need to provide connection details for the MQ server, along with security and queue configurations.
# IBM MQ Configuration
ibm.mq.queueManager=QM1
ibm.mq.channel=DEV.APP.SVRCONN
ibm.mq.host=localhost
ibm.mq.port=1414
ibm.mq.queueName=TransactionQueue
ibm.mq.sslEnabled=false
ibm.mq.username=admin
ibm.mq.password=password
# Connection factory and other settings
# Set to true for topic-based messaging
spring.jms.pub-sub-domain=false
# Number of listeners
spring.jms.listener.concurrency=5
ibm.mq.queueManager: The name of the IBM MQ Queue Manager.
ibm.mq.channel: The MQ channel for connecting to the Queue Manager.
ibm.mq.host: The host where IBM MQ is running (usually localhost).
ibm.mq.port: The port on which the MQ server listens (default is 1414).
ibm.mq.queueName: The name of the queue to which messages are sent or from which they are consumed.
ibm.mq.sslEnabled: Whether SSL encryption should be used for MQ connections.
ibm.mq.username / ibm.mq.password: Credentials for connecting to IBM MQ.
spring.jms.pub-sub-domain: Set to true if using topics (for pub/sub) and false for queues (for point-to-point).
spring.jms.listener.concurrency: Number of concurrent message listeners.
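The difference between the two domains selected by spring.jms.pub-sub-domain can be sketched without a broker. The example below is an in-memory illustration only: `DeliveryModes` is a hypothetical class, and the round-robin choice of receiver is an assumption, since a real queue manager decides which competing consumer gets each message.

```java
import java.util.ArrayList;
import java.util.List;

/** Contrasts point-to-point (queue) and publish-subscribe (topic) delivery. */
class DeliveryModes {

    /** Queue: each message is delivered to exactly one of the competing consumers. */
    static List<List<String>> deliverToQueue(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = emptyInboxes(consumerCount);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumerCount).add(messages.get(i));
        }
        return inboxes;
    }

    /** Topic: every subscriber receives its own copy of every message. */
    static List<List<String>> deliverToTopic(List<String> messages, int subscriberCount) {
        List<List<String>> inboxes = emptyInboxes(subscriberCount);
        for (List<String> inbox : inboxes) {
            inbox.addAll(messages);
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("m1", "m2", "m3");
        System.out.println(deliverToQueue(messages, 2)); // [[m1, m3], [m2]]
        System.out.println(deliverToTopic(messages, 2)); // [[m1, m2, m3], [m1, m2, m3]]
    }
}
```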
Environment-specific settings can be kept in profile-specific files such as application-dev.properties, application-prod.properties, etc., in Spring Boot.
Event-driven architecture (EDA) involves components that communicate by sending and receiving events, enabling decoupled systems. In Spring Boot, this can be achieved using messaging systems like Kafka, RabbitMQ, or JMS.
Event-driven systems are perfect for microservices architectures, enabling asynchronous communication, real-time processing, and decoupling of services.
Event Model:
public class OrderCreatedEvent {
    private String orderId;
    private String customerName;
    // Getters and setters
}
Event Producer:
@Service
public class OrderService {
    private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;
    @Autowired
    public OrderService(KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    public void createOrder(OrderCreatedEvent orderEvent) {
        kafkaTemplate.send("order_topic", orderEvent);
    }
}
Event Consumer:
@Service
public class OrderEventListener {
    @KafkaListener(topics = "order_topic", groupId = "order_group")
    public void handleOrderCreatedEvent(OrderCreatedEvent event) {
        System.out.println("Order created: " + event.getOrderId());
    }
}
Event-Driven Architecture (EDA) is an architectural pattern that promotes the decoupling of microservices through asynchronous communication, where services communicate by producing and consuming events rather than making direct synchronous calls to each other. In microservices systems, this pattern brings loose coupling between services, asynchronous processing, and the ability to react to events in real time.
We'll implement a simple Order Service that sends an OrderPlaced event whenever an order is placed. The Inventory Service and Payment Service will listen to these events to perform related tasks (e.g., updating inventory and processing payment).
The key concepts are the event itself (e.g., OrderPlaced), the producer that publishes it, and the consumers that react to it.
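Before bringing in a broker, the Order Service scenario described above can be sketched with a tiny in-memory bus. This is only an illustration of the pattern, not how RabbitMQ, Kafka, or IBM MQ work internally: delivery here is synchronous and in-process, and the names `OrderEventsSketch`, `EventBus`, and `OrderPlaced` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class OrderEventsSketch {
    // Hypothetical event type standing in for OrderPlaced.
    static final class OrderPlaced {
        final String orderId;
        final double totalAmount;
        OrderPlaced(String orderId, double totalAmount) {
            this.orderId = orderId;
            this.totalAmount = totalAmount;
        }
    }

    // Minimal synchronous bus: the producer only knows the bus, never the consumers.
    static final class EventBus {
        private final List<Consumer<OrderPlaced>> subscribers = new ArrayList<>();
        void subscribe(Consumer<OrderPlaced> handler) { subscribers.add(handler); }
        void publish(OrderPlaced event) { subscribers.forEach(s -> s.accept(event)); }
    }

    // Registers an inventory and a payment handler, publishes one event,
    // and returns what each handler did, in subscription order.
    static List<String> placeOrder(String orderId, double totalAmount) {
        List<String> actions = new ArrayList<>();
        EventBus bus = new EventBus();
        bus.subscribe(e -> actions.add("inventory updated for " + e.orderId));
        bus.subscribe(e -> actions.add("payment processed for " + e.orderId));
        bus.publish(new OrderPlaced(orderId, totalAmount));
        return actions;
    }

    public static void main(String[] args) {
        placeOrder("order-1", 49.5).forEach(System.out::println);
    }
}
```

Adding a third consumer (say, a notification handler) only needs another `subscribe` call. The publishing code does not change, which is the decoupling the pattern is after.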
Add the RabbitMQ dependencies to pom.xml.
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>
Event Model (OrderPlacedEvent.java):
public class OrderPlacedEvent {
    private String orderId;
    private String customerName;
    private double totalAmount;
    // Getters and Setters
}
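RabbitMQ Configuration (RabbitMQConfig.java):
@Configuration
public class RabbitMQConfig {
    // Durable queue: it survives a broker restart
    @Bean
    public Queue orderQueue() {
        return new Queue("orderQueue", true);
    }
    @Bean
    public DirectExchange exchange() {
        return new DirectExchange("orderExchange");
    }
    // Messages published to orderExchange with routing key "order.key" land in orderQueue
    @Bean
    public Binding binding(Queue orderQueue, DirectExchange exchange) {
        return BindingBuilder.bind(orderQueue).to(exchange).with("order.key");
    }
    // Send and receive events as JSON. The default converter only handles String,
    // byte[] and Serializable payloads, and OrderPlacedEvent is none of those.
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}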
Order Producer (Controller) (OrderController.java):
@RestController
@RequestMapping("/orders")
public class OrderController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostMapping("/place")
    public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) {
        rabbitTemplate.convertAndSend("orderExchange", "order.key", event);
        return ResponseEntity.ok("Order placed and event sent to RabbitMQ");
    }
}
Order Consumer (OrderListener.java):
@Component
public class OrderListener {
    @RabbitListener(queues = "orderQueue")
    public void handleOrderPlacedEvent(OrderPlacedEvent event) {
        System.out.println("Order placed: " + event.getOrderId());
        // Process the order (e.g., update inventory, process payment)
    }
}
application.properties:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
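The exchange, queue, and binding declared in RabbitMQConfig decide where a published message ends up. As a rough mental model (not RabbitMQ's implementation), a direct exchange delivers a message to every queue whose binding key equals the message's routing key exactly. The class `DirectExchangeSketch` below is a hypothetical in-memory stand-in.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

class DirectExchangeSketch {
    // routing key -> queues bound with that key
    private final Map<String, List<Queue<String>>> bindings = new HashMap<>();

    void bind(Queue<String> queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    // Copies the message into every queue bound with exactly this key
    // and returns how many queues received it.
    int publish(String routingKey, String message) {
        List<Queue<String>> targets =
                bindings.getOrDefault(routingKey, Collections.emptyList());
        targets.forEach(q -> q.add(message));
        return targets.size();
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        Queue<String> orderQueue = new ArrayDeque<>();
        exchange.bind(orderQueue, "order.key");

        System.out.println(exchange.publish("order.key", "order-1")); // 1
        System.out.println(exchange.publish("other.key", "order-2")); // 0, no matching binding
        System.out.println(orderQueue);                               // [order-1]
    }
}
```

This is why the routing key passed to `convertAndSend` has to match the key used in the binding: with a different key, the message has no queue to go to.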
Add the Kafka dependencies to pom.xml.
<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>
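Kafka Configuration (KafkaConfig.java):
@Configuration
@EnableKafka
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    // Producer side: String keys, JSON-serialized OrderPlacedEvent values
    @Bean
    public ProducerFactory<String, OrderPlacedEvent> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
    @Bean
    public KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    @Bean
    public ConsumerFactory<String, OrderPlacedEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(OrderPlacedEvent.class));
    }
    // @KafkaListener methods look up a container factory bean with this name
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}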
Order Producer (Controller) (OrderController.java):
@RestController
@RequestMapping("/orders")
public class OrderController {
    @Autowired
    private KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;
    @PostMapping("/place")
    public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) {
        kafkaTemplate.send("orderTopic", event);
        return ResponseEntity.ok("Order placed and event sent to Kafka");
    }
}
Order Consumer (OrderListener.java):
@Component
public class OrderListener {
    @KafkaListener(topics = "orderTopic", groupId = "orderGroup")
    public void handleOrderPlacedEvent(OrderPlacedEvent event) {
        System.out.println("Order placed: " + event.getOrderId());
        // Process the order (e.g., update inventory, process payment)
    }
}
application.properties:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=orderGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Values are OrderPlacedEvent objects, so they are (de)serialized as JSON rather than as plain strings
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# Restrict this to the package that contains OrderPlacedEvent in a real deployment
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
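The group-id and auto-offset-reset settings are easier to reason about with a toy model of a single partition. The sketch below is a simplification and not Kafka's implementation: each consumer group keeps its own committed offset, and the reset policy only matters when a group has no committed offset yet. The class `OffsetSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OffsetSketch {
    private final List<String> log = new ArrayList<>();             // one partition
    private final Map<String, Integer> committed = new HashMap<>(); // group -> next offset to read

    void append(String record) { log.add(record); }

    // Returns the records the group has not seen yet and commits the new position.
    // A group without a committed offset starts at 0 ("earliest")
    // or at the current end of the log ("latest").
    List<String> poll(String groupId, boolean earliest) {
        int start = committed.getOrDefault(groupId, earliest ? 0 : log.size());
        List<String> batch = new ArrayList<>(log.subList(start, log.size()));
        committed.put(groupId, log.size());
        return batch;
    }

    public static void main(String[] args) {
        OffsetSketch topic = new OffsetSketch();
        topic.append("order-1");
        topic.append("order-2");
        System.out.println(topic.poll("orderGroup", true));  // [order-1, order-2]
        System.out.println(topic.poll("orderGroup", true));  // []
        System.out.println(topic.poll("auditGroup", false)); // [] (starts at the end of the log)
        topic.append("order-3");
        System.out.println(topic.poll("orderGroup", true));  // [order-3]
        System.out.println(topic.poll("auditGroup", false)); // [order-3]
    }
}
```

With `earliest`, a brand-new group replays everything already in the topic. With `latest`, it only sees records produced after it joined. Either way, two groups never steal records from each other.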
Add the IBM MQ dependencies to pom.xml.
<dependencies>
    <dependency>
        <groupId>com.ibm.mq</groupId>
        <artifactId>mq-jms-spring-boot-starter</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>
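IBM MQ Configuration (IBM_MQConfig.java):
@Configuration
public class IBM_MQConfig {
    @Value("${ibm.mq.queueManager}")
    private String queueManager;
    @Value("${ibm.mq.channel}")
    private String channel;
    @Value("${ibm.mq.host}")
    private String host;
    @Value("${ibm.mq.port}")
    private int port;
    @Value("${ibm.mq.queueName}")
    private String queueName;
    // Note: ibm.mq.username / ibm.mq.password are not applied by this factory. If the channel
    // requires authentication, wrap it in Spring's UserCredentialsConnectionFactoryAdapter.
    @Bean
    public ConnectionFactory connectionFactory() throws JMSException {
        MQConnectionFactory factory = new MQConnectionFactory();
        factory.setQueueManager(queueManager);
        factory.setChannel(channel);
        factory.setHostName(host);
        factory.setPort(port);
        // Connect over TCP as a client rather than in local bindings mode
        factory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
        return factory;
    }
    @Bean
    public Queue queue() throws JMSException {
        return new MQQueue(queueName);
    }
    // Send events as JSON text; the default converter rejects objects that are not Serializable
    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }
}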
Order Producer (Controller) (OrderController.java):
@RestController
@RequestMapping("/orders")
public class OrderController {
    @Autowired
    private JmsTemplate jmsTemplate;
    @PostMapping("/place")
    public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) {
        jmsTemplate.convertAndSend("TransactionQueue", event);
        return ResponseEntity.ok("Order placed and event sent to IBM MQ");
    }
}
Order Consumer (OrderListener.java):
@Component
public class OrderListener {
    @JmsListener(destination = "TransactionQueue")
    public void handleOrderPlacedEvent(OrderPlacedEvent event) {
        System.out.println("Order placed: " + event.getOrderId());
        // Process the order (e.g., update inventory, process payment)
    }
}
application.properties:
ibm.mq.queueManager=QM1
ibm.mq.channel=DEV.APP.SVRCONN
ibm.mq.host=localhost
ibm.mq.port=1414
ibm.mq.queueName=TransactionQueue
ibm.mq.sslEnabled=false
ibm.mq.username=admin
ibm.mq.password=password
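Each messaging system (RabbitMQ, Kafka, IBM MQ) enables Event-Driven Architecture and allows microservices to communicate asynchronously through events. The implementation process follows the same steps in each case: add the client dependency, define the event model, configure the connection, publish events from the producer, and handle them in a listener.
There are several other ways to implement Event-Driven Architecture (EDA) in microservices. The approaches covered here are publish-subscribe messaging with Google Cloud Pub/Sub, managed queuing with AWS SQS, Apache Pulsar, and Event Sourcing with CQRS.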
In this architecture, you define a topic or channel that producers send messages to, and consumers can subscribe to the topic. This is common in RabbitMQ (with exchanges and queues), Kafka, and even Google Cloud Pub/Sub.
Add the Google Cloud Pub/Sub dependencies to pom.xml.
<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-pubsub</artifactId>
    <version>2.12.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
Pub/Sub Configuration (PubSubConfig.java):
@Configuration
public class PubSubConfig {
    @Value("${google.cloud.project-id}")
    private String projectId;
    @Bean
    public Publisher publisher() throws Exception {
        return Publisher.newBuilder("projects/" + projectId + "/topics/orderTopic").build();
    }
    @Bean
    public Subscriber subscriber() {
        // Explicit parameter types avoid overload ambiguity on newer client versions
        return Subscriber.newBuilder("projects/" + projectId + "/subscriptions/orderSubscription", (PubsubMessage message, AckReplyConsumer consumer) -> {
            System.out.println("Received message: " + message.getData().toStringUtf8());
            // Acknowledge so Pub/Sub does not redeliver the message
            consumer.ack();
        }).build();
    }
}
Order Producer (Controller) (OrderController.java):
@RestController
@RequestMapping("/orders")
public class OrderController {
    @Autowired
    private Publisher publisher;
    @PostMapping("/place")
    public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) throws Exception {
        String message = new ObjectMapper().writeValueAsString(event);
        publisher.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(message)).build());
        return ResponseEntity.ok("Order placed and event sent to Google Pub/Sub");
    }
}
Order Consumer (OrderListener.java):
@Component
public class OrderListener {
    @Autowired
    private Subscriber subscriber;
    @PostConstruct
    public void init() {
        subscriber.startAsync();
    }
}
application.properties:
google.cloud.project-id=your-google-cloud-project-id
Amazon Web Services (AWS) provides Simple Queue Service (SQS) for message queuing, which is fully managed and highly scalable. This can be used in a similar way as RabbitMQ, Kafka, or Pub/Sub for event-driven communication.
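Add the Spring Cloud AWS messaging starter to pom.xml. The AmazonSQSAsync and SqsMessageDeletionPolicy types used in this example come from the 2.x line of Spring Cloud AWS, which is published under the io.awspring.cloud group.
<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-starter-aws-messaging</artifactId>
    <version>2.4.4</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>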
SQS Configuration (SQSConfig.java):
@Configuration
public class SQSConfig {
    // SQS client built from the default credential and region provider chains
    @Bean
    public AmazonSQSAsync amazonSQS() {
        return AmazonSQSAsyncClientBuilder.defaultClient();
    }
    // Creates the listener container that polls the queues named in @SqsListener methods.
    // No Queue bean is needed: SQS queues are referenced by name or URL.
    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQS) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSQS);
        return factory;
    }
}
Order Producer (Controller) (OrderController.java):
@RestController
@RequestMapping("/orders")
public class OrderController {
    @Autowired
    private AmazonSQSAsync amazonSQS;
    @Value("${aws.sqs.queue-url}")
    private String queueUrl;
    @PostMapping("/place")
    public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) throws JsonProcessingException {
        // writeValueAsString throws a checked exception, hence the throws clause
        String message = new ObjectMapper().writeValueAsString(event);
        SendMessageRequest sendMessageRequest = new SendMessageRequest(queueUrl, message);
        amazonSQS.sendMessage(sendMessageRequest);
        return ResponseEntity.ok("Order placed and event sent to SQS");
    }
}
Order Consumer (OrderListener.java):
@Component
public class OrderListener {
    // ON_SUCCESS: the message is deleted only if this method returns without throwing
    @SqsListener(value = "orderQueue", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void handleOrderPlacedEvent(String message) throws JsonProcessingException {
        OrderPlacedEvent event = new ObjectMapper().readValue(message, OrderPlacedEvent.class);
        System.out.println("Order placed: " + event.getOrderId());
        // Process the order
    }
}
application.properties:
aws.sqs.queue-url=https://sqs.us-east-1.amazonaws.com/your-account-id/your-queue-name
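The ON_SUCCESS deletion policy used by the listener means a message is removed from the queue only when the handler returns normally. If the handler throws, the message stays in the queue and is delivered again later. The sketch below imitates that behaviour with an in-memory queue. It is a simplified model, not the SQS client: there is no visibility timeout, and `DeleteOnSuccessSketch`, `Delivery`, and `maxReceives` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

class DeleteOnSuccessSketch {
    // A queued message plus the number of times it has been handed to a consumer.
    static final class Delivery {
        final String body;
        int receiveCount;
        Delivery(String body) { this.body = body; }
    }

    // Hands every message to the handler and returns how many were handled
    // successfully (and therefore deleted).
    static int drain(Deque<Delivery> queue, Consumer<String> handler, int maxReceives) {
        int deleted = 0;
        while (!queue.isEmpty()) {
            Delivery d = queue.pollFirst();
            d.receiveCount++;
            try {
                handler.accept(d.body);
                deleted++;                  // success: the message is gone for good
            } catch (RuntimeException e) {
                if (d.receiveCount < maxReceives) {
                    queue.addLast(d);       // failure: the message becomes available again
                }
                // otherwise give up; a real setup would typically move it to a dead-letter queue
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        Deque<Delivery> queue = new ArrayDeque<>();
        queue.add(new Delivery("order-1"));
        queue.add(new Delivery("order-2"));
        int[] calls = {0};
        // The handler fails on its very first call, then succeeds.
        int deleted = drain(queue, body -> {
            if (calls[0]++ == 0) throw new IllegalStateException("transient failure");
            System.out.println("processed " + body);
        }, 3);
        System.out.println("deleted=" + deleted); // deleted=2
    }
}
```

Because a failed message can be handed out more than once, handlers should be safe to run twice for the same order.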
Apache Pulsar is an open-source, distributed messaging and streaming platform, similar to Kafka but with more focus on multi-tenancy, geo-replication, and long-term storage. Pulsar supports both publish-subscribe and queue-based messaging models.
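Add the Pulsar client dependencies to pom.xml. The pulsar-client artifact is used here because it contains the client implementation; pulsar-client-api alone only provides the interfaces.
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.11.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>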
Pulsar Configuration (PulsarConfig.java):
@Configuration
public class PulsarConfig {
    @Value("${pulsar.service-url}")
    private String pulsarServiceUrl;
    @Bean
    public PulsarClient pulsarClient() throws PulsarClientException {
        return PulsarClient.builder().serviceUrl(pulsarServiceUrl).build();
    }
    @Bean
    public Consumer<byte[]> orderConsumer(PulsarClient pulsarClient) throws PulsarClientException {
        return pulsarClient.newConsumer()
                .topic("order-topic")
                .subscriptionName("order-subscription")
                .subscribe();
    }
}
Order Producer (OrderController.java):
@RestController
@RequestMapping("/orders")
public class OrderController {
    @Autowired
    private PulsarClient pulsarClient;
    // IOException covers both PulsarClientException and Jackson's JsonProcessingException
    @PostMapping("/place")
    public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) throws IOException {
        String message = new ObjectMapper().writeValueAsString(event);
        // Close the producer after sending; a long-lived shared producer is preferable under load
        try (Producer<byte[]> producer = pulsarClient.newProducer()
                .topic("order-topic")
                .create()) {
            producer.send(message.getBytes(StandardCharsets.UTF_8));
        }
        return ResponseEntity.ok("Order placed and event sent to Pulsar");
    }
}
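Order Consumer (OrderListener.java):
@Component
public class OrderListener {
    @Autowired
    private Consumer<byte[]> orderConsumer;
    @PostConstruct
    public void start() {
        // Receive on a separate thread: a blocking loop inside @PostConstruct would stall application startup
        Thread worker = new Thread(this::consume, "order-consumer");
        worker.setDaemon(true);
        worker.start();
    }
    private void consume() {
        ObjectMapper mapper = new ObjectMapper();
        while (true) {
            try {
                Message<byte[]> msg = orderConsumer.receive();
                String messageContent = new String(msg.getData(), StandardCharsets.UTF_8);
                OrderPlacedEvent event = mapper.readValue(messageContent, OrderPlacedEvent.class);
                System.out.println("Order placed: " + event.getOrderId());
                // Process the order, then acknowledge it so Pulsar does not redeliver it
                orderConsumer.acknowledge(msg);
            } catch (PulsarClientException e) {
                return; // consumer closed or connection lost: stop the loop
            } catch (IOException e) {
                System.err.println("Could not parse message: " + e.getMessage());
            }
        }
    }
}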
application.properties:
pulsar.service-url=pulsar://localhost:6650
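A consumer that blocks while waiting for the next message generally needs its own thread and a way to stop it cleanly. The sketch below shows one way to structure that with the JDK's executor classes. A `BlockingQueue` stands in for the Pulsar consumer so the example runs without a broker, and `BackgroundReceiveLoop` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class BackgroundReceiveLoop {
    private final BlockingQueue<String> source;   // stand-in for a blocking message consumer
    private final Consumer<String> handler;
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    BackgroundReceiveLoop(BlockingQueue<String> source, Consumer<String> handler) {
        this.source = source;
        this.handler = handler;
    }

    // Returns immediately; the receive loop runs on the worker thread.
    void start() {
        worker.execute(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    handler.accept(source.take()); // blocks until a message arrives
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop was requested
            }
        });
    }

    // Interrupts the loop and waits briefly for the worker thread to exit.
    boolean stop() {
        worker.shutdownNow();
        try {
            return worker.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Feeds two messages through the loop and returns them in the order they were handled.
    static List<String> demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(Arrays.asList("order-1", "order-2"));
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        BackgroundReceiveLoop loop = new BackgroundReceiveLoop(queue, msg -> {
            seen.add(msg);
            done.countDown();
        });
        loop.start();
        try {
            done.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        loop.stop();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [order-1, order-2]
    }
}
```

In a Spring application, `start()` would typically be called from a `@PostConstruct` method and `stop()` from a `@PreDestroy` method, so the loop follows the application lifecycle.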
Another approach in Event-Driven Architecture is Event Sourcing, which involves persisting the state changes as events rather than storing just the current state. This is typically combined with CQRS (Command Query Responsibility Segregation), where you separate the handling of commands (state changes) and queries (reads).
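A minimal sketch of the idea, assuming a hypothetical order domain with two event types: the command side only appends events, and the query side derives its view by replaying them. Everything is kept in memory; a real system would persist the event log and usually maintain read models incrementally instead of replaying on every query.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class EventSourcingSketch {
    // Hypothetical event record; type is "PLACED" or "CANCELLED".
    static final class OrderEvent {
        final String type;
        final String orderId;
        final double amount;
        OrderEvent(String type, String orderId, double amount) {
            this.type = type;
            this.orderId = orderId;
            this.amount = amount;
        }
    }

    // Append-only event log: the source of truth.
    private final List<OrderEvent> events = new ArrayList<>();

    // Command side: record what happened instead of overwriting state.
    void placeOrder(String orderId, double amount) {
        events.add(new OrderEvent("PLACED", orderId, amount));
    }

    void cancelOrder(String orderId) {
        events.add(new OrderEvent("CANCELLED", orderId, 0.0));
    }

    // Query side: rebuild a read model (open orders and their amounts) by replaying the log.
    Map<String, Double> openOrders() {
        Map<String, Double> view = new LinkedHashMap<>();
        for (OrderEvent e : events) {
            if (e.type.equals("PLACED")) {
                view.put(e.orderId, e.amount);
            } else {
                view.remove(e.orderId);
            }
        }
        return view;
    }

    int eventCount() { return events.size(); }

    public static void main(String[] args) {
        EventSourcingSketch orders = new EventSourcingSketch();
        orders.placeOrder("order-1", 20.0);
        orders.placeOrder("order-2", 30.0);
        orders.cancelOrder("order-1");
        System.out.println(orders.openOrders()); // {order-2=30.0}
        System.out.println(orders.eventCount()); // 3
    }
}
```

The cancelled order disappears from the read model, yet all three events remain in the log, so the full history stays available for auditing or for building other views later.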