Using Kafka with Spring Boot


In this blog post, I will explain how to integrate a Spring Boot application with Apache Kafka and start sending and consuming messages. I will develop a Kafka producer application that publishes messages to a topic and a Kafka consumer application that consumes messages from that topic, both using the Spring Boot framework.

In my previous blog post, I covered how to create a Kafka cluster on a local development machine using the Kafka binary, Docker, or the cloud, and how to create topics, post messages to a topic, and consume messages from a topic using CLI tools.

In this post, we will post a message to a topic using a Spring Boot Kafka producer application and consume messages from that topic using a Spring Boot Kafka consumer application.

To demonstrate Kafka producer and consumer functionality, we use the following architecture:

Architecture Diagram
  • A REST client posts a request to the springboot-kafka-producer application
  • The springboot-kafka-producer application receives the request and posts a message to a topic in the Kafka cluster
  • The springboot-kafka-consumer application reads messages from the topic

Building Spring Boot Kafka Producer

Setting Up Project

Go to Spring Initializr and add the following starter dependencies to a project.

  • Spring Web
  • Spring for Apache Kafka
Spring Initializr

Change the Name to “springboot-kafka-producer” and then click the “Generate” button. A .zip file will download; unzip it. Inside you’ll find a simple, Maven-based project including a pom.xml build file. (NOTE: You can use Gradle. The examples in this tutorial are Maven-based.)

Import the project into your favorite IDE.
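The examples below send an Employee object as the message payload. The exact fields don't matter for Kafka; here is a minimal sketch of the DTO (the fields are my assumptions, any JSON-serializable shape works):

package dev.fullstackcode.kafka.producer.dto;

public class Employee {

    private Integer id;
    private String firstName;
    private String lastName;

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }
    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }

    @Override
    public String toString() {
        return "Employee{id=" + id + ", firstName=" + firstName + ", lastName=" + lastName + "}";
    }
}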

Configuring Kafka Producer

You can configure the Kafka producer in two ways:

  1. application.properties
  2. Configuration class

Using application properties file

You can use the following properties to configure the Kafka producer.

spring.kafka.properties.bootstrap.servers=localhost:9092

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=Employee:dev.fullstackcode.kafka.producer.dto.Employee


One limitation of configuring the producer through the properties file is that you can only reference serializer classes by name; you cannot supply pre-configured custom serializer instances.
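With the configuration-class approach shown next, you can instead hand pre-built serializer instances straight to the factory. A minimal sketch (producerConfig() and Employee come from the sections below):

@Bean
public ProducerFactory<Integer, Employee> producerFactory() {
    // This DefaultKafkaProducerFactory overload accepts serializer instances,
    // which is useful when a serializer needs programmatic setup
    // (for example, a JsonSerializer built with a customized ObjectMapper).
    return new DefaultKafkaProducerFactory<>(producerConfig(),
            new IntegerSerializer(), new JsonSerializer<>());
}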

Using configuration class

Spring for Apache Kafka provides a number of built-in support classes for integrating with Kafka. The DefaultKafkaProducerFactory class creates the Kafka producers used to connect and publish messages to the Kafka cluster.

KafkaTemplate is a template class for executing high-level Kafka operations. It wraps a producer and provides convenience methods to send data to Kafka topics, giving Spring applications a generic way to integrate with Kafka.

To use the KafkaTemplate, configure a producer factory and provide it in the template’s constructor.

The following code shows the configuration of the Kafka producer.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import dev.fullstackcode.kafka.producer.dto.Employee;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.properties.bootstrap.servers}")
    private String bootstrapServerUrl;

    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(JsonSerializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.producer.dto.Employee");
        return props;
    }

    @Bean
    public ProducerFactory<Integer, Employee> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<Integer, Employee> kafkaTemplate(ProducerFactory<Integer, Employee> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

}

BOOTSTRAP_SERVERS_CONFIG – the Kafka cluster/broker URL.

KEY_SERIALIZER_CLASS_CONFIG – the serializer class for message keys. The key is optional; Kafka uses it to decide which partition a message is written to.

VALUE_SERIALIZER_CLASS_CONFIG – the serializer class for message values. The value is the actual message payload sent to the topic.

TYPE_MAPPINGS – maps the short token Employee to the concrete class. The JsonSerializer sends this token in a type header (__TypeId__) instead of the full class name, so the consumer can map it back to its own Employee class.

Using KafkaTemplate

In our service class, we can use the KafkaTemplate instance to send messages to the topic.

@Service
public class EmployeeService {

    @Autowired
    private KafkaTemplate<Integer, Employee> kafkaTemplate;

    public void publishEmployee(Employee employee) {
        kafkaTemplate.send("create-employee-events", employee);
    }

}
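Note that send() is asynchronous. In the Spring Kafka 2.8.x line used in this post it returns a ListenableFuture (newer versions return a CompletableFuture instead), so you can attach a callback to log success or failure. A minimal sketch:

public void publishEmployee(Employee employee) {
    kafkaTemplate.send("create-employee-events", employee)
            .addCallback(
                    // Success callback: log where the record landed
                    result -> System.out.println("Sent to partition "
                            + result.getRecordMetadata().partition()
                            + " at offset " + result.getRecordMetadata().offset()),
                    // Failure callback: log the send error
                    ex -> System.err.println("Failed to send: " + ex.getMessage()));
}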

Writing Controller

@RestController
@RequestMapping("/employee")
public class EmployeeController {

    @Autowired
    EmployeeService employeeService;


    @PostMapping
    public void publishEmployee(@RequestBody Employee employee) {
         employeeService.publishEmployee(employee);
    }

}

Start the Application

Before sending a request to the producer application, we need to start the Kafka cluster.

Start Zookeeper

zookeeper-server-start.bat ..\..\config\zookeeper.properties

Start Kafka Cluster

kafka-server-start.bat  ..\..\config\server.properties

List current topics

kafka-topics.bat --list --bootstrap-server localhost:9092

Create required topic

Since we are sending messages to the create-employee-events topic, I am creating the topic from the CLI. You can also create topics from the Spring Boot application during startup; see the “Creating topics from Spring Boot Application” section below for sample code.

kafka-topics.bat --create --topic create-employee-events --bootstrap-server localhost:9092
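You can verify the topic and its partition assignment with the --describe option:

kafka-topics.bat --describe --topic create-employee-events --bootstrap-server localhost:9092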

Note

You can also start the Kafka cluster using Docker. For more details, visit my previous post.

Send a post request to controller

Now let’s send a POST request to the employee controller.
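For example, with curl (the JSON fields follow the Employee DTO sketched earlier; adjust them to match your own class):

curl -X POST http://localhost:8080/employee -H "Content-Type: application/json" -d "{\"id\":1,\"firstName\":\"John\",\"lastName\":\"Doe\"}"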

Now we can test whether the message reached the topic in the Kafka cluster with the CLI consumer.

kafka-console-consumer.bat --topic create-employee-events --from-beginning --bootstrap-server localhost:9092
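If the producer worked, the console consumer prints the JSON value of each record. With the sample payload above it would look something like:

{"id":1,"firstName":"John","lastName":"Doe"}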

Publishing to Confluent Cloud

If you have created your Kafka cluster using the Confluent Cloud managed service, you need to configure a couple of extra properties. You need the bootstrap server address, an API key, and a secret to connect to the cluster in Confluent Cloud.

You can grab the required properties from the cluster dashboard.

1. Log in to your Confluent account.

2. On the next screen, click on the cluster you want to connect to from the producer application. The next screen shows the cluster dashboard.

3. On the cluster dashboard screen, click the Data Integration -> Clients link in the left-side navigation. Then click the New Client button on the main page.

4. On the next screen, choose Spring Boot as your language.

5. Once you choose the Spring Boot option, the site will show the required properties to configure the producer.

In those properties, the username and password will contain placeholders. You need to replace the placeholders with the cluster API key and secret.

If you have already generated them previously, you can reuse them, or create a new API key and secret by clicking the Create Kafka cluster API key button on the right.

If you create a new API key, it will be automatically prefilled in the properties snippet. Copy the properties and use them in your application.

The updated properties file looks like the following:

spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.bootstrap.servers=pkc-6ojv2.us-west4.gcp.confluent.cloud:9092
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='XXXXXXX'   password='YYYYYYYY';
spring.kafka.properties.security.protocol=SASL_SSL

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=Employee:dev.fullstackcode.kafka.producer.dto.Employee

# Best practice for higher availability in Apache Kafka clients prior to 3.0
spring.kafka.properties.session.timeout.ms=45000

If you are using the producer config class, add the connection properties to both the config class and the application.properties file.

@Bean
public Map<String, Object> producerConfig() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(JsonSerializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.producer.dto.Employee");

    // Confluent Cloud connection properties
    props.put("sasl.mechanism", "PLAIN");
    props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='<apikey>' password='<secret>';");
    props.put("security.protocol", "SASL_SSL");

    return props;
}

application.properties

spring.kafka.properties.bootstrap.servers=pkc-6ojv2.us-west4.gcp.confluent.cloud:9092
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<apikey>' password='<secret>';
spring.kafka.properties.security.protocol=SASL_SSL

Building Spring Boot Kafka Consumer

We can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation.

Configuring Kafka Consumer

Similar to the producer factory, there is a consumer factory: the DefaultKafkaConsumerFactory class creates the Kafka consumers that listen to a topic and consume messages.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import dev.fullstackcode.kafka.consumer.dto.Employee;

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.properties.bootstrap.servers}")
    private String bootstrapServerUrl;

    private Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        // ErrorHandlingDeserializer wraps the real deserializers and catches
        // deserialization failures instead of looping endlessly on a bad record.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, IntegerDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(JsonDeserializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.consumer.dto.Employee");
        return props;
    }

    @Bean
    public ConsumerFactory<Integer, Employee> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, Employee>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, Employee> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

}

Note

For the producer config we use serializers, and for the consumer config we use deserializers.

Writing Listener Method

Write a method with the @KafkaListener annotation to listen to the topic.

@Component
public class KafkaListener {

    // The annotation is fully qualified because this class is also named KafkaListener
    @org.springframework.kafka.annotation.KafkaListener(groupId = "groups", topics = "create-employee-events")
    public void listen(Employee data) {
        System.out.println(data);
    }

}
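If you also need record metadata, Spring Kafka can inject Kafka headers into the listener method. A minimal sketch, as a variant of the listener above (@Payload and @Header come from org.springframework.messaging.handler.annotation, and the constants from org.springframework.kafka.support.KafkaHeaders, as of the Spring Kafka 2.8 line used here):

@org.springframework.kafka.annotation.KafkaListener(groupId = "groups", topics = "create-employee-events")
public void listen(@Payload Employee data,
                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                   @Header(KafkaHeaders.OFFSET) long offset) {
    // Prints the payload plus where the record came from
    System.out.println("Received " + data + " from partition " + partition + " at offset " + offset);
}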

application.properties

As the producer application is already running on port 8080, I am starting the consumer application on port 9090. Add the server.port=9090 property in application.properties to start the application on port 9090. (Note the bootstrap property name below matches the ${spring.kafka.properties.bootstrap.servers} placeholder used in KafkaConsumerConfig.)

spring.kafka.properties.bootstrap.servers=localhost:9092
server.port=9090

Start Consumer Application

Once the consumer is started, it will begin reading messages from the given topic.
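With the sample payload sent earlier, the listener would print the Employee’s toString output, something like:

Employee{id=1, firstName=John, lastName=Doe}

(The exact format depends on how toString() is implemented in your Employee class.)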

Consuming from Confluent Cloud

You can use the following properties to consume from Confluent Cloud. For the API key and secret, please follow the same procedure as for the producer.

spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.bootstrap.servers=pkc-6ojv2.us-west4.gcp.confluent.cloud:9092
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='xxxx'   password='xxxxxx';
spring.kafka.properties.security.protocol=SASL_SSL

spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.type.mapping=Employee:dev.fullstackcode.kafka.consumer.dto.Employee

If you are using the consumer config class, you need to add the connection properties to both the config class and application.properties.

private Map<String, Object> consumerConfig() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, IntegerDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    props.put(JsonDeserializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.consumer.dto.Employee");

    // Confluent Cloud connection properties
    props.put("sasl.mechanism", "PLAIN");
    props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='<apikey>' password='<secret>';");
    props.put("security.protocol", "SASL_SSL");

    return props;
}

application.properties

spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.bootstrap.servers=pkc-6ojv2.us-west4.gcp.confluent.cloud:9092
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='xxxx'   password='xxxxxx';
spring.kafka.properties.security.protocol=SASL_SSL

Creating topics from Spring Boot Application

Previously, we created topics using CLI commands. We can also create topics programmatically.

If you define a KafkaAdmin bean in your application context, it can automatically add topics to the broker. To do so, you can add a NewTopic @Bean for each topic to the application context.

When using Spring Boot, a KafkaAdmin bean is automatically registered so you only need the NewTopic.

Add the following class to the producer application.

@Configuration
public class KafkaTopics {

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("springboot-topic").build();
    }

}

If you run the program, it will create the new topic in the Kafka cluster on startup.
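TopicBuilder can also set the partition count and replication factor explicitly. A minimal sketch, reusing this tutorial’s create-employee-events topic (the values here are only examples; a single-broker local cluster supports at most one replica):

@Bean
public NewTopic topic2() {
    return TopicBuilder.name("create-employee-events")
                       .partitions(3)   // spread the topic over 3 partitions
                       .replicas(1)     // replication factor 1 for a local single-broker setup
                       .build();
}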

Troubleshooting

While the consumer application was trying to read messages, I ran into the following two issues.

  1. The class ‘xxx’ is not in the trusted packages exception.

The error message looks like this:

Caused by: java.lang.IllegalArgumentException: The class 'dev.fullstackcode.kafka.producer.dto.Employee' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
	at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129) ~[spring-kafka-2.8.4.jar:2.8.4]
	at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103) ~[spring-kafka-2.8.4.jar:2.8.4]
	at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:572) ~[spring-kafka-2.8.4.jar:2.8.4]
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1420) ~[kafka-clients-3.0.1.jar:na]

To fix the issue, you need to add the following property to the consumer config (the application.properties equivalent is spring.kafka.consumer.properties.spring.json.trusted.packages=*).

private Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
       ...
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
       ...
        return props;
    }

2. RecordDeserializationException Error deserializing key/value for partition.

The error looks like this:

Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition create-employee-events-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name.
 Class not found [dev.fullstackcode.kafka.producer.dto.Employee]; nested exception is java.lang.ClassNotFoundException: dev.fullstackcode.kafka.producer.dto.Employee

The above error occurred due to a package mismatch.

In the producer application, the Employee class is in the dev.fullstackcode.kafka.producer.dto package; in the consumer application, the Employee class is in the dev.fullstackcode.kafka.consumer.dto package.

The JsonDeserializer tries to load the producer’s class name from the message’s type header, but that class does not exist on the consumer’s classpath, so it cannot convert the payload to the consumer’s Employee object.

To overcome this problem, we need to set up type mapping in both the producer and consumer configs.

In Producer config

 public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
 .........      
        props.put(JsonSerializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.producer.dto.Employee");
        return props;
    }

In consumer config

 private Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
       ....
        props.put(JsonDeserializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.consumer.dto.Employee");
        return props;
    }

Note

The corresponding classes in the producer and consumer applications must be compatible: the consumer’s class must be able to deserialize the JSON produced from the producer’s class.

You can download the source code from GitHub.

producer – sureshgadupu/springboot-kafka-producer (github.com)

consumer – sureshgadupu/springboot-kafka-consumer (github.com)
