RabbitMQ, Kafka, and IBM MQ are three popular messaging systems, but they differ in terms of their design, use cases, and key features.
Kafka producers send messages to a Kafka topic.
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
private Map<String, Object> producerConfigs() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return configs;
}
}
Producer Code:
@Service
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
kafkaTemplate.send("topic_name", message);
}
}
Kafka consumers receive messages from a Kafka topic.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
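@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
// @KafkaListener methods look up a container factory bean with this name
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}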
private Map<String, Object> consumerConfigs() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return configs;
}
}
Consumer Code:
@Service
public class KafkaConsumer {
@KafkaListener(topics = "topic_name", groupId = "group_id")
public void listen(String message) {
System.out.println("Received Message: " + message);
}
}
Use Case: A real-time event streaming system where events (e.g., user activity logs, sensor data, etc.) are produced and consumed by different services for processing and analysis.
Concepts:
Rules & Points:
Spring Boot Project (Kafka)
1. Dependencies (pom.xml):
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
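2. Configuration (KafkaConfig.java):
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfig());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfig());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
// Broker address and serializers mirror the values in application.properties
private Map<String, Object> producerConfig() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return configs;
}
private Map<String, Object> consumerConfig() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "event_group");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return configs;
}
}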
3. Producer (REST Controller) (EventController.java):
@RestController
@RequestMapping("/event")
public class EventController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@PostMapping("/send")
public ResponseEntity<String> sendEvent(@RequestBody String eventMessage) {
kafkaTemplate.send("event_topic", eventMessage);
return ResponseEntity.ok("Event sent to Kafka");
}
}
4. Consumer (EventListener.java):
@Component
public class EventListener {
@KafkaListener(topics = "event_topic", groupId = "event_group")
public void listenEvent(String eventMessage) {
System.out.println("Received Event: " + eventMessage);
// Process event message asynchronously
}
}
5. Event Model (Event.java):
public class Event {
private String message;
private Date timestamp;
// Getters and Setters
}
application.properties
For Kafka, you'll need to specify the Kafka broker, topics, consumer group, and other configurations.
# Kafka Configuration
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=event_group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Additional configurations
# Number of consumer threads
spring.kafka.listener.concurrency=3
# Manual acknowledgment for fine-grained control (the listener must call Acknowledgment.acknowledge())
spring.kafka.listener.ack-mode=manual
# Max records pulled per poll
spring.kafka.consumer.max-poll-records=10
spring.kafka.bootstrap-servers: Comma-separated list of Kafka brokers to connect to.
spring.kafka.consumer.group-id: Kafka consumer group ID.
spring.kafka.consumer.auto-offset-reset: Where to start reading when the consumer group has no committed offset for a partition. earliest means starting from the earliest available message.
spring.kafka.consumer.key-deserializer / value-deserializer: Specifies the deserializers for the consumer to convert bytes into Java objects.
spring.kafka.producer.key-serializer / value-serializer: Specifies the serializers for the producer.
spring.kafka.listener.concurrency: Number of consumer threads the listener container starts; the topic's partitions are divided among them.
spring.kafka.listener.ack-mode: Acknowledgment mode for Kafka message consumption.
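To make the effect of spring.kafka.listener.concurrency more concrete, here is a small plain-Java sketch of how partitions might be split among the consumer threads of one group. It is a simplified model of range-style assignment for a single topic, not Kafka's actual assignor code, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of range-style partition assignment inside one consumer group.
// PartitionAssignmentSketch and assign(...) are hypothetical names, not Kafka APIs.
final class PartitionAssignmentSketch {

    // Returns, for each consumer thread, the partition numbers it would own.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        int base = partitions / consumers;   // every consumer gets at least this many
        int extra = partitions % consumers;  // the first 'extra' consumers get one more
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 3)); // three threads, two partitions each
        System.out.println(assign(2, 3)); // the third thread owns nothing and stays idle
    }
}
```

The second call illustrates why a concurrency value higher than the topic's partition count adds no throughput: a partition is read by at most one consumer of a group at a time.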
RabbitMQ is an open-source message broker that supports multiple messaging protocols. It is widely used in enterprise systems for message queuing.
RabbitMQ is used for asynchronous processing, task queues, and decoupling microservices.
First, add the dependencies for RabbitMQ in your pom.xml:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Producer Configuration:
@Configuration
public class RabbitConfig {
@Bean
public Queue queue() {
return new Queue("queue_name", false);
}
@Bean
public TopicExchange exchange() {
return new TopicExchange("exchange_name");
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("routing_key");
}
@Bean
public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
}
Producer Code:
@Service
public class RabbitProducer {
private final AmqpTemplate amqpTemplate;
@Autowired
public RabbitProducer(AmqpTemplate amqpTemplate) {
this.amqpTemplate = amqpTemplate;
}
public void sendMessage(String message) {
amqpTemplate.convertAndSend("exchange_name", "routing_key", message);
}
}
Consumer Code:
@Service
public class RabbitConsumer {
@RabbitListener(queues = "queue_name")
public void listen(String message) {
System.out.println("Received Message: " + message);
}
}
Use Case: A background task processing system where tasks are queued and processed by worker services. RabbitMQ is used to distribute the tasks among different worker services asynchronously.
Concepts:
Rules & Points:
Spring Boot Project (RabbitMQ)
1. Dependencies (pom.xml):
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
2. Configuration (RabbitMQConfig.java):
@Configuration
public class RabbitMQConfig {
@Bean
public Queue queue() {
return new Queue("task_queue", true);
}
@Bean
public TopicExchange exchange() {
return new TopicExchange("task_exchange");
}
@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("task.#");
}
@Bean
public MessageConverter jsonMessageConverter() {
// Task is not Serializable, so it is sent and received as JSON.
// Spring Boot applies this converter to RabbitTemplate and to @RabbitListener methods;
// the listener container for TaskListener is created automatically.
return new Jackson2JsonMessageConverter();
}
}
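The binding above uses the pattern task.#, and the producer later publishes with the routing key task.new. As a rough illustration of how a topic exchange decides whether a key matches a pattern, here is a plain-Java sketch. It assumes the usual AMQP wildcard rules (* stands for exactly one dot-separated word, # for zero or more words); the class name is hypothetical and this is not RabbitMQ's own implementation.

```java
// Minimal matcher for topic-exchange binding patterns.
// TopicPatternSketch is a made-up name used only for this illustration.
final class TopicPatternSketch {

    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            // Pattern exhausted: it matches only if the key is exhausted too
            return ki == k.length;
        }
        if (p[pi].equals("#")) {
            // '#' may absorb zero or more words of the routing key
            for (int skip = ki; skip <= k.length; skip++) {
                if (match(p, pi + 1, k, skip)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            return match(p, pi + 1, k, ki + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("task.#", "task.new"));        // true
        System.out.println(matches("task.*", "task.new.urgent")); // false
    }
}
```

Under these rules task.# accepts task.new, task.new.urgent and even the bare key task, while task.* would accept only keys with exactly one word after task.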
3. Producer (REST Controller) (TaskController.java):
@RestController
@RequestMapping("/task")
public class TaskController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("/send")
public ResponseEntity<String> sendTask(@RequestBody Task task) {
rabbitTemplate.convertAndSend("task_exchange", "task.new", task);
return ResponseEntity.ok("Task sent to the queue");
}
}
4. Consumer (TaskListener.java):
@Component
public class TaskListener {
@RabbitListener(queues = "task_queue")
public void receiveMessage(Task task) {
System.out.println("Received Task: " + task);
// Process task asynchronously
}
}
5. Task Model (Task.java):
public class Task {
private String name;
private String description;
// Getters and Setters
}
application.properties
For RabbitMQ, you need to configure the connection to the RabbitMQ server and set up certain properties for message queues.
# RabbitMQ Configuration
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# Number of consumer threads
spring.rabbitmq.listener.simple.concurrency=5
# Max number of consumer threads
spring.rabbitmq.listener.simple.max-concurrency=10
# Prefetch count (messages pulled at once)
spring.rabbitmq.listener.simple.prefetch=10
# Optional configurations for RabbitMQ exchange and queues
spring.rabbitmq.listener.simple.acknowledge-mode=auto
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.host: The host where RabbitMQ server is running.
spring.rabbitmq.port: The port (default is 5672).
spring.rabbitmq.username / password: Credentials for connecting to RabbitMQ (default is guest/guest).
spring.rabbitmq.listener.simple.concurrency: Number of consumers to start in parallel.
spring.rabbitmq.listener.simple.max-concurrency: Max number of consumers.
spring.rabbitmq.listener.simple.prefetch: Controls how many messages the consumer can fetch at once.
IBM MQ is an enterprise messaging system known for its reliability and high performance. It provides point-to-point and publish-subscribe messaging models.
IBM MQ is often used in financial services, healthcare, and large enterprises for secure, reliable messaging.
First, include the necessary dependency:
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>9.3.0.0</version>
</dependency>
Producer Configuration:
@Configuration
public class IBMQueueConfig {
@Bean
public MQQueueConnectionFactory connectionFactory() throws JMSException {
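MQQueueConnectionFactory factory = new MQQueueConnectionFactory();
// Client transport connects over TCP using the host, port and channel set below
factory.setTransportType(WMQConstants.WMQ_CM_CLIENT);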
factory.setHostName("localhost");
factory.setPort(1414);
factory.setQueueManager("QM1");
factory.setChannel("DEV.APP.SVRCONN");
return factory;
}
@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
return new JmsTemplate(connectionFactory);
}
}
Producer Code:
@Service
public class IBMMQProducer {
private final JmsTemplate jmsTemplate;
@Autowired
public IBMMQProducer(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void sendMessage(String message) {
jmsTemplate.convertAndSend("queue_name", message);
}
}
Consumer Code:
@Service
public class IBMMQConsumer {
@JmsListener(destination = "queue_name")
public void listen(String message) {
System.out.println("Received Message: " + message);
}
}
application.properties
For IBM MQ, you need to provide connection details for the MQ server, along with security and queue configurations.
# IBM MQ Configuration
ibm.mq.queueManager=QM1
ibm.mq.channel=DEV.APP.SVRCONN
ibm.mq.host=localhost
ibm.mq.port=1414
ibm.mq.queueName=TransactionQueue
ibm.mq.sslEnabled=false
ibm.mq.username=admin
ibm.mq.password=password
# Connection factory and other settings
# Set to true for topic-based messaging
spring.jms.pub-sub-domain=false
# Number of listeners
spring.jms.listener.concurrency=5
ibm.mq.queueManager: The name of the IBM MQ Queue Manager.
ibm.mq.channel: The MQ channel for connecting to the Queue Manager.
ibm.mq.host: The host where IBM MQ is running (usually localhost).
ibm.mq.port: The port on which the MQ server listens (default is 1414).
ibm.mq.queueName: The name of the queue to which messages are sent or from which they are consumed.
ibm.mq.sslEnabled: Whether SSL encryption should be used for MQ connections.
ibm.mq.username / ibm.mq.password: Credentials for connecting to IBM MQ.
spring.jms.pub-sub-domain: Set to true if using topics (for pub/sub) and false for queues (for point-to-point).
spring.jms.listener.concurrency: Number of concurrent message listeners.
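The difference that spring.jms.pub-sub-domain switches between can be modelled in a few lines of plain Java. The sketch below is only an in-memory illustration with hypothetical names: in queue mode each message goes to exactly one consumer (round-robin is assumed here for simplicity; a real broker chooses the consumer itself), while in topic mode every subscriber receives its own copy.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of point-to-point (queue) versus publish-subscribe (topic) delivery.
// DeliveryModelSketch is a made-up class, not part of JMS or IBM MQ.
final class DeliveryModelSketch {
    private final boolean pubSubDomain;
    private final List<List<String>> inboxes = new ArrayList<>();
    private int nextConsumer = 0;

    // 'consumers' must be at least 1
    DeliveryModelSketch(boolean pubSubDomain, int consumers) {
        this.pubSubDomain = pubSubDomain;
        for (int i = 0; i < consumers; i++) {
            inboxes.add(new ArrayList<>());
        }
    }

    void send(String message) {
        if (pubSubDomain) {
            // Topic: every subscriber gets a copy
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        } else {
            // Queue: exactly one consumer gets the message
            inboxes.get(nextConsumer).add(message);
            nextConsumer = (nextConsumer + 1) % inboxes.size();
        }
    }

    List<String> inbox(int consumer) {
        return inboxes.get(consumer);
    }

    public static void main(String[] args) {
        DeliveryModelSketch queue = new DeliveryModelSketch(false, 2);
        DeliveryModelSketch topic = new DeliveryModelSketch(true, 2);
        for (String m : List.of("a", "b", "c")) {
            queue.send(m);
            topic.send(m);
        }
        System.out.println("queue consumer 0: " + queue.inbox(0));
        System.out.println("topic consumer 0: " + topic.inbox(0));
    }
}
```

Queue mode suits work distribution, where each task should be handled once; topic mode suits notifications that several independent services need to see.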
Environment-specific values can be kept in profile-specific files such as application-dev.properties, application-prod.properties, etc., in Spring Boot.
Event-driven architecture (EDA) involves components that communicate by sending and receiving events, enabling decoupled systems. In Spring Boot, this can be achieved using messaging systems like Kafka, RabbitMQ, or JMS.
Event-driven systems are perfect for microservices architectures, enabling asynchronous communication, real-time processing, and decoupling of services.
public class OrderCreatedEvent {
private String orderId;
private String customerName;
// Getters and setters
}
@Service
public class OrderService {
private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;
@Autowired
public OrderService(KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void createOrder(OrderCreatedEvent orderEvent) {
kafkaTemplate.send("order_topic", orderEvent);
}
}
Event Consumer:
@Service
public class OrderEventListener {
@KafkaListener(topics = "order_topic", groupId = "order_group")
public void handleOrderCreatedEvent(OrderCreatedEvent event) {
System.out.println("Order created: " + event.getOrderId());
}
}
Event-Driven Architecture (EDA) is an architectural pattern that promotes the decoupling of microservices through asynchronous communication, where services communicate by producing and consuming events rather than making direct synchronous calls to each other. This pattern offers several advantages in microservices systems:
We'll implement a simple Order Service that sends an OrderPlaced event whenever an order is placed. The Inventory Service and Payment Service will listen to these events to perform related tasks (e.g., updating inventory and processing payment).
The key concepts are:
OrderPlaced
).
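Before wiring in a broker, the shape of this flow can be sketched with an in-process event bus. The example below is an assumption-laden illustration, not the implementation that follows: the bus class is hypothetical, delivery is synchronous and in-memory, and the two lambdas merely stand in for the Inventory and Payment services. What it shows is the decoupling: the publisher knows only the event type, never the subscribers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Tiny in-process publish/subscribe bus; a real system would put a broker in its place.
// OrderEventBusSketch and its nested OrderPlaced class are made-up names.
final class OrderEventBusSketch {

    // The event: an immutable description of something that already happened
    static final class OrderPlaced {
        final String orderId;
        final double totalAmount;

        OrderPlaced(String orderId, double totalAmount) {
            this.orderId = orderId;
            this.totalAmount = totalAmount;
        }
    }

    private final List<Consumer<OrderPlaced>> subscribers = new ArrayList<>();

    void subscribe(Consumer<OrderPlaced> handler) {
        subscribers.add(handler);
    }

    void publish(OrderPlaced event) {
        // The publisher does not know who is listening or how many handlers exist
        for (Consumer<OrderPlaced> handler : subscribers) {
            handler.accept(event);
        }
    }

    public static void main(String[] args) {
        OrderEventBusSketch bus = new OrderEventBusSketch();
        bus.subscribe(e -> System.out.println("Inventory: reserving stock for " + e.orderId));
        bus.subscribe(e -> System.out.println("Payment: charging " + e.totalAmount + " for " + e.orderId));
        bus.publish(new OrderPlaced("o-1", 42.5));
    }
}
```

Adding a third consumer, say a notification service, means one more subscribe call and no change to the code that publishes the event.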
pom.xml:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
Event Model (OrderPlacedEvent.java):
public class OrderPlacedEvent {
private String orderId;
private String customerName;
private double totalAmount;
// Getters and Setters
}
RabbitMQ Configuration (RabbitMQConfig.java):
@Configuration
public class RabbitMQConfig {
@Bean
public Queue orderQueue() {
return new Queue("orderQueue", true);
}
@Bean
public DirectExchange exchange() {
return new DirectExchange("orderExchange");
}
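@Bean
public Binding binding(Queue orderQueue, DirectExchange exchange) {
return BindingBuilder.bind(orderQueue).to(exchange).with("order.key");
}
@Bean
public MessageConverter jsonMessageConverter() {
// OrderPlacedEvent is not Serializable, so it is sent and received as JSON
return new Jackson2JsonMessageConverter();
}
}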
Order Producer (Controller) (OrderController.java):
@RestController
@RequestMapping("/orders")
public class OrderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("/place")
public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) {
rabbitTemplate.convertAndSend("orderExchange", "order.key", event);
return ResponseEntity.ok("Order placed and event sent to RabbitMQ");
}
}
Order Consumer (OrderListener.java):
@Component
public class OrderListener {
@RabbitListener(queues = "orderQueue")
public void handleOrderPlacedEvent(OrderPlacedEvent event) {
System.out.println("Order placed: " + event.getOrderId());
// Process the order (e.g., update inventory, process payment)
}
}
application.properties:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
pom.xml:
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
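Kafka Configuration (KafkaConfig.java):
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public ProducerFactory<String, OrderPlacedEvent> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// The event is an object, so it is written as JSON rather than as a plain String
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, OrderPlacedEvent> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "orderGroup");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(OrderPlacedEvent.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderPlacedEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}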
Order Producer (Controller) (OrderController.java):
@RestController
@RequestMapping("/orders")
public class OrderController {
@Autowired
private KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;
@PostMapping("/place")
public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) {
kafkaTemplate.send("orderTopic", event);
return ResponseEntity.ok("Order placed and event sent to Kafka");
}
}
Order Consumer (OrderListener.java):
@Component
public class OrderListener {
@KafkaListener(topics = "orderTopic", groupId = "orderGroup")
public void handleOrderPlacedEvent(OrderPlacedEvent event) {
System.out.println("Order placed: " + event.getOrderId());
// Process the order (e.g., update inventory, process payment)
}
}
application.properties:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=orderGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
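spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# JsonDeserializer only maps classes from trusted packages; narrow this to the event's package outside of demos
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer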
pom.xml:
<dependencies>
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>mq-jms-spring-boot-starter</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
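IBM MQ Configuration (IBM_MQConfig.java):
@Configuration
public class IBM_MQConfig {
@Value("${ibm.mq.queueManager}")
private String queueManager;
@Value("${ibm.mq.channel}")
private String channel;
@Value("${ibm.mq.host}")
private String host;
@Value("${ibm.mq.port}")
private int port;
@Value("${ibm.mq.queueName}")
private String queueName;
@Bean
public ConnectionFactory connectionFactory() throws JMSException {
// MQConnectionFactory is the JMS connection factory shipped with the IBM MQ client
MQConnectionFactory factory = new MQConnectionFactory();
factory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
factory.setQueueManager(queueManager);
factory.setChannel(channel);
factory.setHostName(host);
factory.setPort(port);
return factory;
}
@Bean
public Queue queue() throws JMSException {
return new MQQueue(queueName);
}
@Bean
public MessageConverter jacksonJmsMessageConverter() {
// OrderPlacedEvent is not Serializable, so it travels as a JSON text message
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
}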
Order Producer (Controller) (OrderController.java):
@RestController
@RequestMapping("/orders")
public class OrderController {
@Autowired
private JmsTemplate jmsTemplate;
@PostMapping("/place")
public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) {
jmsTemplate.convertAndSend("TransactionQueue", event);
return ResponseEntity.ok("Order placed and event sent to IBM MQ");
}
}
Order Consumer (OrderListener.java):
@Component
public class OrderListener {
@JmsListener(destination = "TransactionQueue")
public void handleOrderPlacedEvent(OrderPlacedEvent event) {
System.out.println("Order placed: " + event.getOrderId());
// Process the order (e.g., update inventory, process payment)
}
}
application.properties:
ibm.mq.queueManager=QM1
ibm.mq.channel=DEV.APP.SVRCONN
ibm.mq.host=localhost
ibm.mq.port=1414
ibm.mq.queueName=TransactionQueue
ibm.mq.sslEnabled=false
ibm.mq.username=admin
ibm.mq.password=password
Each messaging system (RabbitMQ, Kafka, IBM MQ) enables Event-Driven Architecture and allows microservices to communicate asynchronously through events. The implementation process involves:
There are several other ways to implement Event-Driven Architecture (EDA) in microservices. The most common approaches include:
In this architecture, you define a topic or channel that producers send messages to, and consumers can subscribe to the topic. This is common in RabbitMQ (with exchanges and queues), Kafka, and even Google Cloud Pub/Sub.
pom.xml:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
Pub/Sub Configuration (PubSubConfig.java):
@Configuration
public class PubSubConfig {
@Value("${google.cloud.project-id}")
private String projectId;
@Bean
public Publisher publisher() throws Exception {
return Publisher.newBuilder("projects/" + projectId + "/topics/orderTopic").build();
}
@Bean
public Subscriber subscriber() {
return Subscriber.newBuilder("projects/" + projectId + "/subscriptions/orderSubscription", (message, consumer) -> {
System.out.println("Received message: " + message.getData().toStringUtf8());
consumer.ack();
}).build();
}
}
Order Producer (Controller) (OrderController.java):
@RestController
@RequestMapping("/orders")
public class OrderController {
@Autowired
private Publisher publisher;
@PostMapping("/place")
public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) throws Exception {
String message = new ObjectMapper().writeValueAsString(event);
publisher.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(message)).build());
return ResponseEntity.ok("Order placed and event sent to Google Pub/Sub");
}
}
Order Consumer (OrderListener.java):
@Component
public class OrderListener {
@Autowired
private Subscriber subscriber;
@PostConstruct
public void init() {
subscriber.startAsync();
}
}
application.properties:
google.cloud.project-id=your-google-cloud-project-id
Amazon Web Services (AWS) provides Simple Queue Service (SQS) for message queuing, which is fully managed and highly scalable. This can be used in a similar way as RabbitMQ, Kafka, or Pub/Sub for event-driven communication.
pom.xml:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-aws-messaging</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
SQS Configuration (SQSConfig.java):
@Configuration
public class SQSConfig {
// With the starter on the classpath, the listener container behind @SqsListener is auto-configured,
// so only the SQS client is declared here.
@Bean
public AmazonSQSAsync amazonSQS() {
// Region and credentials are resolved from the default AWS provider chain
return AmazonSQSAsyncClientBuilder.defaultClient();
}
}
Order Producer (Controller) (OrderController.java):
@RestController
@RequestMapping("/orders")
public class OrderController {
@Autowired
private AmazonSQSAsync amazonSQS;
@Value("${aws.sqs.queue-url}")
private String queueUrl;
@PostMapping("/place")
public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) throws JsonProcessingException {
String message = new ObjectMapper().writeValueAsString(event);
SendMessageRequest sendMessageRequest = new SendMessageRequest(queueUrl, message);
amazonSQS.sendMessage(sendMessageRequest);
return ResponseEntity.ok("Order placed and event sent to SQS");
}
}
Order Consumer (OrderListener.java):
@Component
public class OrderListener {
@SqsListener(value = "orderQueue", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void handleOrderPlacedEvent(String message) throws JsonProcessingException {
OrderPlacedEvent event = new ObjectMapper().readValue(message, OrderPlacedEvent.class);
System.out.println("Order placed: " + event.getOrderId());
// Process the order
}
}
application.properties:
aws.sqs.queue-url=https://sqs.us-east-1.amazonaws.com/your-account-id/your-queue-name
Apache Pulsar is an open-source, distributed messaging and streaming platform, similar to Kafka but with more focus on multi-tenancy, geo-replication, and long-term storage. Pulsar supports both publish-subscribe and queue-based messaging models.
pom.xml:
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
Pulsar Configuration (PulsarConfig.java):
@Configuration
public class PulsarConfig {
@Value("${pulsar.service-url}")
private String pulsarServiceUrl;
@Bean
public PulsarClient pulsarClient() throws PulsarClientException {
return PulsarClient.builder().serviceUrl(pulsarServiceUrl).build();
}
@Bean
public Consumer<byte[]> orderConsumer(PulsarClient pulsarClient) throws PulsarClientException {
return pulsarClient.newConsumer()
.topic("order-topic")
.subscriptionName("order-subscription")
.subscribe();
}
}
Order Producer (OrderController.java):
@RestController
@RequestMapping("/orders")
public class OrderController {
@Autowired
private PulsarClient pulsarClient;
@PostMapping("/place")
public ResponseEntity<String> placeOrder(@RequestBody OrderPlacedEvent event) throws PulsarClientException, JsonProcessingException {
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("order-topic")
.create();
String message = new ObjectMapper().writeValueAsString(event);
producer.send(message.getBytes());
return ResponseEntity.ok("Order placed and event sent to Pulsar");
}
}
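Order Consumer (OrderListener.java):
@Component
public class OrderListener {
@Autowired
private Consumer<byte[]> orderConsumer;
@PostConstruct
public void consume() {
// Run the receive loop on its own thread; blocking inside @PostConstruct would stall application startup
Thread worker = new Thread(() -> {
ObjectMapper mapper = new ObjectMapper();
while (!Thread.currentThread().isInterrupted()) {
try {
Message<byte[]> msg = orderConsumer.receive();
OrderPlacedEvent event = mapper.readValue(msg.getData(), OrderPlacedEvent.class);
System.out.println("Order placed: " + event.getOrderId());
// Process the order
orderConsumer.acknowledge(msg); // acknowledge so the message is not delivered again
} catch (Exception e) {
e.printStackTrace();
}
}
}, "pulsar-order-listener");
worker.setDaemon(true);
worker.start();
}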
}
application.properties:
pulsar.service-url=pulsar://localhost:6650
Another approach in Event-Driven Architecture is Event Sourcing, which involves persisting the state changes as events rather than storing just the current state. This is typically combined with CQRS (Command Query Responsibility Segregation), where you separate the handling of commands (state changes) and queries (reads).
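As a rough sketch of the idea, the plain-Java example below keeps an append-only event log on the write side and derives a read model by replaying it. Everything in it is an illustrative assumption: the stock domain, the class names and the in-memory list stand in for whatever event store and projections a real system would use.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal event sourcing + CQRS illustration; all names here are hypothetical.
final class EventSourcingSketch {

    // An immutable fact: the stock of one item changed by 'delta'
    static final class StockChanged {
        final String itemId;
        final int delta;

        StockChanged(String itemId, int delta) {
            this.itemId = itemId;
            this.delta = delta;
        }
    }

    // Write side: state changes are only ever appended, never overwritten
    static final class EventStore {
        private final List<StockChanged> events = new ArrayList<>();

        void append(StockChanged event) {
            events.add(event);
        }

        List<StockChanged> all() {
            return new ArrayList<>(events); // defensive copy keeps the log append-only
        }
    }

    // Read side: a query model rebuilt by replaying the event log from the start
    static Map<String, Integer> project(List<StockChanged> events) {
        Map<String, Integer> stock = new HashMap<>();
        for (StockChanged e : events) {
            stock.merge(e.itemId, e.delta, Integer::sum);
        }
        return stock;
    }

    public static void main(String[] args) {
        EventStore store = new EventStore();
        store.append(new StockChanged("widget", 10));
        store.append(new StockChanged("widget", -3));
        store.append(new StockChanged("gadget", 5));
        System.out.println(project(store.all()).get("widget")); // 7
    }
}
```

Because the current stock is derived rather than stored, the same log can feed additional read models later, and any past state can be reproduced by replaying only a prefix of the events.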