Using Kafka with Spring Boot
In this blog post, I will explain how to integrate a Spring Boot application with Apache Kafka and start sending and consuming messages. I will develop a Kafka producer application that publishes messages to a topic and a Kafka consumer application that consumes messages from that topic, both built with the Spring Boot framework.
In my previous blog post, I covered how to create a Kafka cluster on a local development machine using the Kafka binary, Docker, or the cloud, and how to create topics, post messages to a topic, and consume messages from a topic using the CLI tools.
In this post, we will post a message to a topic with a Spring Boot Kafka producer application and consume messages from that topic with a Spring Boot Kafka consumer application.
To demonstrate the Kafka producer and consumer functionality, we use the following architecture:
- A REST client posts a request to the springboot-kafka-producer application
- The springboot-kafka-producer application receives the request and posts it to a topic in the Kafka cluster
- The springboot-kafka-consumer application reads messages from the topic
Building Spring Boot Kafka Producer
Setting Up Project
Go to Spring Initializr and add the following starter dependencies to a project.
- Spring Web
- Spring for Apache Kafka
Change the Name to “springboot-kafka-producer” and then click the “Generate” button. A .zip file will download. Unzip it. Inside you’ll find a simple, Maven-based project including a pom.xml build file. (NOTE: You can use Gradle instead. The examples in this tutorial are Maven-based.)
Import the project to your favorite IDE.
Configuring Kafka Producer
You can configure the Kafka producer in two ways:
- application.properties
- Configuration class
Using application properties file
You can use the following properties to configure the Kafka producer.
spring.kafka.properties.bootstrap.servers = localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=Employee:dev.fullstackcode.kafka.producer.dto.Employee
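One limitation of configuring the producer with a properties file: serializers are specified by class name only, so you cannot pass in a serializer instance that you have constructed and customized yourself.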
Using configuration class
Spring for Apache Kafka provides a number of built-in support classes for integrating with Kafka. The DefaultKafkaProducerFactory class creates the Kafka producers that connect to the Kafka cluster and publish messages.
KafkaTemplate is the central class for sending messages in Spring for Apache Kafka. It is a template class for executing high-level operations: it wraps a producer and provides convenience methods to send data to Kafka topics.
To use the KafkaTemplate, you configure a producer factory and pass it to the template’s constructor.
The following code shows the configuration of the Kafka producer.
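import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import dev.fullstackcode.kafka.producer.dto.Employee;

@Configuration
public class KafkaProducerConfig {

    // Broker address, read from application.properties
    @Value("${spring.kafka.properties.bootstrap.servers}")
    private String bootstrapServerUrl;

    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(JsonSerializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.producer.dto.Employee");
        return props;
    }

    @Bean
    public ProducerFactory<Integer, Employee> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<Integer, Employee> kafkaTemplate(ProducerFactory<Integer, Employee> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}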
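BOOTSTRAP_SERVERS_CONFIG – The Kafka cluster/broker URL.
KEY_SERIALIZER_CLASS_CONFIG – The serializer class for the record key. The key does not have to be unique; with the default partitioner, Kafka uses it to choose the partition, so records with the same key go to the same partition.
VALUE_SERIALIZER_CLASS_CONFIG – The serializer class for the record value. The value is the actual message sent to the topic.
TYPE_MAPPINGS – Maps a short type token (here Employee) to a class, so the producer and consumer can use different class names for the same payload.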
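To make the role of the key more concrete, here is a minimal JDK-only sketch. It assumes a topic with three partitions and uses a simple stand-in hash; Kafka's default partitioner hashes the serialized key bytes with murmur2, so the real partition numbers will differ. The class and method names are made up for illustration and are not part of Kafka or Spring.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration, not part of Kafka or Spring.
class KeyPartitionSketch {

    // An Integer key travels as 4 big-endian bytes, which is the layout
    // Kafka's IntegerSerializer produces.
    static byte[] serializeKey(int key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }

    // Stand-in for the partitioner: hash the key bytes, then take the result
    // modulo the partition count. Kafka uses murmur2 here, not Arrays.hashCode.
    static int choosePartition(byte[] keyBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count
        for (int key : new int[] {101, 102, 101}) {
            byte[] bytes = serializeKey(key);
            System.out.println("key " + key + " -> partition "
                    + choosePartition(bytes, numPartitions));
        }
    }
}
```

Whatever hash is used, the point is the same: equal keys produce equal bytes and therefore land in the same partition, which is what preserves ordering per key.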
Using KafkaTemplate
In our service class, we can use the KafkaTemplate instance to send messages to the topic.
@Service
public class EmployeeService {

    @Autowired
    private KafkaTemplate<Integer, Employee> kafkaTemplate;

    public void publishEmployee(Employee employee) {
        kafkaTemplate.send("create-employee-events", employee);
    }
}
Writing Controller
@RestController
@RequestMapping("/employee")
public class EmployeeController {

    @Autowired
    EmployeeService employeeService;

    @PostMapping
    public void publishEmployee(@RequestBody Employee employee) {
        employeeService.publishEmployee(employee);
    }
}
Start the Application
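Before sending a request to the producer application, we need to start the Kafka cluster. The commands below use paths relative to the bin\windows directory of the Kafka installation.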
Start Zookeeper
zookeeper-server-start.bat ..\..\config\zookeeper.properties
Start Kafka Cluster
kafka-server-start.bat ..\..\config\server.properties
List current topics
kafka-topics.bat --list --bootstrap-server localhost:9092
Create required topic
Since we are sending messages to the create-employee-events topic, I am creating the topic from the CLI. You can also create the topic from the Spring Boot application during startup. See the section “Creating topics from Spring Boot Application” below for sample code.
kafka-topics.bat --create --topic create-employee-events --bootstrap-server localhost:9092
Note
You can also start the Kafka cluster using Docker. For more details, see my previous post.
Send a POST request to the controller
Now let's send a POST request to the employee controller.
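Any REST client will do. As one option, the request can be sketched with the JDK's built-in HTTP client (Java 11+). The JSON field names below (id, name) are hypothetical, since they depend on your Employee class, and the class name PostEmployeeSketch is made up for this example. The URL assumes the producer runs locally on port 8080.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper, not part of the producer project.
class PostEmployeeSketch {

    // Builds the POST request for the /employee endpoint of the producer.
    static HttpRequest buildRequest(String baseUrl, String json) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/employee"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumed payload: the field names must match your Employee class.
        String json = "{\"id\": 1, \"name\": \"John\"}";
        HttpRequest request = buildRequest("http://localhost:8080", json);
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println("HTTP status: " + response.statusCode());
        } catch (IOException e) {
            // Happens when the producer application is not running.
            System.out.println("Producer not reachable: " + e.getMessage());
        }
    }
}
```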
We can then check with the CLI consumer whether the message reached the topic in the Kafka cluster.
kafka-console-consumer.bat --topic create-employee-events --from-beginning --bootstrap-server localhost:9092
Publishing to Confluent Cloud
If you have created the Kafka cluster using the Confluent managed Kafka service, you need to configure a couple of extra properties. You need the bootstrap server address, an API key, and a secret to connect to a cluster in Confluent Cloud.
You can grab the required properties from the cluster dashboard.
1. Log in to your Confluent account.
2. On the next screen, click the cluster you want to connect to from the producer application. The next screen shows the cluster dashboard.
3. On the cluster dashboard, click the DataIntegration -> client link in the left-side navigation. Next, click the New Client button on the main page.
4. On the next screen, choose Spring Boot as your language.
5. Once you choose the Spring Boot option, the site shows the properties required to configure the producer.
In those properties, the username and password contain placeholders. Replace the placeholders with the cluster API key and secret.
If you have already generated them, you can reuse them; otherwise create a new API key and secret by clicking the Create Kafka cluster API Key button on the right.
If you create a new API key, it is automatically prefilled in the properties. Copy the properties and use them in your application.
The updated properties file looks like this:
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.bootstrap.servers=pkc-6ojv2.us-west4.gcp.confluent.cloud:9092
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='XXXXXXX' password='YYYYYYYY';
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=Employee:dev.fullstackcode.kafka.producer.dto.Employee
# Best practice for higher availability in Apache Kafka clients prior to 3.0
spring.kafka.properties.session.timeout.ms=45000
If you are using the producer config class, add the connection properties to both the config class and the application.properties file.
@Bean
public Map<String, Object> producerConfig() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(JsonSerializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.producer.dto.Employee");

    // confluent connection properties
    props.put("sasl.mechanism", "PLAIN");
    props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='<apikey>' password='<secret>';");
    props.put("security.protocol", "SASL_SSL");
    return props;
}
application.properties
spring.kafka.properties.bootstrap.servers=pkc-6ojv2.us-west4.gcp.confluent.cloud:9092
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<apikey>' password='<secret>';
spring.kafka.properties.security.protocol=SASL_SSL
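If you would rather not hard-code the API key and secret in the config class, one option is to assemble the connection properties from values supplied at runtime. The following is only a sketch under assumptions: the class name ConfluentPropsSketch and the environment variable names CONFLUENT_API_KEY and CONFLUENT_API_SECRET are made up for this example. The property keys and the JAAS string format are the same ones used above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring or the Confluent client.
class ConfluentPropsSketch {

    // Builds the Confluent Cloud connection properties from an API key and secret.
    static Map<String, Object> connectionProps(String bootstrapServers,
                                               String apiKey,
                                               String apiSecret) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username='" + apiKey + "' password='" + apiSecret + "';");
        return props;
    }

    public static void main(String[] args) {
        // Assumed variable names; placeholders are used when they are not set.
        String key = System.getenv().getOrDefault("CONFLUENT_API_KEY", "<apikey>");
        String secret = System.getenv().getOrDefault("CONFLUENT_API_SECRET", "<secret>");
        Map<String, Object> props = connectionProps(
                "pkc-6ojv2.us-west4.gcp.confluent.cloud:9092", key, secret);
        // Print only the property names so the secret never reaches the console.
        System.out.println(props.keySet());
    }
}
```

Inside producerConfig() the result could then be merged with props.putAll(...) in place of the three hard-coded props.put calls.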
Building Spring Boot Kafka Consumer
We can receive messages by configuring a MessageListenerContainer and providing a message listener, or by using the @KafkaListener annotation.
Configuring Kafka Consumer
Similar to the producer factory, there is a consumer factory. The DefaultKafkaConsumerFactory class creates the Kafka consumers that the listener container uses to read messages from a topic.
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.properties.bootstrap.servers}")
    private String bootstrapServerUrl;

    private Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        // ErrorHandlingDeserializer is the deserializer Kafka calls; it delegates
        // to the real key and value deserializers configured below.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, IntegerDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(JsonDeserializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.consumer.dto.Employee");
        return props;
    }

    @Bean
    public ConsumerFactory<Integer, Employee> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, Employee>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, Employee> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
}
Note
For the producer config we use serializers, and for the consumer config we use deserializers.
Write Listener method
Write a method with the @KafkaListener annotation to listen to the topic.
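@Component
public class KafkaListener {

    // The groupId given here overrides the group.id set in the consumer factory
    // configuration for this listener.
    @org.springframework.kafka.annotation.KafkaListener(groupId = "groups", topics = "create-employee-events")
    public void listen(Employee data) {
        System.out.println(data);
    }
}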
application.properties
As the producer application is already running on port 8080, I am starting the consumer application on port 9090.
Add the server.port=9090 property to application.properties to start the application on port 9090.
# The property name must match the placeholder read by KafkaConsumerConfig
spring.kafka.properties.bootstrap.servers=localhost:9092
server.port=9090
Start Consumer Application
Once the consumer is started, it starts reading messages from the given topic.
Consuming from Confluent Cloud
You can use the following properties to consume from Confluent Cloud. For the API key and secret, follow the same procedure as for the producer.
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.bootstrap.servers=pkc-6ojv2.us-west4.gcp.confluent.cloud:9092
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='xxxx' password='xxxxxx';
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.type.mapping=Employee:dev.fullstackcode.kafka.consumer.dto.Employee
If you are using the consumer config class, you need to add the connection properties to both the config class and application.properties.
private Map<String, Object> consumerConfig() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
    // ErrorHandlingDeserializer wraps the real key and value deserializers
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, IntegerDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    props.put(JsonDeserializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.consumer.dto.Employee");

    // confluent connection properties
    props.put("sasl.mechanism", "PLAIN");
    props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='<apikey>' password='<secret>';");
    props.put("security.protocol", "SASL_SSL");
    return props;
}
application.properties
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.bootstrap.servers=pkc-6ojv2.us-west4.gcp.confluent.cloud:9092
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='xxxx' password='xxxxxx';
spring.kafka.properties.security.protocol=SASL_SSL
Creating topics from Spring Boot Application
Previously we created topics using CLI commands. We can also create topics programmatically.
If you define a KafkaAdmin bean in your application context, it can automatically add topics to the broker. To do so, add a NewTopic @Bean for each topic to the application context.
When using Spring Boot, a KafkaAdmin bean is automatically registered, so you only need the NewTopic beans.
Add the following class to the producer application.
@Configuration
public class KafkaTopics {

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("springboot-topic").build();
    }
}
If you run the program, it creates the new topic in the Kafka cluster on startup.
Troubleshooting
While the consumer application was trying to read messages, I ran into the following two issues.
1. The class ‘xxx’ is not in the trusted packages exception.
The error message looks like this:
Caused by: java.lang.IllegalArgumentException: The class 'dev.fullstackcode.kafka.producer.dto.Employee' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129) ~[spring-kafka-2.8.4.jar:2.8.4]
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103) ~[spring-kafka-2.8.4.jar:2.8.4]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:572) ~[spring-kafka-2.8.4.jar:2.8.4]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1420) ~[kafka-clients-3.0.1.jar:na]
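To fix the issue, add the following property to the consumer config. The * value trusts every package; as the error message suggests, you can instead name only the packages you consider safe.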
private Map<String, Object> consumerConfig() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    ...
    return props;
}
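The idea behind the check can be illustrated with a few lines of plain Java. This is a deliberately simplified sketch and not Spring's actual implementation, which may accept more forms than the exact package match shown here. The class name TrustedPackagesSketch is made up for this example.

```java
import java.util.List;

// Simplified illustration of a trusted-packages check; not Spring's code.
class TrustedPackagesSketch {

    // Returns true when "*" is listed (trust everything) or when the
    // package of the given class is listed explicitly.
    static boolean isTrusted(String className, List<String> trustedPackages) {
        if (trustedPackages.contains("*")) {
            return true;
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trustedPackages.contains(packageName);
    }

    public static void main(String[] args) {
        String type = "dev.fullstackcode.kafka.producer.dto.Employee";
        // The defaults from the error message do not include the DTO package.
        System.out.println(isTrusted(type, List.of("java.util", "java.lang")));
        // Naming the package explicitly is the narrower alternative to "*".
        System.out.println(isTrusted(type, List.of("dev.fullstackcode.kafka.producer.dto")));
        System.out.println(isTrusted(type, List.of("*")));
    }
}
```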
2. RecordDeserializationException: Error deserializing key/value for partition.
The error looks like this:
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition create-employee-events-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [dev.fullstackcode.kafka.producer.dto.Employee]; nested exception is java.lang.ClassNotFoundException: dev.fullstackcode.kafka.producer.dto.Employee
The above error occurred due to a package mismatch.
In the producer application, the Employee class is in the dev.fullstackcode.kafka.producer.dto package. In the consumer application, the Employee class is in the dev.fullstackcode.kafka.consumer.dto package.
The message carries the producer's class name as its type information, and that class does not exist in the consumer application, so the deserializer cannot convert the payload to an Employee object.
To overcome this problem, we need to set up type mapping in both the producer and the consumer config.
In Producer config
public Map<String, Object> producerConfig() {
    Map<String, Object> props = new HashMap<>();
    .........
    props.put(JsonSerializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.producer.dto.Employee");
    return props;
}
In consumer config
private Map<String, Object> consumerConfig() {
    Map<String, Object> props = new HashMap<>();
    ....
    props.put(JsonDeserializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.consumer.dto.Employee");
    return props;
}
Note
The corresponding objects in producer and consumer application must be compatible.
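To see why the mapping on both sides fixes the mismatch, here is a small JDK-only sketch of the idea: the producer writes a short token instead of its class name, and the consumer translates that token to its own class. This is an illustration of the concept under assumptions, not Spring's implementation; the class TypeMappingSketch and its method names are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of token-based type mapping; not Spring's code.
class TypeMappingSketch {

    // Parses "token:className[,token:className...]" into a token -> class name map.
    static Map<String, String> parse(String mappings) {
        Map<String, String> result = new HashMap<>();
        for (String entry : mappings.split(",")) {
            String[] parts = entry.split(":", 2);
            result.put(parts[0].trim(), parts[1].trim());
        }
        return result;
    }

    // Producer side: the type information written for a class. Without a
    // mapping the full class name is written, which is what caused the error.
    static String tokenFor(String className, Map<String, String> mappings) {
        for (Map.Entry<String, String> e : mappings.entrySet()) {
            if (e.getValue().equals(className)) {
                return e.getKey();
            }
        }
        return className;
    }

    // Consumer side: translate the received type information to a local class name.
    static String classFor(String typeInfo, Map<String, String> mappings) {
        return mappings.getOrDefault(typeInfo, typeInfo);
    }

    public static void main(String[] args) {
        Map<String, String> producer = parse("Employee:dev.fullstackcode.kafka.producer.dto.Employee");
        Map<String, String> consumer = parse("Employee:dev.fullstackcode.kafka.consumer.dto.Employee");

        String typeInfo = tokenFor("dev.fullstackcode.kafka.producer.dto.Employee", producer);
        System.out.println("sent type info: " + typeInfo);
        System.out.println("consumer resolves to: " + classFor(typeInfo, consumer));
    }
}
```

The token itself (Employee here) is arbitrary; it only has to be the same string in the producer and consumer mappings.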
You can download source code from github
producer – sureshgadupu/springboot-kafka-producer (github.com)
consumer – sureshgadupu/springboot-kafka-consumer (github.com)