How to Publish/Subscribe to Kafka with Spring Boot and SASL/SCRAM

In this blog post, I will explain how to connect a Spring Boot application to a Kafka cluster that uses SASL/SCRAM authentication.

In one of my previous blog posts, I already covered connecting Spring Boot applications to a Kafka cluster. That example used a different authentication mechanism.

What is SASL/SCRAM?

Kafka supports several SASL authentication mechanisms.

  • GSSAPI is used primarily for Kerberos authentication.
  • PLAIN is a simple clear-text username/password mechanism.
  • SCRAM is a password-based challenge-response mechanism.

Salted Challenge Response Authentication Mechanism (SCRAM), or SASL/SCRAM, is a family of SASL mechanisms that addresses the security concerns of traditional username/password mechanisms such as PLAIN. Apache Kafka supports the SCRAM-SHA-256 and SCRAM-SHA-512 mechanisms, which store salted, iterated password hashes instead of clear-text passwords.

The SCRAM implementation in Kafka stores SCRAM credentials in ZooKeeper and is suitable for Kafka installations where ZooKeeper is on a private network. Because of this, you must create SCRAM credentials for each user in ZooKeeper.
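Credentials are normally created with Kafka's kafka-configs tool during cluster setup (the links below walk through this). If your brokers run Kafka 2.7 or later, you can also create them programmatically via the Admin API. The following is a minimal sketch, with the broker address, user name, and password as placeholder values:

import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ScramCredentialInfo;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.admin.UserScramCredentialAlteration;
import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;

public class CreateScramCredentials {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

        // Upsert a SCRAM-SHA-256 credential for a (placeholder) user
        List<UserScramCredentialAlteration> alterations = Collections.singletonList(
                new UserScramCredentialUpsertion(
                        "alice",
                        new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096),
                        "alice-secret"));

        try (Admin admin = Admin.create(props)) {
            admin.alterUserScramCredentials(alterations).all().get();
        }
    }
}

On a managed service such as CloudKarafka, the SCRAM credentials are created for you, so this step only applies to self-hosted clusters.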

In Java, this is managed by JAAS (Java Authentication and Authorization Service), which performs the authentication on the application's behalf.

If you want to configure your local Kafka cluster to use SASL/SCRAM, you can visit the links below for instructions:

https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_scram.html

www.ru-rocker.com/2020/05/22/how-to-secure-confluent-kafka-with-ssl-and-sasl-scram

Use Case

  1. Create a Kafka cluster and topics in the CloudKarafka cloud (if you have not already created them)
  2. Develop a Spring Boot producer application which sends messages to the cluster/topic
  3. Develop a Spring Boot consumer application which reads messages from the cluster/topic

To demonstrate the Kafka producer and consumer functionality, we use the following architecture:

  • A REST client will post a request to the springboot-cloudkarafka-producer application
  • The springboot-cloudkarafka-producer application will receive the request and post the message to a topic in the Kafka cluster
  • The springboot-cloudkarafka-consumer application will read messages from the topic

Creating Kafka Cluster

I am going to use the Kafka cluster created in CloudKarafka in my previous article, as it uses the SASL/SCRAM protocol.

Posting messages to CloudKarafka Cluster

First, we need to develop a producer application which sends messages to a topic in the Kafka cluster.

Building Spring Boot Kafka Producer

Setting Up Project

Go to Spring Initializr and add the following starter dependencies to the project:

  • Spring Web
  • Spring for Apache Kafka

Change the Name to “springboot-cloudkarafka-producer” and then click the “Generate” button. A .zip file will download. Unzip it; inside you’ll find a simple Maven-based project including a pom.xml build file. (Note: you can use Gradle, but the examples in this tutorial are Maven-based.)

Import the project to your favorite IDE.

Configuring Kafka Producer

You can configure the Kafka producer in two ways:

  1. application.properties
  2. Configuration class

Using the application.properties file

You can use the following properties to configure the Kafka producer:

spring.kafka.bootstrap-servers=${kafka-servers}
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${cloudkarafka-username}" password="${cloudkarafka-password}";
spring.kafka.producer.properties.enable.idempotence=false
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=Employee:dev.fullstackcode.kafka.producer.dto.Employee

You can get all the configuration details from the CloudKarafka details page. (You can navigate to this page by clicking on the cluster instance name.)

As Kafka clusters are configured with SSL for security, sometimes you might also need to configure the properties below.

You need to get the certificate of the Kafka cluster, import it into a Java trust store (for example with the JDK's keytool utility), and set the file path and password:

 spring.kafka.ssl.trust-store-location=<path of trust store certificate>
 spring.kafka.ssl.trust-store-password=<password>

Using Configuration Class

Spring Boot provides a number of built-in support classes for integrating with Kafka. The DefaultKafkaProducerFactory class allows you to create Kafka producers, which can be used to connect to the cluster and publish messages.

KafkaTemplate is a template class for executing high-level operations. It wraps a producer and provides convenience methods for sending data to Kafka topics.

To use the KafkaTemplate, you configure a producer factory and provide it in the template's constructor.

The following code shows the configuration of the Kafka producer.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import dev.fullstackcode.kafka.producer.dto.Employee;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServerUrl;

    @Value("${cloudkarafka-username}")
    private String cloudKarafkaUserName;

    @Value("${cloudkarafka-password}")
    private String cloudKarafkaPassword;

    private final String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";

    @Bean
    public Map<String, Object> producerConfig()  {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(JsonSerializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.producer.dto.Employee");
        props.put("sasl.mechanism", "SCRAM-SHA-256");        
        props.put("sasl.jaas.config", String.format(jaasTemplate, cloudKarafkaUserName, cloudKarafkaPassword));
        props.put("security.protocol", "SASL_SSL");
        props.put("enable.idempotence" , "false");

        return props;
    }

    @Bean
    public ProducerFactory<Integer, Employee> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }


    @Bean
    public KafkaTemplate<Integer, Employee> kafkaTemplate(ProducerFactory<Integer, Employee> producerFactory) {
        return new KafkaTemplate<Integer, Employee>(producerFactory);
    }

}

BOOTSTRAP_SERVERS_CONFIG – the Kafka cluster/broker URLs.

KEY_SERIALIZER_CLASS_CONFIG – the key serializer class. The key is attached to each message; messages with the same key are routed to the same partition.

VALUE_SERIALIZER_CLASS_CONFIG – the value serializer class. The value is the actual message payload sent to the topic.

TYPE_MAPPINGS – maps the short type name Employee to the DTO class, so the JsonSerializer adds a type header that the consumer can map back to its own Employee class.

When connecting to a cluster with SASL/SCRAM authentication, the following two properties change:

sasl.mechanism=SCRAM-SHA-256

jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"

Depending on the SSL configuration, you might also need to set these properties:

props.put("ssl.truststore.location", <trust store path>); 
props.put("ssl.truststore.password", <password>); 
props.put("ssl.endpoint.identification.algorithm", ""); 

Using KafkaTemplate

In our service class, we can use the KafkaTemplate instance to send messages to the topic. The send() method returns a ListenableFuture, so we can register a callback to log success or failure.

@Service
public class EmployeeService {

    @Autowired
    private KafkaTemplate<Integer, Employee> kafkaTemplate;

    @Value("${cloudkarafka.topic}")
    private String topic;

    public void publishEmployee(Employee employee) {
        kafkaTemplate.send(topic, employee.getId(), employee).addCallback(
                result -> System.out.println("Message published successfully to topic: \n" + result.getProducerRecord()),
                ex -> System.out.println("Failed to send message: " + ex)
        );
    }


}
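For completeness, here is a minimal sketch of the Employee DTO referenced above. Only the Integer id field is implied by the code (employee.getId() is used as the message key); the remaining fields are assumptions for illustration.

package dev.fullstackcode.kafka.producer.dto;

public class Employee {

    private Integer id;
    private String firstName; // hypothetical field
    private String lastName;  // hypothetical field

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }

    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }

    @Override
    public String toString() {
        return "Employee{id=" + id + ", firstName=" + firstName + ", lastName=" + lastName + "}";
    }
}

The consumer application needs an equivalent class under dev.fullstackcode.kafka.consumer.dto, which is what the type mapping on each side points to.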

Writing the Controller

@RestController
@RequestMapping("/employee")
public class EmployeeController {

    @Autowired
    EmployeeService employeeService;


    @PostMapping
    public void publishEmployee(@RequestBody Employee employee) {
         employeeService.publishEmployee(employee);
    }

}

Start the Application

Send a POST request to the controller

Now let's send a POST request to the employee controller, for example POST http://localhost:8080/employee with a JSON body such as {"id": 1, "firstName": "John", "lastName": "Doe"} (the field names follow the hypothetical Employee sketch above).

Consuming from CloudKarafka Kafka Cluster

Next, we need to develop a consumer application which reads messages from the topic created in the CloudKarafka Kafka cluster.

Setting Up Project

You can follow the same steps described for the producer application, or write the consumer class inside the producer application itself.

Building Spring Boot Kafka Consumer

We can receive messages by configuring a MessageListenerContainer and providing a message listener, or by using the @KafkaListener annotation. This post uses @KafkaListener; the container-based alternative is sketched below.
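For reference, a rough sketch of the container-based approach, assuming a ConsumerFactory like the one configured later in this post and placeholder topic/group names:

import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

import dev.fullstackcode.kafka.consumer.dto.Employee;

public class EmployeeListenerContainer {

    // Builds (but does not start) a listener container for the given consumer factory
    public static KafkaMessageListenerContainer<Integer, Employee> build(
            ConsumerFactory<Integer, Employee> consumerFactory) {
        ContainerProperties containerProps = new ContainerProperties("create-employee-events"); // placeholder topic
        containerProps.setGroupId("groups");
        containerProps.setMessageListener((MessageListener<Integer, Employee>) message ->
                System.out.println("Received: " + message.value()));
        return new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
    }
}

Calling start() on the returned container begins consumption; the @KafkaListener approach shown below achieves the same with much less code.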

Configuring Kafka Consumer

Similar to the producer, you can configure the Kafka consumer in two ways:

  1. application.properties
  2. Configuration class

Using the application.properties file

spring.kafka.bootstrap-servers=tricycle-01.srvs.cloudkafka.com:9094,tricycle-02.srvs.cloudkafka.com:9094,tricycle-03.srvs.cloudkafka.com:9094

cloudkarafka.topic=${cloudkarafka-username}-create-employee-events

# cloudkarafka connection properties
#Required connection configs for Kafka producer, consumer, and admin
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${cloudkarafka-username}" password="${cloudkarafka-password}";
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.type.mapping=Employee:dev.fullstackcode.kafka.consumer.dto.Employee

You might need to configure following additional properties depending on the cluster setup.

spring.kafka.ssl.trust-store-location=<path of trust store certificate>
spring.kafka.ssl.trust-store-password=<password>
spring.kafka.properties.ssl.endpoint.identification.algorithm=

Using Configuration Class

Similar to the producer factory, there is a consumer factory. The DefaultKafkaConsumerFactory class allows you to create Kafka consumers, which the listener container uses to read messages from a topic.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import dev.fullstackcode.kafka.consumer.dto.Employee;

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServerUrl;

    @Value("${cloudkarafka-username}")
    private String cloudKarafkaUserName;

    @Value("${cloudkarafka-password}")
    private String cloudKarafkaPassword;

    private final String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";


    private Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
        // ErrorHandlingDeserializer wraps the actual deserializers so that a bad
        // record fails gracefully instead of killing the listener container
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, IntegerDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "dev.fullstackcode.kafka.consumer.dto");
        props.put(JsonDeserializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.consumer.dto.Employee");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", String.format(jaasTemplate, cloudKarafkaUserName, cloudKarafkaPassword));
        props.put("security.protocol", "SASL_SSL");

        // additional properties you might need to configure depending on the cluster setup
//        props.put("ssl.truststore.location", "<path to jks certificate>");
//        props.put("ssl.truststore.password", "<password>");
//        props.put("ssl.endpoint.identification.algorithm", "");


        return props;
    }

    @Bean
    public ConsumerFactory<Integer, Employee> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, Employee>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, Employee> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }


}

Note

For the producer config we use serializers, and for the consumer config we use deserializers.

Write Listener Method

Write a method annotated with @KafkaListener to listen to the topic.

@Component
public class KafkaListener {

    // The class name shadows the annotation, hence the fully qualified annotation name
    @org.springframework.kafka.annotation.KafkaListener(groupId = "groups", topics = "${cloudkarafka.topic}")
    public void listen(Employee data) {
        System.out.println(data);
    }
}
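If you also need the message key or other metadata, the listener method can take additional annotated parameters. A small variant of the method above, assuming a Spring Kafka 2.x version where the header constant is named RECEIVED_MESSAGE_KEY:

import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

    // inside the KafkaListener component above
    @org.springframework.kafka.annotation.KafkaListener(groupId = "groups", topics = "${cloudkarafka.topic}")
    public void listenWithKey(@Payload Employee data,
                              @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key) {
        System.out.println("key=" + key + ", value=" + data);
    }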

application.properties

As the producer application is already running on port 8080, I am starting the consumer application on port 9090.

Add the following property in application.properties to start the application on port 9090.

server.port=9090

Start Consumer Application

Once the consumer is started, it will begin reading messages from the given topic.

Troubleshooting

1. Cluster authorization failed

While trying to post messages to the CloudKarafka Kafka cluster, I received the following error:

  INFO 34824 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1] Transiting to fatal error state due to org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
 ERROR 34824 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state] with root cause

org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.

 ERROR 34824 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-1] Aborting producer batches due to fatal error
    

To fix the issue, you need to add the following property to the producer config. Recent Kafka clients enable producer idempotence by default, and idempotent producers need cluster-level permission (the IDEMPOTENT_WRITE ACL), which shared CloudKarafka plans typically do not grant, so we disable it:

 props.put("enable.idempotence", "false");

You can read more details about idempotence in the official documentation:

https://kafka.apache.org/documentation/#operations_resources_and_protocols

You can read about more troubleshooting issues encountered while connecting to a Kafka cluster from a Spring Boot application in one of my previous blog posts.

You can download the source code from GitHub:

producer – sureshgadupu/springboot-cloudkarafka-producer (github.com)

consumer – sureshgadupu/springboot-cloudkarafka-consumer (github.com)

References

https://niallbunting.com/spring/kafka/ssl/scram/2020/01/10/spring-kafka-scram-setup.html

https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_scram.html

www.ru-rocker.com/2020/05/22/how-to-secure-confluent-kafka-with-ssl-and-sasl-scram

https://www.cloudkarafka.com/docs/spring.html
