
Kafka

RabbitMQ, Kafka, and IBM MQ are three popular messaging systems, but they differ in their design, use cases, and key features.

What is Apache Kafka? 

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission critical applications.

Apache Kafka Core Concepts

We will discuss the following Apache Kafka core concepts:

1. Kafka Cluster
2. Kafka Broker
3. Kafka Producer
4. Kafka Consumer
5. Kafka Topic
6. Kafka Partitions
7. Kafka Offsets
8. Kafka Consumer Group

Let's begin with the Kafka cluster.

 

1. Kafka Cluster

Since Kafka is a distributed system, it runs as a cluster. A Kafka cluster consists of a set of brokers; a production cluster typically runs at least three brokers so that data can be replicated for fault tolerance.

 

The following diagram shows a Kafka cluster with three Kafka brokers:

[Diagram: a Kafka cluster with three brokers]

 

2. Kafka Broker

The broker is the Kafka server. It's just a meaningful name given to the Kafka server, and the name fits: all Kafka does is act as a message broker between producer and consumer. The producer and consumer don't interact directly; they use the Kafka server as an agent, or broker, to exchange messages.

The following diagram shows a Kafka broker acting as an agent, or broker, to exchange messages between Producer and Consumer:

[Diagram: a Kafka broker brokering messages between Producer and Consumer]

 

3. Kafka Producer

A producer is an application that sends messages. It does not send messages directly to the recipient; it sends them only to the Kafka server.

 

The following diagram shows the Producer sending messages directly to the Kafka broker:

[Diagram: Producer sending messages to the Kafka broker]

4. Kafka Consumer

A consumer is an application that reads messages from the Kafka server.

If producers are sending data, they must be sending it to someone, right? The consumers are the recipients. But remember that producers don't send data to a recipient address; they just send it to the Kafka server, and anyone interested in that data can come forward and take it from there.

So, any application that requests data from a Kafka server is a consumer, and consumers can ask for data sent by any producer, provided they have permission to read it.

The following diagram shows the Producer sending messages to the Kafka broker and the Consumer reading messages from it:

[Diagram: Producer writing to and Consumer reading from the Kafka broker]

5. Kafka Topic

We learned that the producer sends data to the Kafka broker, and a consumer can then ask the Kafka broker for data. But which data? We need some identification mechanism to request data from a broker, and that is where the Kafka topic comes in.

• A topic is like a table in a database or a folder in a file system.
• A topic is identified by its name.
• You can have any number of topics.

The following diagram shows two topics created in a Kafka broker:

[Diagram: two topics inside a Kafka broker]

6. Kafka Partitions

Kafka topics are divided into a number of partitions, each of which holds records in an immutable sequence.

Kafka brokers store the messages for a topic, but the volume of data can be enormous and may not fit on a single machine. A topic is therefore split into multiple partitions that are distributed across multiple machines, since Kafka is a distributed system.

The following diagram shows a Kafka topic divided into a number of partitions:

[Diagram: a Kafka topic divided into partitions]
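
How does a message end up in a particular partition? One common way is to send a key with the message: Kafka's default partitioner hashes the key, so all messages with the same key land in the same partition and keep their relative order. A minimal sketch using the KafkaTemplate introduced later in this article (the topic and key values here are illustrative):

// Messages with the same key (e.g. one user's id) always map to the same
// partition, so their relative order is preserved.
kafkaTemplate.send("niteshsynergy", "user-42", "first event");
kafkaTemplate.send("niteshsynergy", "user-42", "second event");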

7. Kafka Offsets

An offset is a sequential id assigned to each message as it arrives in a partition. Once assigned, an offset never changes. The first message gets offset zero, the next message offset one, and so on.

[Diagram: offsets assigned to messages within a partition]
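
To see offsets in action, a Spring Kafka listener (introduced later in this article) can ask for a record's partition and offset via message headers. A small illustrative sketch; note that depending on your Spring Kafka version the partition header constant is named RECEIVED_PARTITION_ID or RECEIVED_PARTITION:

@KafkaListener(topics = "niteshsynergy", groupId = "group-id")
public void listen(String message,
                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                   @Header(KafkaHeaders.OFFSET) long offset) {
    // Log where exactly this record came from
    System.out.printf("partition=%d offset=%d message=%s%n", partition, offset, message);
}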

8. Kafka Consumer Group

A consumer group contains one or more consumers working together to process the messages. Within a group, each partition is read by exactly one consumer, which is how the group shares the load.

 

[Diagram: a Kafka consumer group sharing a topic's partitions]

 


Concepts of Apache Kafka and Messaging Queues

  1. Apache Kafka: A distributed event streaming platform designed for high-throughput, fault-tolerant messaging between systems.
  2. Messaging Queue: A mechanism for communication between services or applications using a message broker.
  3. Core Kafka Concepts:
    • Producer: Publishes messages to topics.
    • Consumer: Subscribes to topics and processes messages.
    • Broker: A Kafka server that stores and distributes messages.
    • Topic: Logical channels for organizing messages.
    • Partition: A subset of a topic for parallelism.
    • Offset: Position of a message in a partition.

Use Cases

  1. Real-Time Game Ecosystem (e.g., Soccer Game):
    • Producers: Game events (goals, fouls, substitutions).
    • Consumers: Fans' dashboards, analytics services, and notifications.
    • Kafka enables real-time event updates to fans and analytics for decision-making.
  2. Payment Systems (e.g., HDFC Bank):
    • Producers: Payment gateways and transaction services.
    • Consumers: Fraud detection systems, account ledger services.
    • Kafka ensures secure and consistent processing of payment events and allows auditing.

Approach

  1. Game Ecosystem:
    • Topic: game-events
    • Producer: Publishes game updates (e.g., Goal scored by Player X).
    • Consumer: Updates dashboards in real-time for users.
  2. Payment Systems:
    • Topic: transaction-events
    • Producer: Publishes transaction details.
    • Consumer: Processes transaction, updates the ledger, and triggers fraud detection.

 


The Spring team provides the Spring for Apache Kafka dependency to support the development of Kafka-based messaging solutions.


Add dependencies:

<dependencies>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
   <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
   </dependency>
</dependencies>


 


 

 

spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer

 

Let's understand the above Spring Boot Kafka properties:

 

spring.kafka.consumer.group-id - specifies a unique string that identifies the consumer group this consumer belongs to.

spring.kafka.consumer.auto-offset-reset - specifies what to do when there is no initial offset in Kafka or if the current offset does not exist anymore on the server (e.g. because that data has been deleted):

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw an exception to the consumer if no previous offset is found for the consumer's group
  • anything else: throw an exception to the consumer

spring.kafka.consumer.key-deserializer - specifies the deserializer class for keys.

spring.kafka.consumer.value-deserializer - specifies the deserializer class for values.

spring.kafka.producer.key-serializer - specifies the serializer class for keys.

spring.kafka.producer.value-serializer - specifies the serializer class for values.

 

4. Create Kafka Topic

To create a topic on startup, add a bean of type NewTopic. If the topic already exists, the bean is ignored.

package net.niteshsynergy.springbootkafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

   @Bean
   public NewTopic niteshSynergyTopic() {
       return TopicBuilder.name("niteshsynergy")
               .build();
   }
}
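
TopicBuilder also lets you set the partition count and replication factor explicitly. A minimal variant of the bean above (the values are illustrative; a replication factor of 1 only suits a single-broker development setup):

   @Bean
   public NewTopic niteshSynergyTopic() {
       return TopicBuilder.name("niteshsynergy")
               .partitions(3)   // three partitions allow up to three consumers in a group to work in parallel
               .replicas(1)     // replication factor 1: no redundancy, dev only
               .build();
   }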
 

5. Create Kafka Producer

Next, let's create a producer that will write our messages to the topic. Spring Boot provides auto-configuration for Spring's KafkaTemplate, so you can autowire it directly into your own beans.

package net.niteshsynergy.springbootkafka.kafka;

import net.niteshsynergy.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {
   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

   @Autowired
   private KafkaTemplate<String, String> kafkaTemplate;

   public void sendMessage(String message) {
       LOGGER.info(String.format("Message sent -> %s", message));
       kafkaTemplate.send(AppConstants.TOPIC_NAME, message);
   }
}
 

Create a utils package and within this package create AppConstants with the following content:

package net.niteshsynergy.springbootkafka.utils;

public class AppConstants {
   public static final String TOPIC_NAME = "niteshsynergy";
   public static final String GROUP_ID = "group_id";
}
 

The KafkaProducer class uses KafkaTemplate to send messages to the configured topic name.

6. Create REST API to Send Message

Create a controller package, and within it create KafkaProducerController with the following content:

package net.niteshsynergy.springbootkafka.controller;

import net.niteshsynergy.springbootkafka.kafka.KafkaProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaProducerController {

   private KafkaProducer kafkaProducer;

   public KafkaProducerController(KafkaProducer kafkaProducer) {
       this.kafkaProducer = kafkaProducer;
   }

   @GetMapping("/publish")
   public ResponseEntity<String> publish(@RequestParam("message") String message) {
       kafkaProducer.sendMessage(message);
       return ResponseEntity.ok("Message sent to Kafka topic");
   }
}
 

See Topic Messages via Command Line:

bin/kafka-console-consumer.sh --topic niteshsynergy --from-beginning --bootstrap-server localhost:9092

Make sure to change the topic name. In our case "niteshsynergy" is the topic name. 
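
You can also publish test messages straight from the command line using the console producer that ships with Kafka; each line you type is sent as one message:

bin/kafka-console-producer.sh --topic niteshsynergy --bootstrap-server localhost:9092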

7. Create Kafka Consumer

The Kafka consumer is the service responsible for reading messages and processing them according to the needs of your own business logic. To set it up, enter the following:

 

package net.niteshsynergy.springbootkafka.kafka;

import net.niteshsynergy.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

   @KafkaListener(topics = AppConstants.TOPIC_NAME, groupId = AppConstants.GROUP_ID)
   public void consume(String message) {
       LOGGER.info(String.format("Message received -> %s", message));
   }
}
 

Here, we told our consume(String message) method to subscribe to the configured topic and simply log every message it receives. In your real application, you can handle messages the way your business requires. The KafkaListener endpoint:

 

@KafkaListener(topics = AppConstants.TOPIC_NAME, groupId = AppConstants.GROUP_ID)
public void consume(String message) {
   LOGGER.info(String.format("Message received -> %s", message));
}
 

Let's run the Spring Boot application and try the demo. Make sure that the Zookeeper and Kafka services are up and running.

Open a browser and hit the below link to call a REST API: http://localhost:8080/api/v1/kafka/publish?message=hello%20world
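
Or, equivalently, from a terminal:

curl "http://localhost:8080/api/v1/kafka/publish?message=hello%20world"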

 

_______________________________________________________________________________________________________________________________

Spring Boot Kafka JsonSerializer and JsonDeserializer Example

In this tutorial, we will learn how to use the JsonSerializer and JsonDeserializer classes provided by the Spring Kafka library for storing and retrieving JSON from Apache Kafka topics and returning Java model objects.

If you are new to Apache Kafka, check out my article Apache Kafka Core Concepts. In short, you will learn how to send and receive a Java object as a JSON byte[] to and from Apache Kafka.

Apache Kafka stores and transports byte[]. There are a number of built-in serializers and deserializers, but none for JSON. Spring Kafka provides a JsonSerializer and JsonDeserializer which we can use to convert Java objects to and from JSON.

We'll send a Java object as a JSON byte[] to a Kafka topic using a JsonSerializer. Afterward, we'll configure a JsonDeserializer to receive a JSON byte[] and automatically convert it back to a Java object.


 



 

 

Add dependencies:

<dependencies>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
   <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
   </dependency>
</dependencies>


 


 

spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages: '*'

spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
 

We are using the following Consumer property to convert JSON into Java object:

spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

 

We are using the following Producer property to convert Java object into JSON:

spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

 

Let's understand the meaning of the above properties. 

spring.kafka.consumer.bootstrap-servers - Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for consumers. 

spring.kafka.consumer.group-id - A unique string that identifies the consumer group to which this consumer belongs.

spring.kafka.consumer.auto-offset-reset - What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server. 

spring.kafka.consumer.key-deserializer - Deserializer class for keys. 

spring.kafka.consumer.value-deserializer - Deserializer class for values. 

spring.kafka.producer.key-serializer - Serializer class for keys. 

spring.kafka.producer.value-serializer - Serializer class for values.

 

4. Create Kafka Topic

To create a topic on startup, add a bean of type NewTopic. If the topic already exists, the bean is ignored. We will use the topic name "niteshsynergy" in this example. Let's create a KafkaTopicConfig file and add the following content:

 

package net.niteshsynergy.springbootkafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

   @Bean
   public NewTopic niteshsynergyTopic() {
       return TopicBuilder.name("niteshsynergy")
                          .build();
   }
}
 

5. Create Simple POJO to Serialize / Deserialize

Let's create a User class to send and receive a User object to and from a Kafka topic. The User instance will be serialized by JsonSerializer to a byte array, and Kafka stores this byte array in the given partition of the particular topic. During deserialization, JsonDeserializer receives the JSON from Kafka as a byte array, converts it to a User object, and returns it to the application.

 

 

package net.niteshsynergy.springbootkafka.payload;

public class User {
   private int id;
   private String firstName;
   private String lastName;

   // Getters and Setters
   public int getId() {
       return id;
   }

   public void setId(int id) {
       this.id = id;
   }

   public String getFirstName() {
       return firstName;
   }

   public void setFirstName(String firstName) {
       this.firstName = firstName;
   }

   public String getLastName() {
       return lastName;
   }

   public void setLastName(String lastName) {
       this.lastName = lastName;
   }

   // toString Method for easy representation
   @Override
   public String toString() {
       return "User{" +
               "id=" + id +
               ", firstName='" + firstName + '\'' +
               ", lastName='" + lastName + '\'' +
               '}';
   }
}
 

6. Create Kafka Producer to Produce JSON Message

Let's create a Kafka producer to produce JSON messages using Spring Kafka's KafkaTemplate. Spring Boot provides auto-configuration for KafkaTemplate, so you can autowire it directly into your own beans.

 

package net.niteshsynergy.springbootkafka.kafka;

import net.niteshsynergy.springbootkafka.payload.User;
import net.niteshsynergy.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

   @Autowired
   private KafkaTemplate<String, User> kafkaTemplate;

   public void sendMessage(User data) {
       LOGGER.info(String.format("Message sent -> %s", data.toString()));

       // Create message with payload and topic header
       Message<User> message = MessageBuilder
               .withPayload(data)
               .setHeader(KafkaHeaders.TOPIC, AppConstants.TOPIC_NAME)
               .build();

       // Send the message to Kafka topic
       kafkaTemplate.send(message);
   }
}
 

Let's start by sending a User object to a Kafka topic.

Notice that we created a KafkaTemplate<String, User>: since we are sending Java objects to the Kafka topic, they will automatically be transformed into a JSON byte[]. In this example, we created a Message using the MessageBuilder. It's important to set, as a header, the topic to which we are going to send the message.

 

7. Create REST API to Send JSON Object

Let's create a simple POST REST API to send User information as a JSON object. Instead of sending a message string, we will post a complete User object as JSON so that the Kafka producer can write the User object to the Kafka topic.

 

package net.niteshsynergy.springbootkafka.controller;

import net.niteshsynergy.springbootkafka.kafka.KafkaProducer;
import net.niteshsynergy.springbootkafka.payload.User;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaProducerController {

   private final KafkaProducer kafkaProducer;

   // Constructor injection of KafkaProducer
   public KafkaProducerController(KafkaProducer kafkaProducer) {
       this.kafkaProducer = kafkaProducer;
   }

   // Endpoint to publish a User message to Kafka
   @PostMapping("/publish")
   public ResponseEntity<String> publish(@RequestBody User user) {
       // Send the User object to Kafka
       kafkaProducer.sendMessage(user);
       return ResponseEntity.ok("Message sent to kafka topic");
   }
}
 

8. Create Kafka Consumer to Consume JSON Message

Let's create a Kafka consumer to receive JSON messages from the topic. In KafkaConsumer we simply need to add the User Java object as a parameter in our method.

 

package net.niteshsynergy.springbootkafka.kafka;

import net.niteshsynergy.springbootkafka.payload.User;
import net.niteshsynergy.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

   // Kafka listener to consume messages from the topic
   @KafkaListener(topics = AppConstants.TOPIC_NAME, groupId = AppConstants.GROUP_ID)
   public void consume(User data) {
       LOGGER.info(String.format("Message received -> %s", data.toString()));
   }
}
 

9. Demo

Let's run the Spring Boot application and try the demo. Make sure that the Zookeeper and Kafka services are up and running.

Let's use the Postman client to make a POST REST API call:
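
For example, POST the following JSON body to http://localhost:8080/api/v1/kafka/publish (the field values are just sample data):

{
  "id": 1,
  "firstName": "Nitesh",
  "lastName": "Synergy"
}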

 

 

Apache Kafka Use Case
 

 

Apache Kafka

  • Type: Distributed Streaming Platform
  • Message Protocol: Kafka protocol
  • Message Delivery: Primarily designed for publish-subscribe with strong durability guarantees.
  • Use Case: Best suited for stream processing, event sourcing, and log aggregation. It handles high-throughput, real-time data feeds effectively, making it popular for data lakes and real-time analytics.
  • Strengths:
    • Extremely high throughput (millions of messages per second).
    • Horizontal scalability with built-in partitioning.
    • Strong durability and fault tolerance (message persistence to disk).
    • Ideal for event-driven architectures and log-based systems.
  • Scalability: Excellent horizontal scalability. Kafka is built to handle large volumes of data and scale horizontally with minimal impact on performance.
  • Fault Tolerance: Replication of data across multiple nodes ensures high availability.
  • Performance: Very high throughput and low latency for large-scale applications. Kafka handles high volumes of messages effectively, making it ideal for large-scale, real-time data pipelines.
  • Drawbacks:
    • More complex to set up and manage.
    • Not as flexible as RabbitMQ for traditional messaging patterns like complex routing or request-response.
Producer Configuration:

Kafka producers send messages to a Kafka topic.
@Configuration
public class KafkaProducerConfig {
   @Bean
   public ProducerFactory<String, String> producerFactory() {
       return new DefaultKafkaProducerFactory<>(producerConfigs());
   }

   @Bean
   public KafkaTemplate<String, String> kafkaTemplate() {
       return new KafkaTemplate<>(producerFactory());
   }

   private Map<String, Object> producerConfigs() {
       Map<String, Object> configs = new HashMap<>();
       configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       return configs;
   }
}
 

Producer Code:

@Service
public class KafkaProducer {
   private final KafkaTemplate<String, String> kafkaTemplate;

   @Autowired
   public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
       this.kafkaTemplate = kafkaTemplate;
   }

   public void sendMessage(String message) {
       kafkaTemplate.send("topic_name", message);
   }
}
 

Consumer Configuration:

Kafka consumers receive messages from a Kafka topic.

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
   @Bean
   public ConsumerFactory<String, String> consumerFactory() {
       return new DefaultKafkaConsumerFactory<>(consumerConfigs());
   }

   @Bean
   public ConcurrentMessageListenerContainer<String, String> messageListenerContainer() {
       // The container needs ContainerProperties telling it which topic(s) to subscribe to
       return new ConcurrentMessageListenerContainer<>(consumerFactory(), new ContainerProperties("topic_name"));
   }

   private Map<String, Object> consumerConfigs() {
       Map<String, Object> configs = new HashMap<>();
       configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       configs.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
       configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       return configs;
   }
}
 

Consumer Code:

 

@Service
public class KafkaConsumer {
   @KafkaListener(topics = "topic_name", groupId = "group_id")
   public void listen(String message) {
       System.out.println("Received Message: " + message);
   }
}
 

Kafka Use Case: Real-Time Event Processing

Use Case: A real-time event streaming system where events (e.g., user activity logs, sensor data, etc.) are produced and consumed by different services for processing and analysis.

Concepts:

  • Producer: Sends messages to Kafka topics.
  • Consumer: Reads messages from Kafka topics.
  • Topic: A category to which messages are published.
  • Partition: Kafka topics are divided into partitions for scalability.
  • Consumer Group: Multiple consumers can join a group to share the load of reading messages from a topic.

Rules & Points:

  • Kafka is designed for high throughput and fault tolerance.
  • Supports distributed, horizontal scaling through topic partitioning.
  • Guarantees delivery through replication and partitioning.
  • Ideal for event-driven architectures and stream processing.

     

Spring Boot Project (Kafka)

  1. Add dependencies (pom.xml):

<dependencies>
   <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
   </dependency>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
</dependencies>
 

2. Configuration (KafkaConfig.java):

@Configuration
@EnableKafka
public class KafkaConfig {

   @Bean
   public ProducerFactory<String, String> producerFactory() {
       return new DefaultKafkaProducerFactory<>(producerConfig());
   }

   @Bean
   public KafkaTemplate<String, String> kafkaTemplate() {
       return new KafkaTemplate<>(producerFactory());
   }

   @Bean
   public ConsumerFactory<String, String> consumerFactory() {
       return new DefaultKafkaConsumerFactory<>(consumerConfig());
   }

   @Bean
   public ConcurrentMessageListenerContainer<String, String> messageListenerContainer() {
       return new ConcurrentMessageListenerContainer<>(consumerFactory(), containerConfig());
   }

   // Broker address plus key/value serializers for the producer
   private Map<String, Object> producerConfig() {
       Map<String, Object> configs = new HashMap<>();
       configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       return configs;
   }

   // Broker address, group id, and key/value deserializers for the consumer
   private Map<String, Object> consumerConfig() {
       Map<String, Object> configs = new HashMap<>();
       configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       configs.put(ConsumerConfig.GROUP_ID_CONFIG, "event_group");
       configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       return configs;
   }

   private ContainerProperties containerConfig() {
       return new ContainerProperties("event_topic");
   }
}
 

3. Producer (REST Controller) (EventController.java):

@RestController
@RequestMapping("/event")
public class EventController {

   @Autowired
   private KafkaTemplate<String, String> kafkaTemplate;

   @PostMapping("/send")
   public ResponseEntity<String> sendEvent(@RequestBody String eventMessage) {
       kafkaTemplate.send("event_topic", eventMessage);
       return ResponseEntity.ok("Event sent to Kafka");
   }
}
 

4. Consumer (EventListener.java):

@Component
public class EventListener {

   @KafkaListener(topics = "event_topic", groupId = "event_group")
   public void listenEvent(String eventMessage) {
       System.out.println("Received Event: " + eventMessage);
       // Process event message asynchronously
   }
}
 

5. Event Model (Event.java):

public class Event {
   private String message;
   private Date timestamp;

   // Getters and Setters
}
 

 Kafka Configuration in application.properties

For Kafka, you'll need to specify the Kafka broker, topics, consumer group, and other configurations.

# Kafka Configuration
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=event_group
spring.kafka.consumer.auto-offset-reset=earliest
# Auto-commit must be disabled when the listener ack-mode is manual (see below)
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# Additional configurations (note: in .properties files a trailing "# comment"
# becomes part of the value, so comments must sit on their own lines)
# Number of consumer threads
spring.kafka.listener.concurrency=3
# Manual acknowledgment for fine-grained control
spring.kafka.listener.ack-mode=manual
# Max records pulled per poll
spring.kafka.consumer.max-poll-records=10
 

Explanation:

  • spring.kafka.bootstrap-servers: Comma-separated list of Kafka brokers to connect to.
  • spring.kafka.consumer.group-id: Kafka consumer group ID.
  • spring.kafka.consumer.auto-offset-reset: How Kafka should handle the offset when consuming a topic. earliest means starting from the earliest available message.
  • spring.kafka.consumer.key-deserializer / value-deserializer: Specifies the deserializers for the consumer to convert bytes into Java objects.
  • spring.kafka.producer.key-serializer / value-serializer: Specifies the serializers for the producer.
  • spring.kafka.listener.concurrency: Number of consumers that will handle partitions.
  • spring.kafka.listener.ack-mode: Acknowledgment mode for Kafka message consumption.
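
With ack-mode=manual, the listener method must receive an Acknowledgment and commit explicitly. A minimal sketch (the topic and group names are illustrative):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class ManualAckConsumer {

   @KafkaListener(topics = "event_topic", groupId = "event_group")
   public void listen(String message, Acknowledgment ack) {
       System.out.println("Received Message: " + message);
       // Commit the offset only after the message has been processed successfully
       ack.acknowledge();
   }
}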

 

 

2. RabbitMQ Messaging with Spring Boot

Overview:

RabbitMQ is an open-source message broker that supports multiple messaging protocols. It is widely used in enterprise systems for message queuing.

Use Case:

RabbitMQ is used for asynchronous processing, task queues, and decoupling microservices.

 

RabbitMQ

  • Type: Message Broker (AMQP-based)
  • Message Protocol: Advanced Message Queuing Protocol (AMQP)
  • Message Delivery: Supports both point-to-point and publish/subscribe patterns.
  • Use Case: Best for applications requiring complex routing, low-latency delivery, and easy integration. Ideal for scenarios like microservices, task queues, and backend communication.
  • Strengths:
    • High reliability and flexible message routing.
    • Supports complex messaging patterns (e.g., routing, topics).
    • Easy to set up and manage.
    • Strong community and documentation.
  • Scalability: Can scale horizontally using clustering, though it may face challenges with very high throughput.
  • Fault Tolerance: Provides message persistence, replication, and clustering for high availability.
  • Performance: Suitable for workloads with moderate message rates. May not scale as well as Kafka for very high-throughput systems.

 

Spring Boot Integration:

First, add the dependencies for RabbitMQ in your pom.xml:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
 

 

Producer Configuration:

@Configuration
public class RabbitConfig {
   @Bean
   public Queue queue() {
       return new Queue("queue_name", false);
   }

   @Bean
   public TopicExchange exchange() {
       return new TopicExchange("exchange_name");
   }

   @Bean
   public Binding binding() {
       return BindingBuilder.bind(queue()).to(exchange()).with("routing_key");
   }

   @Bean
   public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
       return new RabbitTemplate(connectionFactory);
   }
}
 

 

Producer Code:

 

@Service
public class RabbitProducer {
   private final AmqpTemplate amqpTemplate;

   @Autowired
   public RabbitProducer(AmqpTemplate amqpTemplate) {
       this.amqpTemplate = amqpTemplate;
   }

   public void sendMessage(String message) {
       amqpTemplate.convertAndSend("exchange_name", "routing_key", message);
   }
}
 

 

Consumer Code:

 

@Service
public class RabbitConsumer {
   @RabbitListener(queues = "queue_name")
   public void listen(String message) {
       System.out.println("Received Message: " + message);
   }
}
 

 

RabbitMQ Use Case: Task Queue System

Use Case: A background task processing system where tasks are queued and processed by worker services. RabbitMQ is used to distribute the tasks among different worker services asynchronously.

Concepts:

  • Queues: A message queue holds messages that are consumed by workers.
  • Exchanges: Routes messages to queues.
  • Bindings: A way to connect queues to exchanges.
  • Consumers: Workers that read and process messages from queues.
  • Producers: Components that send messages to RabbitMQ.

Rules & Points:

  • RabbitMQ uses the AMQP protocol.
  • Messages are guaranteed delivery (if configured for persistence).
  • It supports various exchange types like direct, topic, fanout, and headers-based exchanges (a fanout sketch follows below).
  • It provides message acknowledgment and retries.
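
As a quick illustration of a non-default exchange type, here is a minimal sketch of a fanout exchange, which broadcasts every message to all bound queues regardless of routing key (all names here are illustrative):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {

   @Bean
   public FanoutExchange broadcastExchange() {
       return new FanoutExchange("broadcast_exchange");
   }

   @Bean
   public Queue auditQueue() {
       return new Queue("audit_queue", true);
   }

   @Bean
   public Binding auditBinding(Queue auditQueue, FanoutExchange broadcastExchange) {
       // Fanout exchanges ignore routing keys, so no key is supplied
       return BindingBuilder.bind(auditQueue).to(broadcastExchange);
   }
}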

Spring Boot Project (RabbitMQ)

  1. Add dependencies (pom.xml):

 

<dependencies>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
   </dependency>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
</dependencies>
 

 

2. Configuration (RabbitMQConfig.java):

@Configuration
public class RabbitMQConfig {

   @Bean
   public Queue queue() {
       return new Queue("task_queue", true);
   }

   @Bean
   public TopicExchange exchange() {
       return new TopicExchange("task_exchange");
   }

   @Bean
   public Binding binding(Queue queue, TopicExchange exchange) {
       return BindingBuilder.bind(queue).to(exchange).with("task.#");
   }

   // No explicit listener container bean is needed here: the @RabbitListener in
   // TaskListener below uses Spring Boot's auto-configured container factory.
}
 

3. Producer (REST Controller) (TaskController.java):

@RestController
@RequestMapping("/task")
public class TaskController {

   @Autowired
   private RabbitTemplate rabbitTemplate;

   @PostMapping("/send")
   public ResponseEntity<String> sendTask(@RequestBody Task task) {
       rabbitTemplate.convertAndSend("task_exchange", "task.new", task);
       return ResponseEntity.ok("Task sent to the queue");
   }
}
 

4. Consumer (TaskListener.java):

@Component
public class TaskListener {

   @RabbitListener(queues = "task_queue")
   public void receiveMessage(Task task) {
       System.out.println("Received Task: " + task);
       // Process task asynchronously
   }
}
 

5. Task Model (Task.java):

public class Task {
   private String name;
   private String description;

   // Getters and Setters
}
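
One caveat: RabbitTemplate's default SimpleMessageConverter can only send Serializable payloads, so publishing the Task POJO above will fail unless you either make Task implement Serializable or register a JSON message converter. A minimal sketch of the JSON approach (Spring Boot picks up a MessageConverter bean and wires it into both RabbitTemplate and the listener container factory):

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitJsonConfig {

   @Bean
   public MessageConverter jsonMessageConverter() {
       // Serializes outgoing POJOs (e.g. Task) to JSON and turns incoming JSON back into POJOs
       return new Jackson2JsonMessageConverter();
   }
}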
 

RabbitMQ Configuration in application.properties

For RabbitMQ, you need to configure the connection to the RabbitMQ server and set up certain properties for message queues.

 

# RabbitMQ Configuration
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# Number of consumer threads
spring.rabbitmq.listener.simple.concurrency=5
# Max number of consumer threads
spring.rabbitmq.listener.simple.max-concurrency=10
# Prefetch count (messages pulled at once)
spring.rabbitmq.listener.simple.prefetch=10

# Optional configurations for RabbitMQ exchange and queues
spring.rabbitmq.listener.simple.acknowledge-mode=auto
spring.rabbitmq.listener.simple.default-requeue-rejected=false
 

Explanation:

  • spring.rabbitmq.host: The host where RabbitMQ server is running.
  • spring.rabbitmq.port: The port (default is 5672).
  • spring.rabbitmq.username / password: Credentials for connecting to RabbitMQ (default is guest/guest).
  • spring.rabbitmq.listener.simple.concurrency: Number of consumers to start in parallel.
  • spring.rabbitmq.listener.simple.max-concurrency: Max number of consumers.
  • spring.rabbitmq.listener.simple.prefetch: Controls how many messages the consumer can fetch at once.

 

3. IBM MQ Messaging with Spring Boot

Overview:

IBM MQ is an enterprise messaging system known for its reliability and high performance. It provides point-to-point and publish-subscribe messaging models.

Use Case:

IBM MQ is often used in financial services, healthcare, and large enterprises for secure, reliable messaging.

 

IBM MQ

  • Type: Enterprise Message Broker
  • Message Protocol: Proprietary IBM MQ protocol, with support for JMS and MQTT.
  • Message Delivery: Supports point-to-point and publish/subscribe patterns, along with transactional support.
  • Use Case: IBM MQ is often used in large enterprises with complex, high-security environments and mission-critical applications. It's a good fit for financial services, government, and healthcare industries.
  • Strengths:
    • High level of reliability, security, and enterprise-grade features.
    • Supports both synchronous and asynchronous messaging, including transactional messaging.
    • Robust security features and compliance with regulatory standards.
    • Integration with other IBM products (e.g., IBM Integration Bus).
  • Scalability: Can scale vertically and horizontally, but typically needs to be managed closely for large deployments.
  • Fault Tolerance: Advanced features like message persistence, high availability configurations, and automatic recovery.
  • Performance: Designed for low-latency, high-reliability messaging, though its throughput is typically lower than Kafka's at very high volumes.
  • Drawbacks:
    • Licensing and pricing can be expensive, particularly in large-scale environments.
    • More complex setup and higher operational overhead than RabbitMQ.

 

Spring Boot Integration:

First, include the necessary dependency:

<dependency>
   <groupId>com.ibm.mq</groupId>
   <artifactId>com.ibm.mq.allclient</artifactId>
   <version>9.3.0.0</version>
</dependency>
 

Producer Configuration:

@Configuration
public class IBMQueueConfig {
   @Bean
   public MQQueueConnectionFactory connectionFactory() throws JMSException {
       MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
       factory.setHostName("localhost");
       factory.setPort(1414);
       factory.setQueueManager("QM1");
       factory.setChannel("DEV.APP.SVRCONN");
       return factory;
   }

   @Bean
   public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
       return new JmsTemplate(connectionFactory);
   }
}
 

Producer Code:

@Service
public class IBMMQProducer {
   private final JmsTemplate jmsTemplate;

   @Autowired
   public IBMMQProducer(JmsTemplate jmsTemplate) {
       this.jmsTemplate = jmsTemplate;
   }

   public void sendMessage(String message) {
       jmsTemplate.convertAndSend("queue_name", message);
   }
}
 

Consumer Code:

@Service
public class IBMMQConsumer {
   @JmsListener(destination = "queue_name")
   public void listen(String message) {
       System.out.println("Received Message: " + message);
   }
}
 

 

IBM MQ Configuration in application.properties

For IBM MQ, you need to provide connection details for the MQ server, along with security and queue configurations.

# IBM MQ Configuration
ibm.mq.queueManager=QM1
ibm.mq.channel=DEV.APP.SVRCONN
ibm.mq.host=localhost
ibm.mq.port=1414
ibm.mq.queueName=TransactionQueue
ibm.mq.sslEnabled=false
ibm.mq.username=admin
ibm.mq.password=password

# Connection factory and other settings
# Set to true for topic-based messaging
spring.jms.pub-sub-domain=false
# Number of listeners
spring.jms.listener.concurrency=5
 

Explanation:

  • ibm.mq.queueManager: The name of the IBM MQ Queue Manager.
  • ibm.mq.channel: The MQ channel for connecting to the Queue Manager.
  • ibm.mq.host: The host where IBM MQ is running (usually localhost).
  • ibm.mq.port: The port on which the MQ server listens (default is 1414).
  • ibm.mq.queueName: The name of the queue to which messages are sent or from which they are consumed.
  • ibm.mq.sslEnabled: Whether SSL encryption should be used for MQ connections.
  • ibm.mq.username / ibm.mq.password: Credentials for connecting to IBM MQ.
  • spring.jms.pub-sub-domain: Set to true if using topics (for pub/sub) and false for queues (for point-to-point).
  • spring.jms.listener.concurrency: Number of concurrent message listeners.

 

General Notes on Configurations:

  • Security and Authentication: For all systems (RabbitMQ, Kafka, and IBM MQ), it’s important to configure secure connections (e.g., enabling SSL/TLS, configuring username/password, etc.) depending on the environment.
  • Environment-Specific Configurations: It’s often a good practice to separate configuration settings for development, testing, and production environments. You can use profiles such as application-dev.properties, application-prod.properties, etc., in Spring Boot.
  • Error Handling and Retries: Consider adding retry mechanisms and dead-letter queues for failed message deliveries, especially in production environments (a RabbitMQ dead-letter sketch follows below).
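
A minimal sketch of a RabbitMQ dead-letter setup, assuming a hypothetical main queue task_queue whose rejected messages are rerouted to a task_dlq queue via a dead-letter exchange:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadLetterConfig {

   @Bean
   public DirectExchange deadLetterExchange() {
       return new DirectExchange("task_dlx");
   }

   @Bean
   public Queue deadLetterQueue() {
       return new Queue("task_dlq", true);
   }

   @Bean
   public Binding deadLetterBinding() {
       return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("task_dead");
   }

   @Bean
   public Queue mainQueue() {
       // Rejected (or expired) messages from this queue are republished to the dead-letter exchange
       return QueueBuilder.durable("task_queue")
               .withArgument("x-dead-letter-exchange", "task_dlx")
               .withArgument("x-dead-letter-routing-key", "task_dead")
               .build();
   }
}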

 

 

Comparison Summary:

Feature         | RabbitMQ                                   | Kafka                                           | IBM MQ
----------------|--------------------------------------------|-------------------------------------------------|------------------------------------------------------
Protocol        | AMQP, MQTT, STOMP                          | Kafka Protocol                                  | IBM MQ Protocol, JMS, MQTT
Best for        | General messaging, task queues             | Event streams, log aggregation                  | Enterprise messaging, financial systems
Use Cases       | Microservices, task queues                 | Real-time data pipelines, event-driven systems  | Enterprise integration, secure and reliable messaging
Scalability     | Moderate, clustered setup                  | High scalability, partitioned                   | Moderate to high, complex setup
Performance     | Moderate throughput                        | High throughput, low latency                    | Moderate, high reliability
Reliability     | Good, supports message persistence         | High, built for fault tolerance                 | Very high, designed for enterprise reliability
Ease of Use     | Easy to set up and manage                  | Complex setup, but powerful                     | High operational overhead, enterprise-focused
Fault Tolerance | Good, supports clustering and replication  | Excellent, replication and partitioning         | Advanced, high availability and persistence
Cost            | Open-source (with enterprise support)      | Open-source (with Confluent support)            | Commercial, often costly for large-scale deployments

When to Choose Each:

  • RabbitMQ: Choose RabbitMQ for simpler messaging use cases with moderate throughput, especially when you need flexible message routing or integration with other services.
  • Kafka: Choose Kafka when you need to handle large volumes of real-time data, support high throughput, and focus on stream processing or event-driven architecture.
  • IBM MQ: Choose IBM MQ for large, enterprise-grade systems that require high levels of security, transactional messaging, and integration with other IBM products in industries like banking and healthcare.
 

 

4. Event-Driven Architecture in Spring Boot

Overview:

Event-driven architecture (EDA) involves components that communicate by sending and receiving events, enabling decoupled systems. In Spring Boot, this can be achieved using messaging systems like Kafka, RabbitMQ, or JMS.

Use Case:

Event-driven systems are perfect for microservices architectures, enabling asynchronous communication, real-time processing, and decoupling of services.

Example with Kafka:

Event Publisher:

public class OrderCreatedEvent {
   private String orderId;
   private String customerName;

   // Getters and setters
}

@Service
public class OrderService {
   private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;

   @Autowired
   public OrderService(KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate) {
       this.kafkaTemplate = kafkaTemplate;
   }

   public void createOrder(OrderCreatedEvent orderEvent) {
       kafkaTemplate.send("order_topic", orderEvent);
   }
}
 

 

Event Consumer:

@Service
public class OrderEventListener {
   @KafkaListener(topics = "order_topic", groupId = "order_group")
   public void handleOrderCreatedEvent(OrderCreatedEvent event) {
       System.out.println("Order created: " + event.getOrderId());
   }
}
 

 

Why Event-Driven Architecture (EDA) is Needed in Microservices?

Event-Driven Architecture (EDA) is an architectural pattern that promotes the decoupling of microservices through asynchronous communication, where services communicate by producing and consuming events rather than making direct synchronous calls to each other. This pattern offers several advantages in microservices systems:

  1. Loose Coupling: Each microservice is independent and doesn't need to know about the internal workings of other services. This increases modularity and allows for easier scaling, maintenance, and independent deployment.
  2. Asynchronous Communication: Services interact asynchronously through events, which allows for non-blocking communication. This reduces the dependency on synchronous API calls that can cause delays or timeouts.
  3. Scalability: EDA allows for better scalability, as services can scale independently. The event bus (message broker) decouples services and allows them to handle different loads.
  4. Resilience: EDA improves system resilience, as it can handle failures gracefully. If a service goes down, messages can be stored and retried when the service comes back online, ensuring eventual consistency.
  5. Flexibility: Events can be processed by multiple consumers, enabling new services to join the system and react to events without changes to the existing services.

Example Implementation with Three Messaging Systems (RabbitMQ, Kafka, IBM MQ)

We'll implement a simple Order Service that sends an OrderPlaced event whenever an order is placed. The Inventory Service and Payment Service will listen to these events to perform related tasks (e.g., updating inventory and processing payment).

The key concepts are:

  • Producer: Sends events to the message broker.
  • Consumer: Listens to events from the message broker and reacts accordingly.
  • Event: Represents the action that occurred (e.g., OrderPlaced).

 

1. Event-Driven Architecture with RabbitMQ

Project Setup:

  1. Dependencies: Add the necessary RabbitMQ and Spring Boot dependencies to pom.xml.

<dependencies>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
   </dependency>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
</dependencies>
 

Event Model (OrderPlacedEvent.java):

public class OrderPlacedEvent {
   private String orderId;
   private String customerName;
   private double totalAmount;

   // Getters and Setters
}
 

RabbitMQ Configuration (RabbitMQConfig.java):

@Configuration
public class RabbitMQConfig {

   @Bean
   public Queue orderQueue() {
       return new Queue("orderQueue", true);
   }

   @Bean
   public DirectExchange exchange() {
       return new DirectExchange("orderExchange");
   }

   @Bean
   public Binding binding(Queue orderQueue, DirectExchange exchange) {
       return BindingBuilder.bind(orderQueue).to(exchange).with("order.key");
   }
}
 

 

Order Producer (Controller) (OrderController.java):

 

@RestController
@RequestMapping("/orders")
public class OrderController {

   @Autowired
   private RabbitTemplate rabbitTemplate;

   @PostMapping("/place")
   public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) {
       rabbitTemplate.convertAndSend("orderExchange", "order.key", event);
       return ResponseEntity.ok("Order placed and event sent to RabbitMQ");
   }
}


Order Consumer (OrderListener.java):

@Component
public class OrderListener {

   @RabbitListener(queues = "orderQueue")
   public void handleOrderPlacedEvent(OrderPlacedEvent event) {
       System.out.println("Order placed: " + event.getOrderId());
       // Process the order (e.g., update inventory, process payment)
   }
}
 

application.properties:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
 

2. Event-Driven Architecture with Kafka

Project Setup:

  1. Dependencies: Add Kafka dependencies to pom.xml.

<dependencies>
   <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
   </dependency>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
</dependencies>
 

 

Kafka Configuration (KafkaConfig.java):

 

@Configuration
@EnableKafka
public class KafkaConfig {

   @Bean
   public KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate() {
       return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig()));
   }

   @Bean
   public ConsumerFactory<String, OrderPlacedEvent> consumerFactory() {
       return new DefaultKafkaConsumerFactory<>(consumerConfig());
   }

   @Bean
   public ConcurrentMessageListenerContainer<String, OrderPlacedEvent> listenerContainer() {
       return new ConcurrentMessageListenerContainer<>(consumerFactory(), new ContainerProperties("orderTopic"));
   }

   // JSON (de)serializers because the payload is a POJO, not a plain String
   private Map<String, Object> producerConfig() {
       Map<String, Object> configs = new HashMap<>();
       configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
       return configs;
   }

   private Map<String, Object> consumerConfig() {
       Map<String, Object> configs = new HashMap<>();
       configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       configs.put(ConsumerConfig.GROUP_ID_CONFIG, "orderGroup");
       configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
       configs.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
       return configs;
   }
}
 

Order Producer (Controller) (OrderController.java):

@RestController
@RequestMapping("/orders")
public class OrderController {

   @Autowired
   private KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;

   @PostMapping("/place")
   public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) {
       kafkaTemplate.send("orderTopic", event);
       return ResponseEntity.ok("Order placed and event sent to Kafka");
   }
}
 

 

Order Consumer (OrderListener.java):

@Component
public class OrderListener {

   @KafkaListener(topics = "orderTopic", groupId = "orderGroup")
   public void handleOrderPlacedEvent(OrderPlacedEvent event) {
       System.out.println("Order placed: " + event.getOrderId());
       // Process the order (e.g., update inventory, process payment)
   }
}
 

 

application.properties:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=orderGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
 

3. Event-Driven Architecture with IBM MQ

Project Setup:

  1. Dependencies: Add the IBM MQ dependency in pom.xml.

 

<dependencies>
   <dependency>
       <groupId>com.ibm.mq</groupId>
       <artifactId>mq-jms-spring-boot-starter</artifactId>
       <version>3.0.0</version>
   </dependency>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
</dependencies>
 

 

IBM MQ Configuration (IBM_MQConfig.java):

 

@Configuration
public class IBM_MQConfig {

   @Value("${ibm.mq.queueManager}")
   private String queueManager;

   @Value("${ibm.mq.channel}")
   private String channel;

   @Value("${ibm.mq.host}")
   private String host;

   @Value("${ibm.mq.port}")
   private int port;

   @Value("${ibm.mq.queueName}")
   private String queueName;

   @Bean
   public ConnectionFactory connectionFactory() throws JMSException {
       // MQQueueConnectionFactory is the JMS ConnectionFactory implementation for IBM MQ
       MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
       factory.setQueueManager(queueManager);
       factory.setChannel(channel);
       factory.setHostName(host);
       factory.setPort(port);
       return factory;
   }

   @Bean
   public Queue queue() {
       return new MQQueue(queueName);
   }
}
 

 

Order Producer (Controller) (OrderController.java):

@RestController
@RequestMapping("/orders")
public class OrderController {

   @Autowired
   private JmsTemplate jmsTemplate;

   @PostMapping("/place")
   public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) {
       jmsTemplate.convertAndSend("TransactionQueue", event);
       return ResponseEntity.ok("Order placed and event sent to IBM MQ");
   }
}
 

 

Order Consumer (OrderListener.java):

 

 

@Component
public class OrderListener {

   @JmsListener(destination = "TransactionQueue")
   public void handleOrderPlacedEvent(OrderPlacedEvent event) {
       System.out.println("Order placed: " + event.getOrderId());
       // Process the order (e.g., update inventory, process payment)
   }
}
 

application.properties:

ibm.mq.queueManager=QM1
ibm.mq.channel=DEV.APP.SVRCONN
ibm.mq.host=localhost
ibm.mq.port=1414
ibm.mq.queueName=TransactionQueue
ibm.mq.sslEnabled=false
ibm.mq.username=admin
ibm.mq.password=password
 

Each messaging system (RabbitMQ, Kafka, IBM MQ) enables Event-Driven Architecture and allows microservices to communicate asynchronously through events. The implementation process involves:

  1. Defining events that are passed between microservices.
  2. Producers that publish events to the messaging system.
  3. Consumers that listen for and react to those events.

 

There are several other ways to implement Event-Driven Architecture (EDA) in microservices. The most common approaches include:

  1. Using a Publish-Subscribe Model (Pub/Sub)
  2. Using Cloud-based Messaging Services (e.g., AWS SQS, Google Pub/Sub)
  3. Using Apache Pulsar
  4. Using Database Change Streams or Event Sourcing

 

1. Publish-Subscribe Model (Pub/Sub) with a Message Broker

In this architecture, you define a topic or channel that producers send messages to, and consumers can subscribe to the topic. This is common in RabbitMQ (with exchanges and queues), Kafka, and even Google Cloud Pub/Sub.

Example with Google Cloud Pub/Sub (A Cloud-based Message Broker)

Project Setup:

  1. Dependencies: Add the necessary Google Pub/Sub dependencies in pom.xml.

<dependency>
   <groupId>com.google.cloud</groupId>
   <artifactId>google-cloud-pubsub</artifactId>
   <version>2.12.1</version>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>
 

Pub/Sub Configuration (PubSubConfig.java):

@Configuration
public class PubSubConfig {

   @Value("${google.cloud.project-id}")
   private String projectId;

   @Bean
   public Publisher publisher() throws Exception {
       return Publisher.newBuilder("projects/" + projectId + "/topics/orderTopic").build();
   }

   @Bean
   public Subscriber subscriber() {
       return Subscriber.newBuilder("projects/" + projectId + "/subscriptions/orderSubscription", (message, consumer) -> {
           System.out.println("Received message: " + message.getData().toStringUtf8());
           consumer.ack();
       }).build();
   }
}
 


Order Producer (Controller) (OrderController.java):

 

@RestController
@RequestMapping("/orders")
public class OrderController {

   @Autowired
   private Publisher publisher;

   @PostMapping("/place")
   public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) throws Exception {
       String message = new ObjectMapper().writeValueAsString(event);
       publisher.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(message)).build());
       return ResponseEntity.ok("Order placed and event sent to Google Pub/Sub");
   }
}
 

Order Consumer (OrderListener.java):

@Component
public class OrderListener {

   @Autowired
   private Subscriber subscriber;

   @PostConstruct
   public void init() {
       subscriber.startAsync();
   }
}
 

 

application.properties:

google.cloud.project-id=your-google-cloud-project-id
 

 

2. Using Cloud-based Messaging Services (e.g., AWS SQS)

Amazon Web Services (AWS) provides Simple Queue Service (SQS) for message queuing, which is fully managed and highly scalable. This can be used in a similar way as RabbitMQ, Kafka, or Pub/Sub for event-driven communication.

AWS SQS Example

Project Setup:

  1. Dependencies: Add the necessary dependencies to pom.xml.

 

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-aws-messaging</artifactId>
   <version>3.0.0</version>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>
 

 

SQS Configuration (SQSConfig.java):

@Configuration
public class SQSConfig {

   @Bean
   public AmazonSQSAsync amazonSQS() {
       // Builds a client using the default AWS region and credentials provider chain
       return AmazonSQSAsyncClientBuilder.defaultClient();
   }

   @Bean
   public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSQS) {
       return new QueueMessagingTemplate(amazonSQS);
   }

   // No explicit listener container is needed: the @SqsListener in OrderListener
   // below is driven by the starter's auto-configured listener infrastructure.
}
 

Order Producer (Controller) (OrderController.java):

 

@RestController
@RequestMapping("/orders")
public class OrderController {

   @Autowired
   private AmazonSQSAsync amazonSQS;

   @Value("${aws.sqs.queue-url}")
   private String queueUrl;

   @PostMapping("/place")
   public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) {
       String message = new ObjectMapper().writeValueAsString(event);
       SendMessageRequest sendMessageRequest = new SendMessageRequest(queueUrl, message);
       amazonSQS.sendMessage(sendMessageRequest);
       return ResponseEntity.ok("Order placed and event sent to SQS");
   }
}
 

Order Consumer (OrderListener.java):

 

@Component
public class OrderListener {

   @SqsListener(value = "orderQueue", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
   public void handleOrderPlacedEvent(String message) throws IOException {
       // readValue throws a checked IOException, so declare it
       OrderPlacedEvent event = new ObjectMapper().readValue(message, OrderPlacedEvent.class);
       System.out.println("Order placed: " + event.getOrderId());
       // Process the order
   }
}
 

 

application.properties:

 

aws.sqs.queue-url=https://sqs.us-east-1.amazonaws.com/your-account-id/your-queue-name
 

 

3. Apache Pulsar for Event-Driven Microservices

Apache Pulsar is an open-source, distributed messaging and streaming platform, similar to Kafka but with more focus on multi-tenancy, geo-replication, and long-term storage. Pulsar supports both publish-subscribe and queue-based messaging models.

Pulsar Example

  1. Dependencies: Add Pulsar dependencies in pom.xml.

 

<dependency>
   <groupId>org.apache.pulsar</groupId>
   <artifactId>pulsar-client-api</artifactId>
   <version>2.11.1</version>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>
 

 

Pulsar Configuration (PulsarConfig.java):

 

@Configuration
public class PulsarConfig {

   @Value("${pulsar.service-url}")
   private String pulsarServiceUrl;

   @Bean
   public PulsarClient pulsarClient() throws PulsarClientException {
       return PulsarClient.builder().serviceUrl(pulsarServiceUrl).build();
   }

   @Bean
   public Consumer<byte[]> orderConsumer(PulsarClient pulsarClient) throws PulsarClientException {
       return pulsarClient.newConsumer()
                          .topic("order-topic")
                          .subscriptionName("order-subscription")
                          .subscribe();
   }
}
 

 

Order Producer (OrderController.java):

@RestController
@RequestMapping("/orders")
public class OrderController {

   @Autowired
   private PulsarClient pulsarClient;

   @PostMapping("/place")
   public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) throws PulsarClientException {
       Producer<byte[]> producer = pulsarClient.newProducer()
                                              .topic("order-topic")
                                              .create();
       String message = new ObjectMapper().writeValueAsString(event);
       producer.send(message.getBytes());
       return ResponseEntity.ok("Order placed and event sent to Pulsar");
   }
}
 

Order Consumer (OrderListener.java):

@Component
public class OrderListener {

   @Autowired
   private Consumer<byte[]> orderConsumer;

   @PostConstruct
   public void consume() {
       // Run the receive loop on its own thread so @PostConstruct does not block application startup
       new Thread(() -> {
           while (true) {
               try {
                   Message<byte[]> msg = orderConsumer.receive();
                   String messageContent = new String(msg.getData());
                   OrderPlacedEvent event = new ObjectMapper().readValue(messageContent, OrderPlacedEvent.class);
                   System.out.println("Order placed: " + event.getOrderId());
                   orderConsumer.acknowledge(msg); // mark the message as processed
                   // Process the order
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
       }).start();
   }
}
 

 

application.properties:

pulsar.service-url=pulsar://localhost:6650
 

 

4. Using Event Sourcing (CQRS)

Another approach in Event-Driven Architecture is Event Sourcing, which involves persisting the state changes as events rather than storing just the current state. This is typically combined with CQRS (Command Query Responsibility Segregation), where you separate the handling of commands (state changes) and queries (reads).
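
A minimal, framework-free sketch of the event-sourcing idea, using an illustrative in-memory event store for a bank-account aggregate (all names are hypothetical):

import java.util.ArrayList;
import java.util.List;

public class EventSourcingDemo {

   // Each state change is recorded as an immutable event instead of overwriting current state
   record AccountEvent(String type, double amount) {}

   static class AccountEventStore {
       private final List<AccountEvent> events = new ArrayList<>();

       void append(AccountEvent event) {
           events.add(event); // events are only ever appended, never updated
       }

       // The current balance is derived by replaying every event from the beginning
       double replayBalance() {
           double balance = 0;
           for (AccountEvent e : events) {
               balance += e.type().equals("DEPOSIT") ? e.amount() : -e.amount();
           }
           return balance;
       }
   }

   public static void main(String[] args) {
       AccountEventStore store = new AccountEventStore();
       store.append(new AccountEvent("DEPOSIT", 100.0));
       store.append(new AccountEvent("WITHDRAW", 30.0));
       System.out.println("Balance: " + store.replayBalance()); // prints 70.0
   }
}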

 

Summary of Other Event-Driven Architectures

  1. Google Cloud Pub/Sub:
    • Managed service for event-driven communication with auto-scaling.
    • Asynchronous communication, decouples services.
  2. AWS SQS:
    • Fully managed queue service.
    • Highly available, scalable, and reliable for event-driven communication.
  3. Apache Pulsar:
    • Distributed messaging platform.
    • Supports both pub/sub and queue-based models.
    • Highly scalable, multi-tenant, geo-replicated.
  4. Event Sourcing (CQRS):
    • Persist events that change the state, making the system able to replay or synchronize.

These methods and tools are commonly used in large-scale distributed systems to facilitate event-driven communication and decoupled services.


Want to donate? Pay → https://razorpay.me/@niteshsynergy

Thanks for reading my blog…
If you find any issues or have suggestions, let me know; I'm refactoring each concept in parallel as soon as I can.

 
Dec 12, 2024
By Nitesh Synergy