Spring Boot RabbitMQ Topic Exchange Example
In this tutorial, we’ll go through the step by step guide to implement messaging using RabbitMQ in a Spring Boot Application and will see how to publish and consume messages to Topic Exchange and queues using RabbitMQ.
When the producer publishes messages to RabbitMQ server, they are not published directly to a queue, instead first they goes to exchange. An exchange is in charge of routing messages to different queues using bindings and routing keys. A binding connects a queue and an exchange.
There are 5 types of exchanges
1.Direct Exchange
2.Topic Exchange
3.Fanout Exchange
4.Headers Exchange
5.Default Exchange
This is the second post in this series.
First part covers about Direct Exchange.
Third part covers about Fanout Exchange
Fourth part covers about Headers Exchange
In this blog post I will show you how to create and use Topic Exchange in Spring Boot applications.
Topic Exchange
Topic exchanges route messages to queues based on wildcard matches between the routing key and the routing pattern, which is specified by the queue binding. Messages are routed to one or many queues based on a matching between a message routing key and matching pattern.
The topic exchange type is often used to implement various publish/subscribe pattern variations. Topic exchanges are commonly used for the multicast routing of messages.
Topic exchanges have a very broad set of use cases. Whenever a problem involves multiple consumers/applications that selectively choose which type of messages they want to receive, the use of topic exchanges should be considered.
The routing key must be a list of words, delimited by a period (.).
routing key can contain 2 special keys * or #
# – indicates a match of zero or more words
* – matches a word in a specific position of the routing key
Examples are report.monthly and report.*.weekly which in this case identifies reports that are set up for a bank.
The routing patterns may contain an asterisk (“*”) to match a word in a specific position of the routing key (e.g., a routing pattern of “report.*.*.weekly” only match routing keys where the first word is “report” and the fourth word is “weekly”).
A pound symbol (“#”) indicates a match of zero or more words (e.g., a routing pattern of “reports.monthly.#” matches any routing keys beginning with “reports.monthly”).
If producer sends a message with routing key “report.monthly” , it will match the routing key of
- report.monthly.#
- report.#
So message is routed to Queue A and Queue C
If producer sends a message with routing key “report.personalaccount.weekly” , it will match the routing key of
- report.*.weekly
- report.#
So message is routed to Queue B and Queue C
Setting Up Project
Go to Spring Initializr and add the following starter dependencies to a project.
- Spring Web
- Spring for RabbitMQ
Change the Name to “springboot-rabbitmq-producer” and then click on “Generate” button. A .zip will download. Unzip it. Inside you’ll find a simple, Maven-based project including a pom.xml build file (NOTE: You can use Gradle. The examples in this tutorial will be Maven-based.)
Import the project to your favorite IDE.
Configuring RabbitMQ connection
To integrate RabbitMQ in your Spring-powered web applications, first you need to do is configure RabbitMQ connection settings.
Spring for RabbitMQ provides a convenient class called RabbitTemplate to send and receive messages
By default RabbitTemplate class uses following configuration to connect to RabbitMQ instance.
host: localhost
port: 5672 (non -SSL) 5671 (SSL)
username: guest
password: guest
Using application.properties
You can also configure connection to RabbitMQ instance using application.properties.
In real world application you need configure using application properties only.
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672 (non-SSL) or 5671 (SSL)
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
Code language: plaintext (plaintext)
Let’s see how to setup the Topic exchange ,queues and bindings programmatically in Spring Boot application.
We create a configuration class to create topic exchange ,queues and bindings
package dev.fullstackcode.sb.rabbitmq.producer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfiguration {
@Bean
Queue queueA() {
return new Queue("queue.A", false);
}
@Bean
Queue queueB() {
return new Queue("queue.B", false);
}
@Bean
Queue queueC() {
return new Queue("queue.C", false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("exchange.topic");
}
@Bean
Binding bindingA(Queue queueA, TopicExchange exchange) {
return BindingBuilder.bind(queueA).to(exchange).with("report.monthly.#");
}
@Bean
Binding bindingB(Queue queueB, TopicExchange exchange) {
return BindingBuilder.bind(queueB).to(exchange).with("report.*.weekly");
}
@Bean
Binding bindingC(Queue queueC, TopicExchange exchange) {
return BindingBuilder.bind(queueC).to(exchange).with("report.#");
}
@Bean
ApplicationRunner runner(ConnectionFactory cf) {
return args -> cf.createConnection().close();
}
@Bean
MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
}
Code language: Java (java)
Queue – beans are used to create queue
TopicExchange – beans are used to create Topic exchange
Binding – beans are used to create bindings between exchange and Queue.
MessageConverter – is used to convert object to Json format
RabbitTemplate – is used to configure the RabbitTemplate.
Creating Exchange and Queues on Startup
Spring Boot application connects to RabbitMQ server instance and creates Exchange and Queues when first message is published . If you want your application to create Exchange and Queues on application startup , you should use following method.
@Bean
ApplicationRunner runner(ConnectionFactory cf) {
return args -> cf.createConnection().close();
}
Code language: Java (java)
Publishing Messages
Once RabbitTemplate is configured , it is very easy to publish messages. We can use overloaded send/convertAndSend method to publish messages.
Let’s develop a controller class which publishes messages.
package dev.fullstackcode.sb.rabbitmq.producer.controller;
import dev.fullstackcode.sb.rabbitmq.producer.model.Event;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;
@RestController
@RequestMapping(value ="rabbitmq/event")
public class RabbitMQProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private TopicExchange topicExchange;
@PostMapping
public String send(@RequestBody Event event){
if( event.getName().equalsIgnoreCase("Event A")) {
rabbitTemplate.convertAndSend(topicExchange.getName(), "report.monthly", event);
} else if (event.getName().equalsIgnoreCase("Event B")) {
rabbitTemplate.convertAndSend(topicExchange.getName(), "report.retail.weekly", event);
} else if (event.getName().equalsIgnoreCase("Event X")) {
rabbitTemplate.convertAndSend(topicExchange.getName(), "report.business.weekly", event);
}else {
throw new ResponseStatusException(HttpStatus.BAD_REQUEST,"unknown event");
}
return "message sent successfully";
}
}
Code language: Java (java)
Starting the RabbitMQ
Let’s use following docker compose file for starting RabbitMQ server in local development for our testing.
version: '3'
services:
rabbitmq:
container_name: rabbitmq
hostname: my-rabbitmq
image: rabbitmq:3.10.6-management-alpine
ports:
- '5672:5672'
- '15672:15672'
Code language: Java (java)
docker-compose up
Code language: plaintext (plaintext)
Starting Spring Boot RabbitMQ producer Application
Let’s start the springboot-raabitmq-producer application and publish some messages.
As we are connecting to RabbitMQ server on startup, it should create Direct Exchange and Queues and bindings in RabbitMQ instance.
We can check this by going to management console of RabbitMQ. It is available at
http://localhost:15672
username/password -> guest/guest
Below images confirm that exchange, queue and bindings created successfully on startup.
Publishing Messages
Let’s publish messages by sending request to controller class using rest client
Let’s look at the management console.
queue.A -> 1 message (Event A)
queue.B ->1 message (Event B)
queue.C -> 2 messages (Event A,Event B) as it matches routing key of both Event A and Event B.
Setup Consumer Project
To setup consumer project, you can follow the same procedure explained in producer setup section
Consuming Messages
Now let’s develop a consumer application which consumes messages from RabbitMQ server.
First define MessageConverter to Convert Json String back to Object as we are sending Object in json format from producer.
package com.fullstackcode.sb.rabbitmq.consumer.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfiguration {
@Bean
MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
Code language: Java (java)
@RabbitListener
annotation is used to receive messages from RabbitMQ queues.
Spring Boot will take care of converting message to corresponding Object
package com.fullstackcode.sb.rabbitmq.consumer.listener;
import com.fullstackcode.sb.rabbitmq.consumer.model.Event;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RabbitMQConsumer {
@RabbitListener(queues = "queue.A")
private void receiveQueueA(Event event) {
log.info("Event received from queue A -> {}",event);
}
@RabbitListener(queues = "queue.B")
private void receiveQueueB(Event event) {
log.info("Event received from queue B -> {}",event);
}
@RabbitListener(queues = "queue.C")
private void receiveQueueC(Event event) {
log.info("Event received from queue C -> {}",event);
}
}
Code language: Java (java)
If you want to configure server properties uisng application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672 (non-SSL) or 5671 (SSL)
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
server.port=8081
Code language: plaintext (plaintext)
Once we start the consumer application, we can see following output on the console.
INFO : Event received from queue A -> Event(id=1, name=Event A)
INFO : Event received from queue B -> Event(id=2, name=Event B)
INFO : Event received from queue C -> Event(id=2, name=Event B)
INFO : Event received from queue C -> Event(id=1, name=Event A)
Code language: Java (java)
Connecting to Remote Host
The above consumer example connects to RabbitMQ instance running in local
If you want to connect to instance running on server, you need specify the server address in application.properties
You can configureaddress like below
For SSL connection
spring.rabbitmq.addresses=amqps://<username>:<password>@<host>/<virtual-host>
Code language: Java (java)
For non-SSL connection
spring.rabbitmq.addresses=amqp://<username>:<password>@<host>/<virtual-host>
Code language: Java (java)
If you prefer individual properties, you can also configure like below
spring.rabbitmq.host=<host>
spring.rabbitmq.virtual-host=<virtaul-host>
spring.rabbitmq.port=5671 (SSL) or 5672 ( non - SSL)
spring.rabbitmq.username=<username>
spring.rabbitmq.password=<password>
Code language: Java (java)
Note
RabbitMQ in general listens on port 5672. If RabbitMQ instance is running on SSL, it will listen on port 5671
You can download source code from GitHub
producer – https://github.com/sureshgadupu/sb-rabbitmq-topicexchange
consumer – https://github.com/sureshgadupu/sb-rabbitmq-topicexchange-consumer
You might also like