How to Publish and Subscribe to Kafka with Spring Boot and SASL/SCRAM
In this blog post, I will explain how to connect a Spring Boot application to a Kafka cluster that uses SASL/SCRAM authentication.
In a previous blog post, I covered connecting Spring Boot applications to a Kafka cluster. That example used a different authentication mechanism.
What is SASL/SCRAM
Kafka supports different authentication mechanisms.
- GSSAPI is used primarily with Kerberos.
- PLAIN is just a clear-text password mechanism.
- SCRAM is a password-based challenge-response mechanism.
Salted Challenge Response Authentication Mechanism (SCRAM), or SASL/SCRAM, is a family of SASL mechanisms that addresses the security concerns with traditional mechanisms that perform username/password authentication, like PLAIN. Apache Kafka supports the SCRAM-SHA-256 and SCRAM-SHA-512 mechanisms.
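To make the "salted" part more concrete, here is a minimal sketch of the key derivation that RFC 5802 defines for SCRAM, using only the JDK. The class and method names are my own, and this is not Kafka's implementation; it only illustrates why the server can store derived keys instead of the password itself.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;

/** Illustrative only: derives SCRAM-SHA-256 keys as described in RFC 5802. */
final class ScramKeySketch {

    // SaltedPassword := Hi(password, salt, iterations), i.e. PBKDF2 with HMAC-SHA-256.
    static byte[] saltedPassword(String password, byte[] salt, int iterations) {
        try {
            PBEKeySpec spec = new PBEKeySpec(password.toCharArray(), salt, iterations, 256);
            return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                    .generateSecret(spec)
                    .getEncoded();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // HMAC-SHA-256 of a fixed label, keyed with the given bytes.
    static byte[] hmac(byte[] key, String label) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(key, "HmacSHA256"));
            return mac.doFinal(label.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // StoredKey := H(ClientKey), where ClientKey := HMAC(SaltedPassword, "Client Key").
    static byte[] storedKey(byte[] saltedPassword) {
        try {
            byte[] clientKey = hmac(saltedPassword, "Client Key");
            return MessageDigest.getInstance("SHA-256").digest(clientKey);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // ServerKey := HMAC(SaltedPassword, "Server Key").
    static byte[] serverKey(byte[] saltedPassword) {
        return hmac(saltedPassword, "Server Key");
    }

    public static void main(String[] args) {
        byte[] salt = new byte[16];
        new SecureRandom().nextBytes(salt);
        byte[] salted = saltedPassword("demo-secret", salt, 4096); // demo values only
        System.out.println("stored_key=" + Base64.getEncoder().encodeToString(storedKey(salted)));
        System.out.println("server_key=" + Base64.getEncoder().encodeToString(serverKey(salted)));
    }
}
```

The server keeps the salt, the iteration count, the stored key and the server key. During login the client proves that it knows the password without sending it over the wire.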
The SCRAM implementation in Kafka stores SCRAM credentials in ZooKeeper and is suitable for use in Kafka installations where ZooKeeper is on a private network. Because of this, you must create SCRAM credentials for users in ZooKeeper.
In Java, authentication is handled through JAAS (Java Authentication and Authorization Service), which performs the login on the application's behalf.
If you want to configure your local Kafka cluster to use SASL/SCRAM, you can follow the instructions at the links below.
https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_scram.html
www.ru-rocker.com/2020/05/22/how-to-secure-confluent-kafka-with-ssl-and-sasl-scram
Use case
- Create a Kafka cluster and topics in CloudKarafka (if you have not already created them)
- Develop a Spring Boot producer application which sends/posts messages to the cluster/topic
- Develop a Spring Boot consumer application which reads/receives messages from the cluster/topic
To demonstrate Kafka producer and consumer functionality, we use the following architecture:
- A REST client posts a request to the springboot-cloudkarafka-producer application
- The springboot-cloudkarafka-producer application receives the request and posts it to a topic in the Kafka cluster
- The springboot-cloudkarafka-consumer application reads messages from the topic
Creating Kafka Cluster
I am going to use the Kafka cluster I created in CloudKarafka in my previous article, as it uses the SASL/SCRAM protocol.
Posting messages to CloudKarafka Cluster
First we need to develop a producer application which posts/sends messages to a topic in the Kafka cluster.
Building Spring Boot Kafka Producer
Setting Up Project
Go to Spring Initializr and add the following starter dependencies to a project.
- Spring Web
- Spring for Apache Kafka
Change the Name to “springboot-cloudkarafka-producer” and then click on “Generate” button. A .zip will download. Unzip it. Inside you’ll find a simple, Maven-based project including a pom.xml build file (NOTE: You can use Gradle. The examples in this tutorial will be Maven-based.)
Import the project to your favorite IDE.
Configuring Kafka Producer
You can configure Kafka producer in 2 ways
- application.properties
- Configuration class
Using application properties file
You can use the following properties to configure the Kafka producer. The JSON type mapping is a serializer property, so it goes under spring.kafka.producer.properties and its value is written without quotes.
spring.kafka.bootstrap-servers=${kafka-servers}
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${cloudkarafka-username}" password="${cloudkarafka-password}";
spring.kafka.producer.properties.enable.idempotence=false
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=Employee:dev.fullstackcode.kafka.producer.dto.Employee
You can get all the configuration details from the CloudKarafka details page (you can navigate to this page by clicking on the cluster instance name).
As Kafka clusters are configured with SSL for security, you might sometimes need to configure the properties below.
To do so, get the certificate of the Kafka cluster, import it into a Java key store, and set the file path and password.
spring.kafka.ssl.trust-store-location=<path of trust store certificate>
spring.kafka.ssl.trust-store-password=<password>
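Before pointing Spring at the trust store, it can help to check that the file opens with the password you plan to configure. The following is a small sketch using only the JDK's KeyStore API. The class name, the method name and the way the path is passed in are my own choices, not part of the original project.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.Collections;
import java.util.List;

/** Illustrative helper: opens a trust store and lists the certificate aliases in it. */
final class TrustStoreCheck {

    // storeType is the key store format, for example "JKS" or "PKCS12".
    static List<String> listAliases(Path storePath, String storeType, char[] password) {
        try (InputStream in = Files.newInputStream(storePath)) {
            KeyStore store = KeyStore.getInstance(storeType);
            store.load(in, password); // fails if the file is corrupt or the password is wrong
            return Collections.list(store.aliases());
        } catch (IOException | GeneralSecurityException e) {
            throw new IllegalStateException("Cannot open trust store " + storePath, e);
        }
    }
}
```

If the call returns the alias under which you imported the cluster certificate, the same path and password should work in the properties above.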
Using configuration class
Spring for Apache Kafka provides a number of support classes for integrating with Kafka. The DefaultKafkaProducerFactory class creates the Kafka producers that connect to the cluster and publish messages.
KafkaTemplate is the central class for sending messages. It wraps a producer and provides convenience methods for sending data to Kafka topics.
To use the KafkaTemplate, you configure a producer factory and pass it to the template's constructor.
The following code shows the configuration of the Kafka producer.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import dev.fullstackcode.kafka.producer.dto.Employee;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServerUrl;

    @Value("${cloudkarafka-username}")
    private String cloudKarafkaUserName;

    @Value("${cloudkarafka-password}")
    private String cloudKarafkaPassword;

    // JAAS login module line; the two %s placeholders are the username and password
    private final String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";

    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(JsonSerializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.producer.dto.Employee");
        // SASL/SCRAM over TLS
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", String.format(jaasTemplate, cloudKarafkaUserName, cloudKarafkaPassword));
        props.put("security.protocol", "SASL_SSL");
        // see the troubleshooting section for why idempotence is switched off
        props.put("enable.idempotence", "false");
        return props;
    }

    @Bean
    public ProducerFactory<Integer, Employee> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<Integer, Employee> kafkaTemplate(ProducerFactory<Integer, Employee> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
BOOTSTRAP_SERVERS_CONFIG – The Kafka cluster/broker URL.
KEY_SERIALIZER_CLASS_CONFIG – Serializer class for the message key. In this example the key is the employee ID, an Integer.
VALUE_SERIALIZER_CLASS_CONFIG – Serializer class for the message value. This is the actual message which is sent to the topic.
TYPE_MAPPINGS – Maps the type name written into the message ("Employee") to the Java class used on this side.
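To see what the key serializer actually puts on the wire, here is a tiny JDK-only sketch. Kafka's IntegerSerializer writes the key as 4 bytes in big-endian order; the class below is my own stand-in that reproduces that format so you can try it without a broker.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Illustrative stand-in that mimics the byte layout of an Integer message key. */
final class IntegerKeyBytes {

    // 4 bytes, most significant byte first (ByteBuffer is big-endian by default).
    static byte[] serialize(int key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }

    // Reverse operation, as the consumer side would do with its key deserializer.
    static int deserialize(byte[] data) {
        return ByteBuffer.wrap(data).getInt();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(serialize(42))); // prints [0, 0, 0, 42]
        System.out.println(deserialize(serialize(42)));     // prints 42
    }
}
```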
When connecting to a cluster with SASL/SCRAM authentication, the following 2 properties change: the SASL mechanism and the JAAS configuration.
sasl.mechanism=SCRAM-SHA-256
jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"
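To check what ends up in sasl.jaas.config, you can format the template on its own. This is a minimal sketch with made-up credentials; the class and method names are mine.

```java
/** Illustrative only: builds the sasl.jaas.config value from the template. */
final class JaasConfigSketch {

    private static final String JAAS_TEMPLATE =
            "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";

    // Fills the username and password placeholders of the template.
    static String scramJaasConfig(String username, String password) {
        return String.format(JAAS_TEMPLATE, username, password);
    }

    public static void main(String[] args) {
        // demo-user and demo-secret are placeholder credentials
        System.out.println(scramJaasConfig("demo-user", "demo-secret"));
    }
}
```

The result is a single line that names the login module, marks it as required, and ends with a semicolon. The Kafka client expects exactly this shape.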
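Depending on the cluster's SSL configuration, you might need to set some additional properties. Note that setting ssl.endpoint.identification.algorithm to an empty string disables hostname verification of the broker certificate, so use it only when you have to.
props.put("ssl.truststore.location", "<trust store path>");
props.put("ssl.truststore.password", "<password>");
props.put("ssl.endpoint.identification.algorithm", "");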
Using KafkaTemplate
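In our service class, we can use the KafkaTemplate instance to send messages to the topic.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import dev.fullstackcode.kafka.producer.dto.Employee;

@Service
public class EmployeeService {

    @Autowired
    private KafkaTemplate<Integer, Employee> kafkaTemplate;

    @Value("${cloudkarafka.topic}")
    private String topic;

    public void publishEmployee(Employee employee) {
        // send() is asynchronous; the callbacks run once the broker has answered.
        // addCallback is available on the ListenableFuture returned by Spring Kafka 2.x.
        kafkaTemplate.send(topic, employee.getId(), employee).addCallback(
                result -> System.out.println("Message published successfully to topic: \n" + result.getProducerRecord()),
                ex -> System.out.println("Failed to send message " + ex)
        );
    }
}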
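The Employee class itself is not shown in this post. As a rough sketch, it could look like the class below. Only the id and its getter are implied by the service code; the name field is an assumption I added for illustration. In the real project the class would be public and live in the dev.fullstackcode.kafka.producer.dto package.

```java
/** Hypothetical DTO: only getId() is implied by the service code above. */
class Employee {

    private Integer id;
    private String name; // assumed field, for illustration only

    // JSON libraries such as Jackson typically need a no-argument constructor
    public Employee() {
    }

    public Employee(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Employee{id=" + id + ", name='" + name + "'}";
    }
}
```

The getters and setters let the JSON serializer read and write the fields, and toString() gives a readable line when the object is printed.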
Writing Controller
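import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import dev.fullstackcode.kafka.producer.dto.Employee;

@RestController
@RequestMapping("/employee")
public class EmployeeController {

    @Autowired
    EmployeeService employeeService;

    // Accepts the employee as JSON and hands it to the service for publishing
    @PostMapping
    public void publishEmployee(@RequestBody Employee employee) {
        employeeService.publishEmployee(employee);
    }
}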
Start the Application
Send a post request to controller
Now let's send a POST request to the employee controller.
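Any REST client works here. If you prefer to stay in Java, the request can be sketched with the JDK's built-in HttpClient (Java 11+). The class name, the base URL and the JSON fields other than id are assumptions for illustration; adjust them to your Employee class and port.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Illustrative client for the /employee endpoint of the producer application. */
final class EmployeePostSketch {

    // Builds a POST request with a JSON body for <baseUrl>/employee.
    static HttpRequest buildRequest(String baseUrl, String employeeJson) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/employee"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(employeeJson))
                .build();
    }

    // Sends the request and returns the HTTP status code; needs the producer app to be running.
    static int send(HttpRequest request) {
        try {
            return HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.discarding())
                    .statusCode();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With the producer running locally on port 8080, a call such as send(buildRequest("http://localhost:8080", "{\"id\":1,\"name\":\"John\"}")) should post one employee to the topic.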
Consuming from CloudKarafka Kafka Cluster
Next we need to develop a consumer application which reads messages from the topics created in the CloudKarafka Kafka cluster.
Setting Up Project
You can follow the same steps described for the producer application, or write the consumer class inside the producer application itself.
Building Spring Boot Kafka Consumer
We can receive messages by configuring a MessageListenerContainer and providing a message listener, or by using the @KafkaListener annotation.
Configuring Kafka Consumer
You can configure Kafka consumer in 2 ways similar to producer
- application.properties
- Configuration class
Using application.properties file
spring.kafka.bootstrap-servers=tricycle-01.srvs.cloudkafka.com:9094,tricycle-02.srvs.cloudkafka.com:9094,tricycle-03.srvs.cloudkafka.com:9094
cloudkarafka.topic=${cloudkarafka-username}-create-employee-events
# cloudkarafka connection properties
# Required connection configs for Kafka producer, consumer, and admin
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${cloudkarafka-username}" password="${cloudkarafka-password}";
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.type.mapping=Employee:dev.fullstackcode.kafka.consumer.dto.Employee
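You might need to configure the following additional properties, depending on the cluster setup. The last one is a plain Kafka client property, so it is passed through spring.kafka.properties, and leaving its value empty disables hostname verification.
spring.kafka.ssl.trust-store-location=<path of trust store certificate>
spring.kafka.ssl.trust-store-password=<password>
spring.kafka.properties.ssl.endpoint.identification.algorithm=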
Using Configuration Class
Similar to the producer factory, there is a consumer factory. The DefaultKafkaConsumerFactory class creates the Kafka consumers that listen to a topic and receive its messages.
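import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import dev.fullstackcode.kafka.consumer.dto.Employee;

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServerUrl;

    @Value("${cloudkarafka-username}")
    private String cloudKarafkaUserName;

    @Value("${cloudkarafka-password}")
    private String cloudKarafkaPassword;

    private final String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";

    private Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
        // ErrorHandlingDeserializer wraps the real deserializers, so a malformed
        // record is reported to the listener container instead of failing the poll
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, IntegerDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        // trusted packages takes package names, not class names
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "dev.fullstackcode.kafka.consumer.dto");
        props.put(JsonDeserializer.TYPE_MAPPINGS, "Employee:dev.fullstackcode.kafka.consumer.dto.Employee");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config", String.format(jaasTemplate, cloudKarafkaUserName, cloudKarafkaPassword));
        props.put("security.protocol", "SASL_SSL");
        // additional properties you might need to configure depending on the cluster setup
        // props.put("ssl.truststore.location", "<path to jks certificate>");
        // props.put("ssl.truststore.password", "<password>");
        // props.put("ssl.endpoint.identification.algorithm", "");
        return props;
    }

    @Bean
    public ConsumerFactory<Integer, Employee> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, Employee>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, Employee> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // up to 3 consumer threads
        factory.getContainerProperties().setPollTimeout(3000); // milliseconds
        return factory;
    }
}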
Note
For the producer config we use serializers, and for the consumer config we use deserializers.
Write Listener method
Write a method with the @KafkaListener annotation to listen to the topic.
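import org.springframework.stereotype.Component;

import dev.fullstackcode.kafka.consumer.dto.Employee;

@Component
public class KafkaListener {

    // The annotation is fully qualified because this class has the same simple name
    @org.springframework.kafka.annotation.KafkaListener(groupId = "groups", topics = "${cloudkarafka.topic}")
    public void listen(Employee data) {
        System.out.println(data);
    }
}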
application.properties
As the producer application is already running on port 8080, I am starting the consumer application on port 9090.
Add the server.port=9090 property to application.properties to start the application on port 9090.
server.port=9090
Start Consumer Application
Once the consumer is started, it will start reading messages from the given topic.
Troubleshooting
1. Cluster authorization failed
While trying to post messages to the CloudKarafka Kafka cluster, I received the following error.
INFO 34824 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1] Transiting to fatal error state due to org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
ERROR 34824 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state] with root cause
org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
ERROR 34824 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-1] Aborting producer batches due to fatal error
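To fix the issue, add the following property to the producer config. It switches off the idempotent producer, which recent Kafka clients enable by default and which can require a cluster-level permission that a shared cluster may not grant to your user.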
props.put("enable.idempotence" , "false");
You can read more about idempotence and the related permissions in the official documentation:
https://kafka.apache.org/documentation/#operations_resources_and_protocols
You can read about more troubleshooting tips for connecting to a Kafka cluster from a Spring Boot application in one of my previous blog posts.
You can download the source code from GitHub:
producer – sureshgadupu/springboot-cloudkarafka-producer (github.com)
consumer – sureshgadupu/springboot-cloudkarafka-consumer (github.com)
References
https://niallbunting.com/spring/kafka/ssl/scram/2020/01/10/spring-kafka-scram-setup.html
https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_scram.html
www.ru-rocker.com/2020/05/22/how-to-secure-confluent-kafka-with-ssl-and-sasl-scram
https://www.cloudkarafka.com/docs/spring.html