Testing Spring Boot Kafka with Testcontainers

In this tutorial, I will explain how to write integration tests for Spring Boot and Kafka applications using Testcontainers.
In my previous blog post, I covered how to write Kafka producer and consumer applications using the Spring Boot framework.
Now we will write integration tests for the logic of the producer and consumer applications, and use Testcontainers to start the Kafka cluster.
Testcontainers can automatically start and manage Apache Kafka containers.
Advantages of using Testcontainers
- Running a single node Kafka installation with just one line of code
- No need to manage the external ZooKeeper installation that Kafka requires.
Testing Kafka Producer
1. Adding Dependencies
First, we need to add the Testcontainers dependencies and the Awaitility dependency.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    ....
    <properties>
        <java.version>11</java.version>
        <testcontainers.version>1.16.3</testcontainers.version>
    </properties>
    <dependencies>
        ....
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>kafka</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
            <version>4.2.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.testcontainers</groupId>
                <artifactId>testcontainers-bom</artifactId>
                <version>${testcontainers.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>
2. Add Maven Plugin
Add the Maven Failsafe plugin to run the integration tests.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    ...
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-failsafe-plugin</artifactId>
                <version>3.0.0-M5</version>
            </plugin>
        </plugins>
    </build>
</project>
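Failsafe selects integration tests by class name. By default it includes classes matching `**/IT*.java`, `**/*IT.java`, and `**/*ITCase.java`, which is why the test classes in this post end in `IT`. The sketch below mimics that rule on simple class names only; `FailsafeNaming` is a hypothetical helper written for illustration and is not part of the plugin.

```java
import java.util.List;

public class FailsafeNaming {

    // Simplified take on Failsafe's default include patterns, applied to a simple class name.
    public static boolean looksLikeIntegrationTest(String simpleClassName) {
        return simpleClassName.startsWith("IT")
                || simpleClassName.endsWith("IT")
                || simpleClassName.endsWith("ITCase");
    }

    public static void main(String[] args) {
        // EmployeeControllerTest is a made-up unit test name used for contrast.
        for (String name : List.of("SpringBootKafkaProducerIT", "EmployeeControllerTest")) {
            System.out.println(name + " -> " + looksLikeIntegrationTest(name));
        }
    }
}
```

A class whose name does not match one of these patterns is skipped by Failsafe unless you configure the plugin's includes yourself.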
Writing Integration Tests
In our producer application we perform the following tasks:
- Creating topics called create-employee-events and springboot-topic on startup
- Publishing Employee objects received from a REST client to the create-employee-events topic
Testing Topic creation.
The KafkaAdmin and AdminClient classes can be used to create and read topics in a Kafka cluster.
When using Spring Boot, a KafkaAdmin bean is registered automatically, so you can simply autowire it into the test class.
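import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicListing;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
public class SpringBootKafkaProducerIT {
    private static final Logger logger = LoggerFactory.getLogger(SpringBootKafkaProducerIT.class);
    static KafkaContainer kafka;
    static {
        kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
        kafka.start();
    }
    @Autowired
    private KafkaAdmin admin;
    @Test
    public void testCreationOfTopicAtStartup() throws InterruptedException, ExecutionException {
        try (AdminClient client = AdminClient.create(admin.getConfigurationProperties())) {
            Collection<TopicListing> topicList = client.listTopics().listings().get();
            assertNotNull(topicList);
            // Compare as sets, because the order of the listed topics is not guaranteed
            Set<String> topicNames = topicList.stream().map(TopicListing::name).collect(Collectors.toSet());
            assertEquals(Set.of("create-employee-events", "springboot-topic"), topicNames);
        }
    }
    @DynamicPropertySource
    public static void properties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.properties.bootstrap.servers", kafka::getBootstrapServers);
    }
}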
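One detail worth keeping in mind when asserting on topic names: as far as I know, the admin client does not promise any particular order for the listed topics, so comparing names as sets tends to be more robust than comparing lists. The following is a minimal, Kafka-free sketch of that idea; `TopicNameCheck` and `sameTopics` are hypothetical names used only for illustration.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;

public class TopicNameCheck {

    // True when both collections hold exactly the same names, regardless of order.
    public static boolean sameTopics(Collection<String> expected, Collection<String> actual) {
        return new HashSet<>(expected).equals(new HashSet<>(actual));
    }

    public static void main(String[] args) {
        List<String> expected = List.of("create-employee-events", "springboot-topic");
        // Same names, opposite order, as a broker might return them.
        List<String> fromBroker = List.of("springboot-topic", "create-employee-events");

        System.out.println(sameTopics(expected, fromBroker)); // prints "true"
        System.out.println(expected.equals(fromBroker));      // prints "false": list equality is order-sensitive
    }
}
```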
Testing publishing events.
- First, we create a topic named create-employee-events.
- Next, we publish an event to the topic by invoking the EmployeeController class.
- Finally, we create a KafkaConsumer that subscribes to the topic and polls it to read the published message. I have used the Awaitility library to wait for the consumer to read the published message.
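To make the waiting step concrete, here is a rough, dependency-free sketch of what a "poll until a condition holds or a timeout expires" loop looks like. It is not Awaitility's implementation; `PollUntil` and its parameters are hypothetical, and the lambda in `main` merely stands in for a call to the consumer's `poll` method.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class PollUntil {

    // Re-checks the condition every `interval` until it holds or `timeout` has elapsed.
    public static boolean pollUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger polls = new AtomicInteger();
        // Stand-in for consumer.poll(...): pretend the record shows up on the third poll.
        boolean received = pollUntil(() -> polls.incrementAndGet() >= 3,
                Duration.ofSeconds(2), Duration.ofMillis(50));
        System.out.println("received=" + received + " after " + polls.get() + " polls");
    }
}
```

Compared with a fixed sleep, this style of waiting returns as soon as the message arrives and only fails once the full timeout has passed.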
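import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
// Also import the application's own Employee and EmployeeController classes

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
public class SpringBootKafkaProducerIT {
    private static final Logger logger = LoggerFactory.getLogger(SpringBootKafkaProducerIT.class);
    static KafkaContainer kafka;
    static {
        kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
        kafka.start();
    }
    @Autowired
    EmployeeController employeeController;
    @Autowired
    private KafkaAdmin admin;
    @Test
    public void testPublishEmployee() throws InterruptedException, ExecutionException {
        String topicName = "create-employee-events";
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        // ErrorHandlingDeserializer is the configured deserializer; it delegates to the real ones
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, IntegerDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(JsonDeserializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.producer.dto.Employee");
        // try-with-resources closes the admin client and the consumer when the test ends
        try (AdminClient client = AdminClient.create(admin.getConfigurationProperties());
             KafkaConsumer<Integer, Employee> consumer = new KafkaConsumer<>(props)) {
            // first create the create-employee-events topic
            // you can ignore this step if you have already created topic
            NewTopic topic1 = TopicBuilder.name(topicName).build();
            client.createTopics(Collections.singletonList(topic1));
            consumer.subscribe(Collections.singletonList(topicName));
            Employee emp = new Employee();
            emp.setId(1);
            emp.setName("Test");
            employeeController.publishEmployee(emp);
            Collection<TopicListing> topicList = client.listTopics().listings().get();
            assertEquals(2, topicList.size());
            List<String> topicNameList = topicList.stream().map(TopicListing::name).collect(Collectors.toList());
            List<String> expectedTopicNameList = Arrays.asList("springboot-topic", "create-employee-events");
            assertTrue(topicNameList.containsAll(expectedTopicNameList) && expectedTopicNameList.containsAll(topicNameList));
            // Keep polling until the published record arrives or 10 seconds pass
            await().atMost(10, TimeUnit.SECONDS).until(() -> {
                ConsumerRecords<Integer, Employee> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) {
                    return false;
                }
                records.forEach(r -> System.out.println(r.topic() + " *** " + r.key() + " *** " + r.value()));
                Assertions.assertThat(records.count()).isEqualTo(1);
                Employee received = records.iterator().next().value();
                Assertions.assertThat(received.getName()).isEqualTo("Test");
                Assertions.assertThat(received.getId()).isEqualTo(1);
                return true;
            });
        }
    }
    @DynamicPropertySource
    public static void properties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.properties.bootstrap.servers", kafka::getBootstrapServers);
    }
}
Running Producer Integration Tests.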
Use the following command to run the integration tests.
mvn clean verify
Testing Kafka Consumer
We add the same dependencies and Maven plugin (steps 1 and 2) as in the producer application.
Testing consuming events
- We create a topic named create-employee-events using the Kafka admin client.
- Using KafkaProducer, we publish an event to the topic.
- We expect the Kafka consumer to read the message from the topic. Because the consumer application reads the message and prints it to the console, we check that the console output contains the expected message.
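The last step relies on inspecting what the application printed. As a conceptual illustration only (this is not how Spring Boot's output capture is implemented), the sketch below redirects `System.out` into a buffer while some code runs and then checks the captured text. `ConsoleCapture` is a hypothetical helper, and the lambda in `main` stands in for the consumer printing a received employee.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;

public class ConsoleCapture {

    // Runs the action with System.out redirected and returns everything it printed.
    public static String capture(Runnable action) {
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (PrintStream replacement = new PrintStream(buffer, true, StandardCharsets.UTF_8)) {
            System.setOut(replacement);
            action.run();
        } finally {
            // Always restore the real console, even if the action throws.
            System.setOut(original);
        }
        return buffer.toString(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for the consumer application printing the message it received.
        String printed = capture(() -> System.out.println("Employee{id=1, name='Test'}"));
        System.out.println(printed.contains("Employee{id=1, name='Test'}")); // prints "true"
    }
}
```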
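import static org.awaitility.Awaitility.await;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
@ExtendWith(OutputCaptureExtension.class)
public class SpringBootKafkaConsumerIT {
    private static final Logger logger = LoggerFactory.getLogger(SpringBootKafkaConsumerIT.class);
    static KafkaContainer kafka;
    static {
        kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
        kafka.start();
    }
    @Autowired
    private KafkaAdmin admin;
    @Test
    public void testPublishEmployee(CapturedOutput output) throws InterruptedException, ExecutionException {
        String topicName = "create-employee-events";
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        try (AdminClient client = AdminClient.create(admin.getConfigurationProperties());
             KafkaProducer<Integer, Employee> producer = new KafkaProducer<>(props)) {
            // first create the create-employee-events topic
            NewTopic topic1 = TopicBuilder.name(topicName).build();
            client.createTopics(Collections.singletonList(topic1));
            Employee emp = new Employee();
            emp.setId(1);
            emp.setName("Test");
            producer.send(new ProducerRecord<>(topicName, emp.getId(), emp)).get();
            // Wait for the consumer to print the message instead of sleeping for a fixed time
            await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
                    Assertions.assertThat(output).contains("Employee{id=1, name='Test'}"));
        }
    }
    @DynamicPropertySource
    public static void properties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }
}
Running Consumer Integration Tests.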
Use the following command to run the integration tests.
mvn clean verify
The complete code for this blog post can be downloaded from GitHub.
Producer – sureshgadupu/springboot-kafka-producer (github.com)
Consumer – sureshgadupu/springboot-kafka-consumer (github.com)