I'm always excited to take on new projects and collaborate with innovative minds.

Email

contact@niteshsynergy.com

Website

https://www.niteshsynergy.com/

Kafka-RabitMQ-IBMMQ-Event-Driven-Architecture

RabbitMQ, Kafka, and IBM MQ are three popular messaging systems, but they differ in terms of their design, use cases, and key features.

 

Apache Kafka

  • Type: Distributed Streaming Platform
  • Message Protocol: Kafka protocol
  • Message Delivery: Primarily designed for publish-subscribe with strong durability guarantees.
  • Use Case: Best suited for stream processing, event sourcing, and log aggregation. It handles high-throughput, real-time data feeds effectively, making it popular for data lakes and real-time analytics.
  • Strengths:
    • Extremely high throughput (millions of messages per second).
    • Horizontal scalability with built-in partitioning.
    • Strong durability and fault tolerance (message persistence to disk).
    • Ideal for event-driven architectures and log-based systems.
  • Scalability: Excellent horizontal scalability. Kafka is built to handle large volumes of data and scale horizontally with minimal impact on performance.
  • Fault Tolerance: Replication of data across multiple nodes ensures high availability.
  • Performance: Very high throughput and low latency for large-scale applications. Kafka handles high volumes of messages effectively, making it ideal for large-scale, real-time data pipelines.
  • Drawbacks:
    • More complex to set up and manage.
    • Not as flexible as RabbitMQ for traditional messaging patterns like complex routing or request-response.
Producer Configuration:

Kafka producers send messages to a Kafka topic.
@Configuration
public class KafkaProducerConfig {
   @Bean
   public ProducerFactory<String, String> producerFactory() {
       return new DefaultKafkaProducerFactory<>(producerConfigs());
   }

   @Bean
   public KafkaTemplate<String, String> kafkaTemplate() {
       return new KafkaTemplate<>(producerFactory());
   }

   private Map<String, Object> producerConfigs() {
       Map<String, Object> configs = new HashMap<>();
       configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       return configs;
   }
}
 

Producer Code:

@Service
public class KafkaProducer {
   private final KafkaTemplate<String, String> kafkaTemplate;

   @Autowired
   public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
       this.kafkaTemplate = kafkaTemplate;
   }

   public void sendMessage(String message) {
       kafkaTemplate.send("topic_name", message);
   }
}
 

Consumer Configuration:

Kafka consumers receive messages from a Kafka topic.

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
   @Bean
   public ConsumerFactory<String, String> consumerFactory() {
       return new DefaultKafkaConsumerFactory<>(consumerConfigs());
   }

   @Bean
   public ConcurrentMessageListenerContainer<String, String> messageListenerContainer() {
       return new ConcurrentMessageListenerContainer<>(consumerFactory());
   }

   private Map<String, Object> consumerConfigs() {
       Map<String, Object> configs = new HashMap<>();
       configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       configs.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
       configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       return configs;
   }
}
 

Consumer Code:

 

@Service
public class KafkaConsumer {
   @KafkaListener(topics = "topic_name", groupId = "group_id")
   public void listen(String message) {
       System.out.println("Received Message: " + message);
   }
}
 

Kafka Use Case: Real-Time Event Processing

Use Case: A real-time event streaming system where events (e.g., user activity logs, sensor data, etc.) are produced and consumed by different services for processing and analysis.

Concepts:

  • Producer: Sends messages to Kafka topics.
  • Consumer: Reads messages from Kafka topics.
  • Topic: A category to which messages are published.
  • Partition: Kafka topics are divided into partitions for scalability.
  • Consumer Group: Multiple consumers can join a group to share the load of reading messages from a topic.

Rules & Points:

  • Kafka is designed for high throughput and fault tolerance.
  • Supports distributed, horizontal scaling through topic partitioning.
  • Guarantees delivery through replication and partitioning.
  • Ideal for event-driven architectures and stream processing.

     

Spring Boot Project (Kafka)

  1. Add dependencies (pom.xml):

<dependencies>
   <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
   </dependency>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
</dependencies>
 

2. Configuration (KafkaConfig.java):

@Configuration
@EnableKafka
public class KafkaConfig {

   @Bean
   public ProducerFactory<String, String> producerFactory() {
       return new DefaultKafkaProducerFactory<>(producerConfig());
   }

   @Bean
   public KafkaTemplate<String, String> kafkaTemplate() {
       return new KafkaTemplate<>(producerFactory());
   }

   @Bean
   public ConsumerFactory<String, String> consumerFactory() {
       return new DefaultKafkaConsumerFactory<>(consumerConfig());
   }

   @Bean
   public ConcurrentMessageListenerContainer<String, String> messageListenerContainer() {
       return new ConcurrentMessageListenerContainer<>(consumerFactory(), containerConfig());
   }
}
 

3. Producer (REST Controller) (EventController.java):

@RestController
@RequestMapping("/event")
public class EventController {

   @Autowired
   private KafkaTemplate<String, String> kafkaTemplate;

   @PostMapping("/send")
   public ResponseEntity<String> sendEvent(@RequestBody String eventMessage) {
       kafkaTemplate.send("event_topic", eventMessage);
       return ResponseEntity.ok("Event sent to Kafka");
   }
}
 

4.Consumer (EventListener.java):

@Component
public class EventListener {

   @KafkaListener(topics = "event_topic", groupId = "event_group")
   public void listenEvent(String eventMessage) {
       System.out.println("Received Event: " + eventMessage);
       // Process event message asynchronously
   }
}
 

5. Event Model (Event.java):

public class Event {
   private String message;
   private Date timestamp;

   // Getters and Setters
}
 

 Kafka Configuration in application.properties

For Kafka, you'll need to specify the Kafka broker, topics, consumer group, and other configurations.

# Kafka Configuration
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=event_group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# Additional configurations
spring.kafka.listener.concurrency=3 # Number of consumer threads
spring.kafka.listener.ack-mode=manual # Manual acknowledgment for fine-grained control
spring.kafka.consumer.max-poll-records=10 # Max records pulled per poll
 

Explanation:

  • spring.kafka.bootstrap-servers: Comma-separated list of Kafka brokers to connect to.
  • spring.kafka.consumer.group-id: Kafka consumer group ID.
  • spring.kafka.consumer.auto-offset-reset: How Kafka should handle the offset when consuming a topic. earliest means starting from the earliest available message.
  • spring.kafka.consumer.key-deserializer / value-deserializer: Specifies the deserializers for the consumer to convert bytes into Java objects.
  • spring.kafka.producer.key-serializer / value-serializer: Specifies the serializers for the producer.
  • spring.kafka.listener.concurrency: Number of consumers that will handle partitions.
  • spring.kafka.listener.ack-mode: Acknowledgment mode for Kafka message consumption.

 

 

2. RabbitMQ Messaging with Spring Boot

Overview:

RabbitMQ is an open-source message broker that supports multiple messaging protocols. It is widely used in enterprise systems for message queuing.

Use Case:

RabbitMQ is used for asynchronous processing, task queues, and decoupling microservices.

 

RabbitMQ

  • Type: Message Broker (AMQP-based)
  • Message Protocol: Advanced Message Queuing Protocol (AMQP)
  • Message Delivery: Supports both point-to-point and publish/subscribe patterns.
  • Use Case: Best for applications requiring complex routing, low-latency delivery, and easy integration. Ideal for scenarios like microservices, task queues, and backend communication.
  • Strengths:
    • High reliability and flexible message routing.
    • Supports complex messaging patterns (e.g., routing, topics).
    • Easy to set up and manage.
    • Strong community and documentation.
  • Scalability: Can scale horizontally using clustering, though it may face challenges with very high throughput.
  • Fault Tolerance: Provides message persistence, replication, and clustering for high availability.
  • Performance: Suitable for workloads with moderate message rates. May not scale as well as Kafka for very high-throughput systems.

 

Spring Boot Integration:

First, add the dependencies for RabbitMQ in your pom.xml:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
 

 

Producer Configuration:

@Configuration
public class RabbitConfig {
   @Bean
   public Queue queue() {
       return new Queue("queue_name", false);
   }

   @Bean
   public TopicExchange exchange() {
       return new TopicExchange("exchange_name");
   }

   @Bean
   public Binding binding() {
       return BindingBuilder.bind(queue()).to(exchange()).with("routing_key");
   }

   @Bean
   public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
       return new RabbitTemplate(connectionFactory);
   }
}
 

 

Producer Code:

 

@Service
public class RabbitProducer {
   private final AmqpTemplate amqpTemplate;

   @Autowired
   public RabbitProducer(AmqpTemplate amqpTemplate) {
       this.amqpTemplate = amqpTemplate;
   }

   public void sendMessage(String message) {
       amqpTemplate.convertAndSend("exchange_name", "routing_key", message);
   }
}
 

 

Consumer Code:

 

@Service
public class RabbitConsumer {
   @RabbitListener(queues = "queue_name")
   public void listen(String message) {
       System.out.println("Received Message: " + message);
   }
}
 

 

RabbitMQ Use Case: Task Queue System

Use Case: A background task processing system where tasks are queued and processed by worker services. RabbitMQ is used to distribute the tasks among different worker services asynchronously.

Concepts:

  • Queues: A message queue holds messages that are consumed by workers.
  • Exchanges: Routes messages to queues.
  • Bindings: A way to connect queues to exchanges.
  • Consumers: Workers that read and process messages from queues.
  • Producers: Components that send messages to RabbitMQ.

Rules & Points:

  • RabbitMQ uses the AMQP protocol.
  • Messages are guaranteed delivery (if configured for persistence).
  • It supports various exchange types like direct, topic, fanout, and headers-based exchanges.
  • It provides message acknowledgment and retries.

Spring Boot Project (RabbitMQ)

  1. Add dependencies (pom.xml):

 

<dependencies>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
   </dependency>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
</dependencies>
 

 

2. Configuration (RabbitMQConfig.java):

@Configuration
public class RabbitMQConfig {

   @Bean
   public Queue queue() {
       return new Queue("task_queue", true);
   }

   @Bean
   public TopicExchange exchange() {
       return new TopicExchange("task_exchange");
   }

   @Bean
   public Binding binding(Queue queue, TopicExchange exchange) {
       return BindingBuilder.bind(queue).to(exchange).with("task.#");
   }

   @Bean
   public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory, MessageListener messageListener) {
       SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
       container.setConnectionFactory(connectionFactory);
       container.setMessageListener(messageListener);
       container.setQueueNames("task_queue");
       return container;
   }
}
 

3. Producer (REST Controller) (TaskController.java):

@RestController
@RequestMapping("/task")
public class TaskController {

   @Autowired
   private RabbitTemplate rabbitTemplate;

   @PostMapping("/send")
   public ResponseEntity<String> sendTask(@RequestBody Task task) {
       rabbitTemplate.convertAndSend("task_exchange", "task.new", task);
       return ResponseEntity.ok("Task sent to the queue");
   }
}
 

4. Consumer (TaskListener.java):

@Component
public class TaskListener {

   @RabbitListener(queues = "task_queue")
   public void receiveMessage(Task task) {
       System.out.println("Received Task: " + task);
       // Process task asynchronously
   }
}
 

5. Task Model (Task.java):

public class Task {
   private String name;
   private String description;

   // Getters and Setters
}
 

RabbitMQ Configuration in application.properties

For RabbitMQ, you need to configure the connection to the RabbitMQ server and set up certain properties for message queues.

 

# RabbitMQ Configuration
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.listener.simple.concurrency=5 # Number of consumer threads
spring.rabbitmq.listener.simple.max-concurrency=10 # Max number of consumer threads
spring.rabbitmq.listener.simple.prefetch=10 # Prefetch count (messages pulled at once)

# Optional configurations for RabbitMQ exchange and queues
spring.rabbitmq.listener.simple.acknowledge-mode=auto
spring.rabbitmq.listener.simple.default-requeue-rejected=false
 

Explanation:

  • spring.rabbitmq.host: The host where RabbitMQ server is running.
  • spring.rabbitmq.port: The port (default is 5672).
  • spring.rabbitmq.username / password: Credentials for connecting to RabbitMQ (default is guest/guest).
  • spring.rabbitmq.listener.simple.concurrency: Number of consumers to start in parallel.
  • spring.rabbitmq.listener.simple.max-concurrency: Max number of consumers.
  • spring.rabbitmq.listener.simple.prefetch: Controls how many messages the consumer can fetch at once.

 

3. IBM MQ Messaging with Spring Boot

Overview:

IBM MQ is an enterprise messaging system known for its reliability and high performance. It provides point-to-point and publish-subscribe messaging models.

Use Case:

IBM MQ is often used in financial services, healthcare, and large enterprises for secure, reliable messaging.

 

IBM MQ

  • Type: Enterprise Message Broker
  • Message Protocol: Proprietary IBM MQ protocol, with support for JMS and MQTT.
  • Message Delivery: Supports point-to-point and publish/subscribe patterns, along with transactional support.
  • Use Case: IBM MQ is often used in large enterprises with complex, high-security environments and mission-critical applications. It's a good fit for financial services, government, and healthcare industries.
  • Strengths:
    • High level of reliability, security, and enterprise-grade features.
    • Supports both synchronous and asynchronous messaging, including transactional messaging.
    • Robust security features and compliance with regulatory standards.
    • Integration with other IBM products (e.g., IBM Integration Bus).
  • Scalability: Can scale vertically and horizontally, but typically needs to be managed closely for large deployments.
  • Fault Tolerance: Advanced features like message persistence, high availability configurations, and automatic recovery.
  • Performance: Designed for low-latency, high-reliability messaging, though its throughput is typically lower than Kafka's at very high volumes.
  • Drawbacks:
    • Licensing and pricing can be expensive, particularly in large-scale environments.
    • More complex setup and higher operational overhead than RabbitMQ.

 

Spring Boot Integration:

First, include the necessary dependency:

<dependency>
   <groupId>com.ibm.mq</groupId>
   <artifactId>com.ibm.mq.allclient</artifactId>
   <version>9.3.0.0</version>
</dependency>
 

Producer Configuration:

@Configuration
public class IBMQueueConfig {
   @Bean
   public MQQueueConnectionFactory connectionFactory() throws JMSException {
       MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
       factory.setHostName("localhost");
       factory.setPort(1414);
       factory.setQueueManager("QM1");
       factory.setChannel("DEV.APP.SVRCONN");
       return factory;
   }

   @Bean
   public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
       return new JmsTemplate(connectionFactory);
   }
}
 

Producer Code:

@Service
public class IBMMQProducer {
   private final JmsTemplate jmsTemplate;

   @Autowired
   public IBMMQProducer(JmsTemplate jmsTemplate) {
       this.jmsTemplate = jmsTemplate;
   }

   public void sendMessage(String message) {
       jmsTemplate.convertAndSend("queue_name", message);
   }
}
 

Consumer Code:

@Service
public class IBMMQConsumer {
   @JmsListener(destination = "queue_name")
   public void listen(String message) {
       System.out.println("Received Message: " + message);
   }
}
 

 

3. IBM MQ Configuration in application.properties

For IBM MQ, you need to provide connection details for the MQ server, along with security and queue configurations.

# IBM MQ Configuration
ibm.mq.queueManager=QM1
ibm.mq.channel=DEV.APP.SVRCONN
ibm.mq.host=localhost
ibm.mq.port=1414
ibm.mq.queueName=TransactionQueue
ibm.mq.sslEnabled=false
ibm.mq.username=admin
ibm.mq.password=password

# Connection factory and other settings
spring.jms.pub-sub-domain=false # Set to true for topic-based messaging
spring.jms.listener.concurrency=5 # Number of listeners
 

Explanation:

  • ibm.mq.queueManager: The name of the IBM MQ Queue Manager.
  • ibm.mq.channel: The MQ channel for connecting to the Queue Manager.
  • ibm.mq.host: The host where IBM MQ is running (usually localhost).
  • ibm.mq.port: The port on which the MQ server listens (default is 1414).
  • ibm.mq.queueName: The name of the queue to which messages are sent or from which they are consumed.
  • ibm.mq.sslEnabled: Whether SSL encryption should be used for MQ connections.
  • ibm.mq.username / ibm.mq.password: Credentials for connecting to IBM MQ.
  • spring.jms.pub-sub-domain: Set to true if using topics (for pub/sub) and false for queues (for point-to-point).
  • spring.jms.listener.concurrency: Number of concurrent message listeners.

 

General Notes on Configurations:

  • Security and Authentication: For all systems (RabbitMQ, Kafka, and IBM MQ), it’s important to configure secure connections (e.g., enabling SSL/TLS, configuring username/password, etc.) depending on the environment.
  • Environment-Specific Configurations: It’s often a good practice to separate configuration settings for development, testing, and production environments. You can use profiles such as application-dev.properties, application-prod.properties, etc., in Spring Boot.
  • Error Handling and Retries: Consider adding retry mechanisms and dead-letter queues for failed message deliveries, especially in production environments.

 

 

Comparison Summary:

FeatureRabbitMQKafkaIBM MQ
ProtocolAMQP, MQTT, STOMPKafka ProtocolIBM MQ Protocol, JMS, MQTT
Best forGeneral messaging, task queuesEvent streams, log aggregationEnterprise messaging, financial systems
Use CasesMicroservices, task queuesReal-time data pipelines, event-driven systemsEnterprise integration, secure and reliable messaging
ScalabilityModerate, clustered setupHigh scalability, partitionedModerate to high, complex setup
PerformanceModerate throughputHigh throughput, low latencyModerate, high reliability
ReliabilityGood, supports message persistenceHigh, built for fault toleranceVery high, designed for enterprise reliability
Ease of UseEasy to set up and manageComplex setup, but powerfulHigh operational overhead, enterprise-focused
Fault ToleranceGood, supports clustering and replicationExcellent, replication and partitioningAdvanced, high availability and persistence
CostOpen-source (with enterprise support options)Open-source (with Confluent support)Commercial, often costly for large-scale deployments

When to Choose Each:

  • RabbitMQ: Choose RabbitMQ for simpler messaging use cases with moderate throughput, especially when you need flexible message routing or integration with other services.
  • Kafka: Choose Kafka when you need to handle large volumes of real-time data, support high throughput, and focus on stream processing or event-driven architecture.
  • IBM MQ: Choose IBM MQ for large, enterprise-grade systems that require high levels of security, transactional messaging, and integration with other IBM products in industries like banking and healthcare.
 

 

4. Event-Driven Architecture in Spring Boot

Overview:

Event-driven architecture (EDA) involves components that communicate by sending and receiving events, enabling decoupled systems. In Spring Boot, this can be achieved using messaging systems like Kafka, RabbitMQ, or JMS.

Use Case:

Event-driven systems are perfect for microservices architectures, enabling asynchronous communication, real-time processing, and decoupling of services.

Example with Kafka:

Event Publisher:

public class OrderCreatedEvent {
   private String orderId;
   private String customerName;

   // Getters and setters
}

@Service
public class OrderService {
   private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;

   @Autowired
   public OrderService(KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate) {
       this.kafkaTemplate = kafkaTemplate;
   }

   public void createOrder(OrderCreatedEvent orderEvent) {
       kafkaTemplate.send("order_topic", orderEvent);
   }
}
 

 

Event Consumer:

@Service
public class OrderEventListener {
   @KafkaListener(topics = "order_topic", groupId = "order_group")
   public void handleOrderCreatedEvent(OrderCreatedEvent event) {
       System.out.println("Order created: " + event.getOrderId());
   }
}
 

 

Why Event-Driven Architecture (EDA) is Needed in Microservices?

Event-Driven Architecture (EDA) is an architectural pattern that promotes the decoupling of microservices through asynchronous communication, where services communicate by producing and consuming events rather than making direct synchronous calls to each other. This pattern offers several advantages in microservices systems:

  1. Loose Coupling: Each microservice is independent and doesn't need to know about the internal workings of other services. This increases modularity and allows for easier scaling, maintenance, and independent deployment.
  2. Asynchronous Communication: Services interact asynchronously through events, which allows for non-blocking communication. This reduces the dependency on synchronous API calls that can cause delays or timeouts.
  3. Scalability: EDA allows for better scalability, as services can scale independently. The event bus (message broker) decouples services and allows them to handle different loads.
  4. Resilience: EDA improves system resilience, as it can handle failures gracefully. If a service goes down, messages can be stored and retried when the service comes back online, ensuring eventual consistency.
  5. Flexibility: Events can be processed by multiple consumers, enabling new services to join the system and react to events without changes to the existing services.

Example Implementation with Three Messaging Systems (RabbitMQ, Kafka, IBM MQ)

We'll implement a simple Order Service that sends an OrderPlaced event whenever an order is placed. The Inventory Service and Payment Service will listen to these events to perform related tasks (e.g., updating inventory and processing payment).

The key concepts are:

  • Producer: Sends events to the message broker.
  • Consumer: Listens to events from the message broker and reacts accordingly.
  • Event: Represents the action that occurred (e.g., OrderPlaced).

 

1. Event-Driven Architecture with RabbitMQ

Project Setup:

  1. Dependencies: Add the necessary RabbitMQ and Spring Boot dependencies to pom.xml.

<dependencies>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
   </dependency>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
</dependencies>
 

Event Model (OrderPlacedEvent.java):

public class OrderPlacedEvent {
   private String orderId;
   private String customerName;
   private double totalAmount;

   // Getters and Setters
}
 

RabbitMQ Configuration (RabbitMQConfig.java):

@Configuration
public class RabbitMQConfig {

   @Bean
   public Queue orderQueue() {
       return new Queue("orderQueue", true);
   }

   @Bean
   public DirectExchange exchange() {
       return new DirectExchange("orderExchange");
   }

   @Bean
   public Binding binding(Queue orderQueue, DirectExchange exchange) {
       return BindingBuilder.bind(orderQueue).to(exchange).with("order.key");
   }
}
 

 

Order Producer (Controller) (OrderController.java):

 

@RestController
@RequestMapping("/orders")
public class OrderController {

   @Autowired
   private RabbitTemplate rabbitTemplate;

   @PostMapping("/place")
   public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) {
       rabbitTemplate.convertAndSend("orderExchange", "order.key", event);
       return ResponseEntity.ok("Order placed and event sent to RabbitMQ");
   }
}


Order Consumer (OrderListener.java):

@Component
public class OrderListener {

   @RabbitListener(queues = "orderQueue")
   public void handleOrderPlacedEvent(OrderPlacedEvent event) {
       System.out.println("Order placed: " + event.getOrderId());
       // Process the order (e.g., update inventory, process payment)
   }
}
 

application.properties:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
 

2. Event-Driven Architecture with Kafka

Project Setup:

  1. Dependencies: Add Kafka dependencies to pom.xml.

<dependencies>
   <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
   </dependency>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
</dependencies>
 

 

Kafka Configuration (KafkaConfig.java):

 

@Configuration
@EnableKafka
public class KafkaConfig {

   @Bean
   public KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate() {
       return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig()));
   }

   @Bean
   public ConsumerFactory<String, OrderPlacedEvent> consumerFactory() {
       return new DefaultKafkaConsumerFactory<>(consumerConfig());
   }

   @Bean
   public ConcurrentMessageListenerContainer<String, OrderPlacedEvent> listenerContainer() {
       return new ConcurrentMessageListenerContainer<>(consumerFactory(), containerConfig());
   }
}
 

Order Producer (Controller) (OrderController.java):

@RestController
@RequestMapping("/orders")
public class OrderController {

   @Autowired
   private KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;

   @PostMapping("/place")
   public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) {
       kafkaTemplate.send("orderTopic", event);
       return ResponseEntity.ok("Order placed and event sent to Kafka");
   }
}
 

 

Order Consumer (OrderListener.java):

@Component
public class OrderListener {

   @KafkaListener(topics = "orderTopic", groupId = "orderGroup")
   public void handleOrderPlacedEvent(OrderPlacedEvent event) {
       System.out.println("Order placed: " + event.getOrderId());
       // Process the order (e.g., update inventory, process payment)
   }
}
 

 

application.properties:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=orderGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
 

3. Event-Driven Architecture with IBM MQ

Project Setup:

  1. Dependencies: Add the IBM MQ dependency in pom.xml.

 

<dependencies>
   <dependency>
       <groupId>com.ibm.mq</groupId>
       <artifactId>mq-jms-spring-boot-starter</artifactId>
       <version>3.0.0</version>
   </dependency>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
</dependencies>
 

 

IBM MQ Configuration (IBM_MQConfig.java):

 

@Configuration
public class IBM_MQConfig {

   @Value("${ibm.mq.queueManager}")
   private String queueManager;

   @Value("${ibm.mq.channel}")
   private String channel;

   @Value("${ibm.mq.host}")
   private String host;

   @Value("${ibm.mq.port}")
   private int port;

   @Value("${ibm.mq.queueName}")
   private String queueName;

   @Bean
   public ConnectionFactory connectionFactory() {
       MQQueueManagerFactory factory = new MQQueueManagerFactory();
       factory.setQueueManager(queueManager);
       factory.setChannel(channel);
       factory.setHost(host);
       factory.setPort(port);
       return factory.create();
   }

   @Bean
   public Queue queue() {
       return new MQQueue(queueName);
   }
}
 

 

Order Producer (Controller) (OrderController.java):

@RestController
@RequestMapping("/orders")
public class OrderController {

   @Autowired
   private JmsTemplate jmsTemplate;

   @PostMapping("/place")
   public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) {
       jmsTemplate.convertAndSend("TransactionQueue", event);
       return ResponseEntity.ok("Order placed and event sent to IBM MQ");
   }
}
 

 

Order Consumer (OrderListener.java):

 

 

@Component
public class OrderListener {

   @JmsListener(destination = "TransactionQueue")
   public void handleOrderPlacedEvent(OrderPlacedEvent event) {
       System.out.println("Order placed: " + event.getOrderId());
       // Process the order (e.g., update inventory, process payment)
   }
}
 

application.properties:

ibm.mq.queueManager=QM1
ibm.mq.channel=DEV.APP.SVRCONN
ibm.mq.host=localhost
ibm.mq.port=1414
ibm.mq.queueName=TransactionQueue
ibm.mq.sslEnabled=false
ibm.mq.username=admin
ibm.mq.password=password
 

Each messaging system (RabbitMQ, Kafka, IBM MQ) enables Event-Driven Architecture and allows microservices to communicate asynchronously through events. The implementation process involves:

  1. Defining events that are passed between microservices.
  2. Producers that publish events to the messaging system.
  3. Consumers that listen for and react to those events.

 

There are several other ways to implement Event-Driven Architecture (EDA) in microservices. The most common approaches include:

  1. Using a Publish-Subscribe Model (Pub/Sub)
  2. Using Cloud-based Messaging Services (e.g., AWS SQS, Google Pub/Sub)
  3. Using Apache Pulsar
  4. Using Database Change Streams or Event Sourcing

 

1. Publish-Subscribe Model (Pub/Sub) with a Message Broker

In this architecture, you define a topic or channel that producers send messages to, and consumers can subscribe to the topic. This is common in RabbitMQ (with exchanges and queues), Kafka, and even Google Cloud Pub/Sub.

Example with Google Cloud Pub/Sub (A Cloud-based Message Broker)

Project Setup:

  1. Dependencies: Add the necessary Google Pub/Sub dependencies in pom.xml.

<dependency>
   <groupId>com.google.cloud</groupId>
   <artifactId>google-cloud-pubsub</artifactId>
   <version>2.12.1</version>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>
 

Pub/Sub Configuration (PubSubConfig.java):

@Configuration
public class PubSubConfig {

   @Value("${google.cloud.project-id}")
   private String projectId;

   @Bean
   public Publisher publisher() throws Exception {
       return Publisher.newBuilder("projects/" + projectId + "/topics/orderTopic").build();
   }

   @Bean
   public Subscriber subscriber() {
       return Subscriber.newBuilder("projects/" + projectId + "/subscriptions/orderSubscription", (message, consumer) -> {
           System.out.println("Received message: " + message.getData().toStringUtf8());
           consumer.ack();
       }).build();
   }
}
 

@Configuration
public class PubSubConfig {

   @Value("${google.cloud.project-id}")
   private String projectId;

   @Bean
   public Publisher publisher() throws Exception {
       return Publisher.newBuilder("projects/" + projectId + "/topics/orderTopic").build();
   }

   @Bean
   public Subscriber subscriber() {
       return Subscriber.newBuilder("projects/" + projectId + "/subscriptions/orderSubscription", (message, consumer) -> {
           System.out.println("Received message: " + message.getData().toStringUtf8());
           consumer.ack();
       }).build();
   }
}
 

Order Producer (Controller) (OrderController.java):

 

@RestController
@RequestMapping("/orders")
public class OrderController {

   @Autowired
   private Publisher publisher;

   @PostMapping("/place")
   public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) throws Exception {
       String message = new ObjectMapper().writeValueAsString(event);
       publisher.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(message)).build());
       return ResponseEntity.ok("Order placed and event sent to Google Pub/Sub");
   }
}
 

Order Consumer (OrderListener.java):

@Component
public class OrderListener {

   @Autowired
   private Subscriber subscriber;

   @PostConstruct
   public void init() {
       subscriber.startAsync();
   }
}
 

 

application.properties:

google.cloud.project-id=your-google-cloud-project-id
 

 

2. Using Cloud-based Messaging Services (e.g., AWS SQS)

Amazon Web Services (AWS) provides Simple Queue Service (SQS) for message queuing, which is fully managed and highly scalable. This can be used in a similar way as RabbitMQ, Kafka, or Pub/Sub for event-driven communication.

AWS SQS Example

Project Setup:

  1. Dependencies: Add the necessary dependencies to pom.xml.

 

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-aws-messaging</artifactId>
   <version>3.0.0</version>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>
 

 

SQS Configuration (SQSConfig.java):

@Configuration
public class SQSConfig {

   @Bean
   public Queue queue() {
       return new Queue("orderQueue", true);
   }

   @Bean
   public AmazonSQSAsync amazonSQS() {
       return AmazonSQSAsyncClient.builder().build();
   }

   @Bean
   public SimpleMessageListenerContainer messageListenerContainer(AmazonSQSAsync amazonSQS, Queue queue) {
       SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
       container.setAmazonSqs(amazonSQS);
       container.setQueueNames(queue.getName());
       container.setMessageHandler(new MessageHandler());
       return container;
   }
}
 

Order Producer (Controller) (OrderController.java):

 

@RestController
@RequestMapping("/orders")
public class OrderController {

   @Autowired
   private AmazonSQSAsync amazonSQS;

   @Value("${aws.sqs.queue-url}")
   private String queueUrl;

   @PostMapping("/place")
   public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) {
       String message = new ObjectMapper().writeValueAsString(event);
       SendMessageRequest sendMessageRequest = new SendMessageRequest(queueUrl, message);
       amazonSQS.sendMessage(sendMessageRequest);
       return ResponseEntity.ok("Order placed and event sent to SQS");
   }
}
 

Order Consumer (OrderListener.java):

 

@Component
public class OrderListener {

   @SqsListener(value = "orderQueue", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
   public void handleOrderPlacedEvent(String message) {
       OrderPlacedEvent event = new ObjectMapper().readValue(message, OrderPlacedEvent.class);
       System.out.println("Order placed: " + event.getOrderId());
       // Process the order
   }
}
 

 

application.properties:

 

aws.sqs.queue-url=https://sqs.us-east-1.amazonaws.com/your-account-id/your-queue-name
 

 

3. Apache Pulsar for Event-Driven Microservices

Apache Pulsar is an open-source, distributed messaging and streaming platform, similar to Kafka but with more focus on multi-tenancy, geo-replication, and long-term storage. Pulsar supports both publish-subscribe and queue-based messaging models.

Pulsar Example

  1. Dependencies: Add Pulsar dependencies in pom.xml.

 

<dependency>
   <groupId>org.apache.pulsar</groupId>
   <artifactId>pulsar-client-api</artifactId>
   <version>2.11.1</version>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>
 

 

Pulsar Configuration (PulsarConfig.java):

 

@Configuration
public class PulsarConfig {

   @Value("${pulsar.service-url}")
   private String pulsarServiceUrl;

   @Bean
   public PulsarClient pulsarClient() throws PulsarClientException {
       return PulsarClient.builder().serviceUrl(pulsarServiceUrl).build();
   }

   @Bean
   public Consumer<byte[]> orderConsumer(PulsarClient pulsarClient) throws PulsarClientException {
       return pulsarClient.newConsumer()
                          .topic("order-topic")
                          .subscriptionName("order-subscription")
                          .subscribe();
   }
}
 

 

Order Producer (OrderController.java):

@RestController
@RequestMapping("/orders")
public class OrderController {

   @Autowired
   private PulsarClient pulsarClient;

   @PostMapping("/place")
   public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) throws PulsarClientException {
       Producer<byte[]> producer = pulsarClient.newProducer()
                                              .topic("order-topic")
                                              .create();
       String message = new ObjectMapper().writeValueAsString(event);
       producer.send(message.getBytes());
       return ResponseEntity.ok("Order placed and event sent to Pulsar");
   }
}
 

Order Consumer (OrderListener.java):

@Component
public class OrderListener {

   @Autowired
   private Consumer<byte[]> orderConsumer;

   @PostConstruct
   public void consume() throws PulsarClientException {
       while (true) {
           Message<byte[]> msg = orderConsumer.receive();
           String messageContent = new String(msg.getData());
           OrderPlacedEvent event = new ObjectMapper().readValue(messageContent, OrderPlacedEvent.class);
           System.out.println("Order placed: " + event.getOrderId());
           // Process the order
       }
   }
}
 

 

application.properties:

pulsar.service-url=pulsar://localhost:6650
 

 

4. Using Event Sourcing (CQRS)

Another approach in Event-Driven Architecture is Event Sourcing, which involves persisting the state changes as events rather than storing just the current state. This is typically combined with CQRS (Command Query Responsibility Segregation), where you separate the handling of commands (state changes) and queries (reads).

 

Summary of Other Event-Driven Architectures

  1. Google Cloud Pub/Sub:
    • Managed service for event-driven communication with auto-scaling.
    • Asynchronous communication, decouples services.
  2. AWS SQS:
    • Fully managed queue service.
    • Highly available, scalable, and reliable for event-driven communication.
  3. Apache Pulsar:
    • Distributed messaging platform.
    • Supports both pub/sub and queue-based models.
    • Highly scalable, multi-tenant, geo-replicated.
  4. Event Sourcing (CQRS):
    • Persist events that change the state, making the system able to replay or synchronize.
These methods and tools are commonly used in large-scale distributed systems to facilitate event-driven communication and decoupled services.


                                            Want to donate ? Pay → https://razorpay.me/@niteshsynergy

Thanks for reading mny blog…
If you find any issue or sugestions let me know…. As I'm parallel refactoring each concpets asap

 
30 min read
Dec 12, 2024
By Nitesh Synergy
Share