How to Publish/Subscribe to Kafka with Spring Boot and SASL/SCRAM

In this blog post, I will explain how to connect a Spring Boot application to a Kafka cluster that uses SASL/SCRAM authentication.

In one of my previous blog posts, I have already covered connecting Spring Boot applications to a Kafka cluster. That example used a different authentication mechanism.


Kafka supports several authentication mechanisms:

  • GSSAPI is used primarily with Kerberos.
  • PLAIN is a clear-text password mechanism.
  • SCRAM is a password-based challenge-response mechanism.

Salted Challenge Response Authentication Mechanism (SCRAM), or SASL/SCRAM, is a family of SASL mechanisms that addresses the security concerns with traditional mechanisms that perform username/password authentication like PLAIN. Apache Kafka supports password encryption in SCRAM-SHA-256 and SCRAM-SHA-512 algorithms.
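To make the difference with PLAIN concrete, the broker stores a salted, iterated hash of the password rather than the password itself. The sketch below illustrates salted hashing with the JDK's built-in PBKDF2; this is an illustration of the idea only, not Kafka's exact SCRAM key derivation:

```java
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import java.util.Arrays;

public class SaltedHashDemo {

    // Derive a salted, iterated hash of a password. SCRAM stores a credential
    // of this general shape instead of the clear text used by PLAIN.
    public static byte[] saltedHash(char[] password, byte[] salt, int iterations) throws Exception {
        PBEKeySpec spec = new PBEKeySpec(password, salt, iterations, 256);
        return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                .generateSecret(spec).getEncoded();
    }

    public static void main(String[] args) throws Exception {
        byte[] saltA = {1, 2, 3, 4};
        byte[] saltB = {5, 6, 7, 8};
        byte[] h1 = saltedHash("secret".toCharArray(), saltA, 4096);
        byte[] h2 = saltedHash("secret".toCharArray(), saltB, 4096);
        // Same password with a different salt yields a different stored credential,
        // so an attacker cannot precompute hashes or spot duplicate passwords
        System.out.println(Arrays.equals(h1, h2)); // false
    }
}
```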

The SCRAM implementation in Kafka stores SCRAM credentials in ZooKeeper and is suitable for use in Kafka installations where ZooKeeper is on a private network. Because of this, you must create SCRAM credentials for users in ZooKeeper.
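For example, credentials for a user can be created with the kafka-configs tool that ships with Kafka (the ZooKeeper host, user name, and password below are placeholders):

```shell
# Create SCRAM credentials for user "alice" in ZooKeeper
# (run from the Kafka installation's bin directory)
kafka-configs.sh --zookeeper localhost:2181 --alter \
  --add-config 'SCRAM-SHA-256=[password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' \
  --entity-type users --entity-name alice
```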

In Java, authentication is handled on the application's behalf by JAAS (Java Authentication and Authorization Service).
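Outside of Spring, the same login configuration can be expressed in a standalone JAAS file passed to the JVM with -Djava.security.auth.login.config (the user name and password here are placeholders):

```
KafkaClient {
  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="alice"
  password="alice-secret";
};
```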

If you want to configure your local Kafka cluster to use SASL/SCRAM, the official Kafka security documentation has step-by-step instructions.

In this post, we will:
  1. Create a Kafka cluster and topics in the CloudKarafka cloud (if you have not already done so)
  2. Develop a Spring Boot producer application which sends messages to the cluster/topic
  3. Develop a Spring Boot consumer application which reads messages from the cluster/topic

To demonstrate Kafka producer and consumer functionality, we use the following architecture:

  • A REST client posts a request to the springboot-cloudkarafka-producer application
  • The springboot-cloudkarafka-producer application receives the request and publishes it to a topic in the Kafka cluster
  • The springboot-cloudkarafka-consumer application reads messages from the topic

Creating Kafka Cluster

I am going to use the Kafka cluster created in CloudKarafka in my previous article, as it uses the SASL/SCRAM protocol.

Posting messages to CloudKarafka Cluster

First, we need to develop a producer application which sends messages to a topic in the Kafka cluster.

Building Spring Boot Kafka Producer

Setting Up Project

Go to Spring Initializr and add the following starter dependencies to a project.

  • Spring Web
  • Spring for Apache Kafka

Change the Name to “springboot-cloudkarafka-producer” and then click the “Generate” button. A .zip file will download. Unzip it; inside you’ll find a simple Maven-based project including a pom.xml build file. (NOTE: You can use Gradle; the examples in this tutorial are Maven-based.)
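The generated pom.xml should contain roughly these two dependencies (versions are managed by the Spring Boot parent):

```xml
<dependencies>
    <!-- REST endpoints for the producer application -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring for Apache Kafka (KafkaTemplate, listener containers, etc.) -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
```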

Import the project to your favorite IDE.

Configuring Kafka Producer

You can configure the Kafka producer in 2 ways:

  1. Application properties file
  2. Configuration class

Using application properties file

You can use the following properties to configure the Kafka producer.

spring.kafka.bootstrap-servers=${kafka-servers}
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${cloudkarafka-username}" password="${cloudkarafka-password}";

You can get all the configuration details from the CloudKarafka details page. (You can navigate to this page by clicking on the cluster instance name.)

As Kafka clusters are configured with SSL for security, you might sometimes need to configure the properties below. You need to get the certificate of the Kafka cluster, import it into a Java keystore, and set the file path and password:

spring.kafka.ssl.trust-store-location=<path of trust store certificate>
spring.kafka.ssl.trust-store-password=<password>

Using configuration class

Spring for Apache Kafka provides a number of support classes for integrating with Kafka. The DefaultKafkaProducerFactory class creates Kafka producers, which are used to connect and publish messages to the Kafka cluster.

KafkaTemplate is the central class for Spring Kafka producer applications. It is a template class for executing high-level operations: it wraps a producer and provides convenience methods to send data to Kafka topics, giving you a generic way of integrating Spring Kafka with Spring Boot applications.

To use the KafkaTemplate , you can configure a producer factory and provide it in the template’s constructor. 

Following code shows the configuration of Kafka producer.

@Configuration
public class KafkaProducerConfig {

    @Value("${kafka-servers}")
    private String bootstrapServerUrl;

    @Value("${cloudkarafka-username}")
    private String cloudKarafkaUserName;

    @Value("${cloudkarafka-password}")
    private String cloudKarafkaPassword;

    private final String jaasTemplate =
            "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";

    public Map<String, Object> producerConfig() {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(JsonSerializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.producer.dto.Employee");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", String.format(jaasTemplate, cloudKarafkaUserName, cloudKarafkaPassword));
        props.put("security.protocol", "SASL_SSL");
        props.put("enable.idempotence", "false");

        return props;
    }

    @Bean
    public ProducerFactory<Integer, Employee> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<Integer, Employee> kafkaTemplate(ProducerFactory<Integer, Employee> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}

BOOTSTRAP_SERVERS_CONFIG – the Kafka cluster/broker URL.

KEY_SERIALIZER_CLASS_CONFIG – serializer class for the message key, which identifies the message and determines its partition.

VALUE_SERIALIZER_CLASS_CONFIG – serializer class for the message value. This is the actual message which is sent to the topic.

TYPE_MAPPINGS – maps a short type token to the concrete class, so that the consumer can map the payload back to its own type.

While connecting to a cluster with SASL/SCRAM authentication, the following properties change:

props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("sasl.jaas.config", String.format(jaasTemplate, cloudKarafkaUserName, cloudKarafkaPassword));

jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"

You might need to configure the following additional properties based on the SSL configuration:

props.put("ssl.truststore.location", <trust store path>); 
props.put("ssl.truststore.password", <password>); 
props.put("ssl.endpoint.identification.algorithm", ""); 
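As a quick sanity check, here is how the JAAS template expands once the credentials are substituted in. This is plain Java; the user name and password are placeholders:

```java
public class JaasConfigDemo {

    // Same template shape as in the producer config; the login module is the
    // standard SCRAM module shipped with the Kafka clients library
    static final String JAAS_TEMPLATE =
            "org.apache.kafka.common.security.scram.ScramLoginModule"
            + " required username=\"%s\" password=\"%s\";";

    public static void main(String[] args) {
        // "alice"/"alice-secret" are placeholder credentials, not real ones
        String jaas = String.format(JAAS_TEMPLATE, "alice", "alice-secret");
        // The expanded value is what ends up in the sasl.jaas.config property
        System.out.println(jaas);
    }
}
```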

Using KafkaTemplate

In our service class, we can use the KafkaTemplate instance to send messages to the topic.

@Service
public class EmployeeService {

    @Autowired
    private KafkaTemplate<Integer, Employee> kafkaTemplate;

    @Value("${cloudkarafka.topic}")
    private String topic;

    public void publishEmployee(Employee employee) {
        // send() returns a ListenableFuture; register callbacks to log the outcome
        // (the employee id is assumed to be the Integer message key)
        kafkaTemplate.send(topic, employee.getId(), employee)
                .addCallback(
                        result -> System.out.println("Message published successfully to topic: \n" + result.getProducerRecord()),
                        ex -> System.out.println("Failed to send message" + ex));
    }
}


Writing Controller

@RestController
@RequestMapping("/employee") // request path assumed for this example
public class EmployeeController {

    @Autowired
    EmployeeService employeeService;

    @PostMapping
    public void publishEmployee(@RequestBody Employee employee) {
        employeeService.publishEmployee(employee);
    }
}

Start the Application

Send a POST request to the controller

Now let's send a POST request to the employee controller.
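For example, with curl (the request path and the Employee JSON fields are assumptions; adjust them to your controller and DTO):

```shell
# POST an employee to the producer application running on port 8080
curl -X POST http://localhost:8080/employee \
  -H "Content-Type: application/json" \
  -d '{"id": 1, "firstname": "John", "lastname": "Doe"}'
```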

Consuming from CloudKarafka Kafka Cluster

Next, we need to develop a consumer application which reads messages from the topics created in the CloudKarafka Kafka cluster.

Setting Up Project

You can follow the same steps described for producer application or write the consumer class inside producer application itself.

Building Spring Boot Kafka Consumer

We can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation.

Configuring Kafka Consumer

You can configure the Kafka consumer in 2 ways, similar to the producer:

  1. Application properties file
  2. Configuration class

Using application properties file

# cloudkarafka connection properties
# Required connection configs for Kafka producer, consumer, and admin
spring.kafka.bootstrap-servers=${kafka-servers}
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${cloudkarafka-username}" password="${cloudkarafka-password}";
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=dev.fullstackcode.kafka.consumer.dto

You might need to configure the following additional properties depending on the cluster setup:

spring.kafka.ssl.trust-store-location=<path of trust store certificate>
spring.kafka.ssl.trust-store-password=<password>

Using Configuration Class

Similar to the producer factory, there is a consumer factory: the DefaultKafkaConsumerFactory class creates the Kafka consumers used by the listener containers that read from a topic.

@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka-servers}")
    private String bootstrapServerUrl;

    @Value("${cloudkarafka-username}")
    private String cloudKarafkaUserName;

    @Value("${cloudkarafka-password}")
    private String cloudKarafkaPassword;

    private final String jaasTemplate =
            "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";

    private Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
        // Wrap the real deserializers in ErrorHandlingDeserializer so a bad
        // record fails gracefully instead of looping forever
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, IntegerDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "dev.fullstackcode.kafka.consumer.dto");
        props.put(JsonDeserializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.consumer.dto.Employee");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", String.format(jaasTemplate, cloudKarafkaUserName, cloudKarafkaPassword));
        props.put("security.protocol", "SASL_SSL");

        // additional properties you might need to configure depending on the cluster setup
//        props.put("ssl.truststore.location", "<path to jks certificate>");
//        props.put("ssl.truststore.password", "<password>");
//        props.put("ssl.endpoint.identification.algorithm", "");

        return props;
    }

    @Bean
    public ConsumerFactory<Integer, Employee> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, Employee>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, Employee> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}


For the producer config we use serializers, and for the consumer config we use deserializers.

Write Listener method

Write method with @KafkaListener annotation to listen to the topic.

@Component
public class KafkaListener {

    @org.springframework.kafka.annotation.KafkaListener(groupId = "groups", topics = "${cloudkarafka.topic}")
    public void listen(Employee data) {
        // simply log the received message (method body assumed; the original was elided)
        System.out.println("Received message: " + data);
    }
}

As the producer application is already running at port 8080, I am starting the consumer application at port 9090.

Add the server.port=9090 property to application.properties to start the application at port 9090.


Start Consumer Application

Once the consumer is started, it will begin reading messages from the given topic.

Troubleshooting

1. Cluster authorization failed

While trying to post messages to the CloudKarafka Kafka cluster, I received the following error.

  INFO 34824 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1] Transiting to fatal error state due to org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
 ERROR 34824 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state] with root cause

org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.

 ERROR 34824 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-1] Aborting producer batches due to fatal error

To fix the issue, you need to add the following property to the producer config:

 props.put("enable.idempotence" , "false");

You can read more details about idempotency in the official documentation.

You can read about more troubleshooting issues encountered while connecting to a Kafka cluster from a Spring Boot application in one of my previous blog posts.

You can download source code from github

producer – sureshgadupu/springboot-cloudkarafka-producer

consumer – sureshgadupu/springboot-cloudkarafka-consumer

