Spring Boot – Testing RabbitMQ with Testcontainers

In this tutorial, I will explain how to write integration tests with Testcontainers for Spring Boot applications that use a RabbitMQ server for messaging.

In my previous blog post, I covered how to write RabbitMQ producer and consumer applications using the Spring Boot framework.

Now we will write integration tests that verify the logic of those producer and consumer applications using Testcontainers.

Testing RabbitMQ Producer

1. Adding Dependencies

First, we need to add the Testcontainers dependencies.

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    ....
    <properties>
        <java.version>11</java.version>
        <testcontainers.version>1.17.2</testcontainers.version>
    </properties>
    <dependencies>
        ...
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>rabbitmq</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <!-- The BOM keeps all Testcontainers modules on the same version -->
            <dependency>
                <groupId>org.testcontainers</groupId>
                <artifactId>testcontainers-bom</artifactId>
                <version>${testcontainers.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>

2. Add Maven Plugin

Add the Maven Failsafe plugin to run the integration tests. By default, Failsafe picks up test classes whose names end in IT (among a few other patterns), which is why the test classes below use that suffix.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    ...
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-failsafe-plugin</artifactId>
                <version>3.0.0-M5</version>
            </plugin>
        </plugins>
    </build>
</project>

Starting RabbitMQ server

We can start a RabbitMQ server for integration tests with Testcontainers in two ways.

  1. Using RabbitMQ module
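import static java.time.temporal.ChronoUnit.SECONDS;

import java.time.Duration;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
@Slf4j
public class RabbitMQProducerControllerIT {

    static RabbitMQContainer rabbitMQContainer;

    // Start the container once, before the Spring context is created.
    static {
        rabbitMQContainer = new RabbitMQContainer("rabbitmq:3.10.6-management-alpine")
                .withStartupTimeout(Duration.of(100, SECONDS));
        rabbitMQContainer.start();
    }

    // Point Spring AMQP at the container's host and mapped AMQP port.
    @DynamicPropertySource
    public static void properties(DynamicPropertyRegistry registry) {
        log.info("url ->{}", rabbitMQContainer.getAmqpUrl());
        log.info("port ->{}", rabbitMQContainer.getHttpPort());
        registry.add("spring.rabbitmq.host", () -> rabbitMQContainer.getHost());
        registry.add("spring.rabbitmq.port", () -> rabbitMQContainer.getAmqpPort());
    }
}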

Note

This module is in INCUBATING stage. While it is ready for use and operational in the current version of Testcontainers, it is possible that it may receive breaking changes in the future. 

  2. Using GenericContainer

When using GenericContainer to start the server, we can use either HostPortWaitStrategy or LogMessageWaitStrategy to detect when the server has started.

2.1 Using the host port wait strategy

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
@Slf4j
public class RabbitMQProducerController2IT {

    static GenericContainer<?> rabbitMQContainer;

    static {
        // Consider the container started once its exposed ports accept connections.
        WaitStrategy waitStrategy = new HostPortWaitStrategy()
                .withStartupTimeout(Duration.of(100, SECONDS));
        rabbitMQContainer = new GenericContainer<>("docker.io/rabbitmq:3.10.6-management-alpine")
                .withExposedPorts(5672, 15672);
        rabbitMQContainer.setWaitStrategy(waitStrategy);
        rabbitMQContainer.start();
    }

    @DynamicPropertySource
    public static void properties(DynamicPropertyRegistry registry) {
        registry.add("spring.rabbitmq.host", () -> rabbitMQContainer.getHost());
        // 5672 is the AMQP port inside the container; getMappedPort returns the host port.
        registry.add("spring.rabbitmq.port", () -> rabbitMQContainer.getMappedPort(5672));
    }
}
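To make the idea behind a host port wait more concrete, here is a minimal sketch in plain Java of "keep trying to connect until the port answers or a timeout expires". It is only an illustration of the concept, not how Testcontainers implements HostPortWaitStrategy; the class name PortWaitSketch, the method waitForPort, and the 500 ms / 100 ms intervals are my own assumptions.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.time.Duration;

// Hypothetical helper for illustration only; this is not Testcontainers code.
final class PortWaitSketch {

    /** Returns true once host:port accepts a TCP connection, false if the timeout elapses first. */
    static boolean waitForPort(String host, int port, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() - deadline < 0) {
            try (Socket socket = new Socket()) {
                // Each attempt gives up after 500 ms so a dead host cannot block forever.
                socket.connect(new InetSocketAddress(host, port), 500);
                return true;
            } catch (IOException notReadyYet) {
                // Port not open yet: pause briefly, then retry.
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // 5672 is RabbitMQ's default AMQP port; adjust host and port for your setup.
        boolean ready = waitForPort("localhost", 5672, Duration.ofSeconds(5));
        System.out.println("broker reachable: " + ready);
    }
}
```

An open port only tells us that something is listening, not that the broker has finished booting, which is one reason the log message strategy in the next section can be the stricter check.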

2.2 Using the log message wait strategy

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
@Slf4j
public class RabbitMQProducerController2IT {

    static GenericContainer<?> rabbitMQContainer;

    static {
        // Consider the container started once this message has appeared in its log once.
        WaitStrategy waitStrategy = Wait.forLogMessage(".*Server startup complete.*", 1)
                .withStartupTimeout(Duration.ofSeconds(60L));
        rabbitMQContainer = new GenericContainer<>("docker.io/rabbitmq:3.10.6-management-alpine")
                .withExposedPorts(5672, 15672);
        rabbitMQContainer.setWaitStrategy(waitStrategy);
        rabbitMQContainer.start();
    }

    @DynamicPropertySource
    public static void properties(DynamicPropertyRegistry registry) {
        registry.add("spring.rabbitmq.host", () -> rabbitMQContainer.getHost());
        registry.add("spring.rabbitmq.port", () -> rabbitMQContainer.getMappedPort(5672));
    }
}
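The two arguments of Wait.forLogMessage are a regular expression and the number of times it must be seen. The following plain Java sketch models that rule on a list of log lines. It is a simplified model rather than the library's implementation, and the class LogWaitSketch, the method seenEnough, and the sample log lines are made up for illustration.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; this is not Testcontainers code.
final class LogWaitSketch {

    /** True once at least `times` of the given lines match `regex` in full. */
    static boolean seenEnough(List<String> logLines, String regex, int times) {
        Pattern pattern = Pattern.compile(regex);
        long hits = logLines.stream()
                .filter(line -> pattern.matcher(line).matches())
                .count();
        return hits >= times;
    }

    public static void main(String[] args) {
        // Invented sample lines, loosely shaped like broker output.
        List<String> logs = List.of(
                "Starting broker...",
                "[info] Server startup complete; 4 plugins started.");
        System.out.println(seenEnough(logs, ".*Server startup complete.*", 1));
    }
}
```

Because the sketch matches each line in full, the leading and trailing `.*` are what let the pattern find the phrase anywhere in the line.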

Writing Integration Tests

In our producer application, we are going to test the following:

  1. Creation of the exchange
  2. Creation of the queues
  3. Publishing an object to a queue

Testing creation of exchange

To verify that the exchange was created, we use the REST API provided by the RabbitMQ management plugin.

The following endpoint returns information about exchanges:

http://<rabbitmq-host>:<rabbitmq-management-port>/api/exchanges

Using TestRestTemplate, we call this REST API.

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
@Slf4j
public class RabbitMQProducerControllerIT {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Autowired
    private RabbitMQProducerController rabbitMQProducerController;

    @Autowired
    private TestRestTemplate testRestTemplate;

    static RabbitMQContainer rabbitMQContainer;

    static {
        rabbitMQContainer = new RabbitMQContainer("rabbitmq:3.10.6-management-alpine")
                .withStartupTimeout(Duration.of(100, SECONDS));
        rabbitMQContainer.start();
    }

    @Test
    public void testExchangeCreation() {
        // getHttpPort() is the mapped port of the management API.
        ResponseEntity<Object> exchanges = testRestTemplate.withBasicAuth("guest", "guest")
                .getForEntity("http://" + rabbitMQContainer.getHost() + ":"
                        + rabbitMQContainer.getHttpPort() + "/api/exchanges", Object.class);
        log.info("exchanges {}", exchanges);
        assertEquals(200, exchanges.getStatusCode().value());
        assertTrue(exchanges.getBody().toString().contains("name=exchange.direct, type=direct"));
    }

    @DynamicPropertySource
    public static void properties(DynamicPropertyRegistry registry) {
        log.info("url ->{}", rabbitMQContainer.getAmqpUrl());
        log.info("port ->{}", rabbitMQContainer.getHttpPort());
        registry.add("spring.rabbitmq.host", () -> rabbitMQContainer.getHost());
        registry.add("spring.rabbitmq.port", () -> rabbitMQContainer.getAmqpPort());
    }
}
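Under the hood, the TestRestTemplate call above is an HTTP GET with a Basic authentication header. As a rough sketch of what goes over the wire, the following snippet builds (but does not send) an equivalent request with the JDK's java.net.http API. The class ManagementApiRequestSketch and the method exchangesRequest are hypothetical names; guest/guest are the credentials used in the test above.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch only: builds the management API request without sending it.
final class ManagementApiRequestSketch {

    static HttpRequest exchangesRequest(String host, int managementPort, String user, String password) {
        // Basic auth is "user:password", Base64-encoded, prefixed with "Basic ".
        String credentials = user + ":" + password;
        String basicAuth = "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder()
                .uri(URI.create("http://" + host + ":" + managementPort + "/api/exchanges"))
                .header("Authorization", basicAuth)
                .GET()
                .build();
    }

    public static void main(String[] args) {
        // 15672 is the management port inside the container; in a test you would
        // pass the mapped port, for example rabbitMQContainer.getHttpPort().
        HttpRequest request = exchangesRequest("localhost", 15672, "guest", "guest");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Swapping the path for /api/queues would give the request used in the queue test.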

Testing creation of Queues

To verify that the queues were created, we again use the REST API provided by the RabbitMQ management plugin.

The following endpoint returns information about queues:

http://<rabbitmq-host>:<rabbitmq-management-port>/api/queues

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
@Slf4j
public class RabbitMQProducerControllerIT {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Autowired
    private RabbitMQProducerController rabbitMQProducerController;

    @Autowired
    private TestRestTemplate testRestTemplate;

    static RabbitMQContainer rabbitMQContainer;

    static {
        rabbitMQContainer = new RabbitMQContainer("rabbitmq:3.10.6-management-alpine")
                .withStartupTimeout(Duration.of(100, SECONDS));
        rabbitMQContainer.start();
    }

    // RabbitMQ admin Rest API
    // https://rawcdn.githack.com/rabbitmq/rabbitmq-server/v3.10.7/deps/rabbitmq_management/priv/www/api/index.html
    @Test
    public void testQueueCreation() throws JsonProcessingException {
        ResponseEntity<Object> queues = testRestTemplate.withBasicAuth("guest", "guest")
                .getForEntity("http://" + rabbitMQContainer.getHost() + ":"
                        + rabbitMQContainer.getHttpPort() + "/api/queues", Object.class);
        log.info("queues {}", queues.getBody().toString());
        assertEquals(200, queues.getStatusCode().value());
        assertTrue(queues.getBody().toString().contains("name=queue.A"));
        assertTrue(queues.getBody().toString().contains("name=queue.B"));
    }

    @DynamicPropertySource
    public static void properties(DynamicPropertyRegistry registry) {
        log.info("url ->{}", rabbitMQContainer.getAmqpUrl());
        log.info("port ->{}", rabbitMQContainer.getHttpPort());
        registry.add("spring.rabbitmq.host", () -> rabbitMQContainer.getHost());
        registry.add("spring.rabbitmq.port", () -> rabbitMQContainer.getAmqpPort());
    }
}
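The assertions above search the body's toString() output, which works but is a little loose: "name=queue.A" would also match a queue called queue.AB. When a JSON array is read as Object.class, Jackson's default mapping produces a List of Map objects, so a stricter check can walk that structure instead. The helper below is a minimal sketch of that idea; NameLookupSketch and containsName are hypothetical names, not part of the project.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper: looks for an exact "name" value in a deserialized JSON array.
final class NameLookupSketch {

    /** True if `body` is a list of maps and one of them has exactly the given "name". */
    static boolean containsName(Object body, String expectedName) {
        if (!(body instanceof List<?>)) {
            return false;
        }
        for (Object element : (List<?>) body) {
            if (element instanceof Map<?, ?>
                    && expectedName.equals(((Map<?, ?>) element).get("name"))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Hand-built stand-in for the deserialized /api/queues response.
        Object body = List.of(Map.of("name", "queue.A"), Map.of("name", "queue.B"));
        System.out.println(containsName(body, "queue.A"));
    }
}
```

In the test, this could be used as assertTrue(NameLookupSketch.containsName(queues.getBody(), "queue.A")).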

Testing publishing events

We will publish an event and verify that it reached the intended queue.

In the example below, I publish an “Event A” object. According to our controller logic, Event A is sent to Queue A, so we then check whether Queue A received the message.

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
@Slf4j
public class RabbitMQProducerControllerIT {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Autowired
    private RabbitMQProducerController rabbitMQProducerController;

    @Autowired
    private TestRestTemplate testRestTemplate;

    static RabbitMQContainer rabbitMQContainer;

    static {
        rabbitMQContainer = new RabbitMQContainer("rabbitmq:3.10.6-management-alpine")
                .withStartupTimeout(Duration.of(100, SECONDS));
        rabbitMQContainer.start();
    }

    @Test
    public void testSendMessage() throws Exception {
        Event event = new Event();
        event.setId(1);
        event.setName("Event A");

        rabbitMQProducerController.send(event);

        // Wait up to 10 ms for a message on queue.A; increase this if the test is flaky.
        Message message = rabbitTemplate.receive("queue.A", 10);
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        Event eventReceived = (Event) converter.fromMessage(message);

        // JUnit convention: expected value first, actual value second.
        assertEquals(event.getId(), eventReceived.getId());
        assertEquals(event.getName(), eventReceived.getName());
    }

    @DynamicPropertySource
    public static void properties(DynamicPropertyRegistry registry) {
        log.info("url ->{}", rabbitMQContainer.getAmqpUrl());
        log.info("port ->{}", rabbitMQContainer.getHttpPort());
        registry.add("spring.rabbitmq.host", () -> rabbitMQContainer.getHost());
        registry.add("spring.rabbitmq.port", () -> rabbitMQContainer.getAmqpPort());
    }
}
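The Event class itself is not shown in this post. Judging only from how the test uses it (setId, setName, getId, getName), a plain Java stand-in could look like the sketch below. The field types and the toString format are my assumptions; the real class in the repository may well be generated with Lombok instead.

```java
// Hypothetical stand-in for the project's Event class, which the post does not show.
final class Event {

    private int id;       // assumed to be an int because the test calls setId(1)
    private String name;

    // JSON converters such as Jackson generally need a no-argument constructor.
    public Event() {
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    // Assumed format, mirroring the "Event(id=1, name=Event A)" style of a Lombok toString.
    @Override
    public String toString() {
        return "Event(id=" + id + ", name=" + name + ")";
    }

    public static void main(String[] args) {
        Event event = new Event();
        event.setId(1);
        event.setName("Event A");
        System.out.println(event);
    }
}
```

The getters and setters are what allow Jackson2JsonMessageConverter to turn the object into JSON on the way out and back into an Event on the way in.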

Running Producer Integration Tests

Use the following command to run the integration tests.

mvn clean verify

Testing RabbitMQ Consumer

We add the same dependencies and Maven plugin (steps 1 and 2) as in the producer application.

Testing consuming events

To test the RabbitMQ consumer application, we first need to create the queues, so that we can publish events to them and the consumer application can receive those events.

To create the queues in RabbitMQ for testing, we will use the @TestConfiguration annotation.

We can create the test configuration in two ways.

  1. Create a separate class and import it into the integration test class

@TestConfiguration
public class RabbitMQTestConfiguration {

    @Bean
    Queue queueA() {
        return new Queue("queue.A", false);
    }

    @Bean
    Queue queueB() {
        return new Queue("queue.B", false);
    }

    // Open a connection at startup so that the queues are declared on the broker.
    @Bean
    ApplicationRunner runner(ConnectionFactory cf) {
        return args -> cf.createConnection().close();
    }

    @Bean
    MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }
}

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Import(dev.fullstackcode.sb.rabbitmq.consumer.config.RabbitMQTestConfiguration.class)
@ExtendWith(OutputCaptureExtension.class)
@DirtiesContext
@Slf4j
public class RabbitMQConsumerIT {
}

  2. Declare the configuration as a static nested class

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ExtendWith(OutputCaptureExtension.class)
@DirtiesContext
@Slf4j
public class RabbitMQConsumerIT {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Autowired
    private TestRestTemplate testRestTemplate;

    static RabbitMQContainer rabbitMQContainer;

    static {
        rabbitMQContainer = new RabbitMQContainer("rabbitmq:3.10.6-management-alpine")
                .withStartupTimeout(Duration.of(100, SECONDS));
        rabbitMQContainer.start();
    }

    @TestConfiguration
    public static class RabbitMQTestConfiguration {

        @Bean
        Queue queueA() {
            return new Queue("queue.A", false);
        }

        @Bean
        Queue queueB() {
            return new Queue("queue.B", false);
        }

        @Bean
        ApplicationRunner runner(ConnectionFactory cf) {
            return args -> cf.createConnection().close();
        }

        @Bean
        MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }

        @Bean
        RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMessageConverter(messageConverter());
            return rabbitTemplate;
        }
    }
}

Next, we need to create an application.properties file in the test resources folder (src/test/resources) and add the following property, so that beans defined in the test configuration are allowed to override application beans with the same name.

application.properties

spring.main.allow-bean-definition-overriding=true

Next, we will publish events to the queues and expect the consumer application to receive them.

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
//@Import(dev.fullstackcode.sb.rabbitmq.consumer.config.RabbitMQTestConfiguration.class)
@ExtendWith(OutputCaptureExtension.class)
@DirtiesContext
@Slf4j
public class RabbitMQConsumerIT {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Autowired
    private TestRestTemplate testRestTemplate;

    static RabbitMQContainer rabbitMQContainer;

    static {
        rabbitMQContainer = new RabbitMQContainer("rabbitmq:3.10.6-management-alpine")
                .withStartupTimeout(Duration.of(100, SECONDS));
        rabbitMQContainer.start();
    }

    @TestConfiguration
    public static class RabbitMQTestConfiguration {

        @Bean
        Queue queueA() {
            return new Queue("queue.A", false);
        }

        @Bean
        Queue queueB() {
            return new Queue("queue.B", false);
        }

        @Bean
        ApplicationRunner runner(ConnectionFactory cf) {
            return args -> cf.createConnection().close();
        }

        @Bean
        MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }

        @Bean
        RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMessageConverter(messageConverter());
            return rabbitTemplate;
        }
    }

    @Test
    public void testReceiveMessageFromQueueA(CapturedOutput output) throws Exception {
        Event event = new Event();
        event.setId(1);
        event.setName("Event A");

        rabbitTemplate.convertSendAndReceive("queue.A", event);

        // The consumer logs the event it received; check the captured console output.
        Assertions.assertThat(output).contains("Event(id=1, name=Event A)");
    }

    @Test
    public void testReceiveMessageFromQueueB(CapturedOutput output) throws Exception {
        Event event = new Event();
        event.setId(1);
        event.setName("Event B");

        rabbitTemplate.convertSendAndReceive("queue.B", event);

        Assertions.assertThat(output).contains("Event(id=1, name=Event B)");
    }

    @DynamicPropertySource
    public static void properties(DynamicPropertyRegistry registry) {
        log.info("url ->{}", rabbitMQContainer.getAmqpUrl());
        log.info("port ->{}", rabbitMQContainer.getHttpPort());
        registry.add("spring.rabbitmq.host", () -> rabbitMQContainer.getHost());
        registry.add("spring.rabbitmq.port", () -> rabbitMQContainer.getAmqpPort());
    }
}
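The tests above use convertSendAndReceive, which blocks until a reply arrives or the template's reply timeout expires. If the consumer's listener does not send a reply, that pause is effectively what gives it time to log the event before the assertion runs. An alternative is to send the message and then wait explicitly for the expected output. The helper below is a minimal plain Java sketch of such a wait; AwaitSketch, its await method, and the 50 ms polling interval are hypothetical and not part of the project.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper: polls a condition until it holds or a timeout expires.
final class AwaitSketch {

    /** Returns true as soon as `condition` holds, false if `timeout` elapses first. */
    static boolean await(BooleanSupplier condition, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(50); // polling interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        StringBuffer output = new StringBuffer(); // stands in for the captured console output
        new Thread(() -> {
            try {
                Thread.sleep(200); // simulate a listener that logs a little later
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            output.append("Event(id=1, name=Event A)");
        }).start();
        boolean seen = await(() -> output.toString().contains("name=Event A"), Duration.ofSeconds(2));
        System.out.println("message logged: " + seen);
    }
}
```

In the consumer test, this might be used as assertTrue(AwaitSketch.await(() -> output.getOut().contains("Event(id=1, name=Event A)"), Duration.ofSeconds(5))) after a plain convertAndSend.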

Running Consumer Integration Tests

Use the following command to run the integration tests.

mvn clean verify

The complete code for this blog post can be downloaded from GitHub.

Producer – sureshgadupu/rabbitmq-producer-testcontainers (github.com)

Consumer – sureshgadupu/rabbitmq-consumer-testcontainers (github.com)
