Spring Apache Kafka – Producer and Consumer

Spring Apache Kafka Initial Concept

The initial concepts of Spring Apache Kafka are described briefly, with the necessary diagrams, on the springcavaj – Spring Apache Kafka page.

This blog will guide you through using Spring Apache Kafka to publish messages to, and consume messages from, an Apache Kafka server.

Using Spring Apache Kafka with Spring Boot and use of MySQL DB

Apache Kafka is now widely adopted across companies. It is an event streaming platform in which a producer publishes messages to a Topic and a consumer consumes messages from that Topic. Let's get our hands dirty by creating both a producer and a consumer application.

I have created a demo producer application named spring-apache-kafka-producer-masterclass and a demo consumer application named spring-apache-kafka-consumer-masterclass, both uploaded to my personal GitHub account. You can clone both applications and run them locally. The steps to run the producer application are in the README.md file of the spring-apache-kafka-producer-masterclass repository, and the steps to run the consumer application are in the README.md file of the spring-apache-kafka-consumer-masterclass repository. To run both applications in parallel, follow the End To End Testing section, which appears in the README.md of both the producer and the consumer application.

Discussion on spring-apache-kafka-producer-masterclass application

  • Brief Description – The spring-apache-kafka-producer-masterclass application uses Spring Boot, Maven, Spring Kafka and Java, and focuses mainly on the Spring Kafka implementation. It consists of a controller annotated with @RestController, a config class that defines a KafkaTemplate as a @Bean, and a service that uses this KafkaTemplate bean. I used Postman to send requests and test the service.
  • Software Used – Software required to develop the above application.
    • Spring Tool Suite-4.7.0-RELEASE – Download a newer version if one is available
    • Apache Maven 3.6.3 – Download a newer version if one is available
    • Java 8 – Java 8 or later
    • Git 2.27.0 – Or the latest available version
    • Scala – Download the latest version (Scala 2.13)
    • Postman v8.3.0 – To test the REST service
    • The list of software, with download links and installation steps, is described in the README.md file of the spring-apache-kafka-producer-masterclass repository.
  • Project Components – The project I developed to demonstrate spring-kafka is a Maven project. Its one major dependency is spring-kafka.
<!-- Spring Kafka Dependency -->
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

I then configured Apache Kafka in the SpringApacheKafkaConfig.java class, which is annotated with @Configuration. The configuration looks like this:

@Bean
public ProducerFactory<String, UserModel> producerFactory() {
	Map<String, Object> config = new HashMap<>();
	config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
	config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
	config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
	return new DefaultKafkaProducerFactory<String, UserModel>(config);
}

@Bean(name = "kafkaJsonTemplate")
public KafkaTemplate<String, UserModel> kafkaJsonTemplate() {
	return new KafkaTemplate<>(producerFactory());
}
  • Structure of the Project – To use Spring Kafka and its libraries in your project, you need to structure it in the right way.

Spring Boot expects all source files to be located in the same package as, or in a sub-package of, the class annotated with @SpringBootApplication. In the sample project, the class SpringApacheKafkaProducerApplication.java carries the @SpringBootApplication annotation and is the starting point of the application.

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringApacheKafkaProducerApplication {
	public static void main(String[] args) {
		SpringApplication.run(SpringApacheKafkaProducerApplication.class, args);
	}
}
  • Spring Kafka Implementation – To implement Spring Kafka I used a single dependency, spring-kafka. You define a config class annotated with @Configuration, declare a KafkaTemplate bean in it, and use this bean in the desired service.
@Bean
public ProducerFactory<String, UserModel> producerFactory() {
	Map<String, Object> config = new HashMap<>();
	config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
	config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
	config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
	return new DefaultKafkaProducerFactory<String, UserModel>(config);
}

@Bean(name = "kafkaJsonTemplate")
public KafkaTemplate<String, UserModel> kafkaJsonTemplate() {
	return new KafkaTemplate<>(producerFactory());
}

Here you can see that I have defined a method producerFactory() annotated with @Bean. In this method I define the configuration needed to connect to Kafka:

  1. BOOTSTRAP_SERVERS_CONFIG – Points to the bootstrap server, here 127.0.0.1:9092.
  2. KEY_SERIALIZER_CLASS_CONFIG – A message consists of a key and a value, and data must be serialized before it is sent over the network. StringSerializer.class is used as the key serializer here.
  3. VALUE_SERIALIZER_CLASS_CONFIG – Similarly, JsonSerializer.class is used for the value.

The method then returns an instance of DefaultKafkaProducerFactory<String, UserModel>. UserModel is the payload; when it is sent over the network it is serialized by JsonSerializer.class.
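The UserModel class itself is not shown in this post. As a minimal sketch, assuming it is a plain Java bean whose fields match the sample JSON message used in this post (firstName, middleName, lastName, mobileNo, email, panNo), it could look like the class below. The class in the repository may differ. The no-argument constructor and the getters and setters are included because Jackson-based JSON serializers and deserializers rely on them by default.

```java
// Small demo that builds a payload and prints it.
class UserModelDemo {
    public static void main(String[] args) {
        UserModel user = new UserModel();
        user.setFirstName("First Name");
        user.setLastName("Last Name");
        user.setMobileNo("1234567890");
        user.setPanNo("ABCDE1234F");
        System.out.println(user);
    }
}

// Hypothetical sketch of the UserModel payload; the real class in the repository may differ.
class UserModel {
    private String firstName;
    private String middleName;
    private String lastName;
    private String mobileNo;
    private String email;
    private String panNo;

    // Jackson-based JSON (de)serializers need a no-argument constructor by default.
    public UserModel() {
    }

    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }

    public String getMiddleName() { return middleName; }
    public void setMiddleName(String middleName) { this.middleName = middleName; }

    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }

    public String getMobileNo() { return mobileNo; }
    public void setMobileNo(String mobileNo) { this.mobileNo = mobileNo; }

    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }

    public String getPanNo() { return panNo; }
    public void setPanNo(String panNo) { this.panNo = panNo; }

    // Used when the object is logged, for example with LOGGER.info("{}", userModel).
    @Override
    public String toString() {
        return "UserModel [firstName=" + firstName + ", middleName=" + middleName
                + ", lastName=" + lastName + ", mobileNo=" + mobileNo
                + ", email=" + email + ", panNo=" + panNo + "]";
    }
}
```

With getters named this way, the JSON property names produced for the object would match the keys in the sample message.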

The kafkaJsonTemplate bean is then used in the service class:

@Autowired
@Qualifier("kafkaJsonTemplate")
private KafkaTemplate<String, UserModel> kafkaJsonTemplate;

private static final String APACHE_KAFKA_TOPIC_NAME = "springcavaj-topic";

public void send(UserModel userModel) {
	LOGGER.info("Data to be sent to Kafka - {}", userModel);
	kafkaJsonTemplate.send(APACHE_KAFKA_TOPIC_NAME, userModel);
}

Here you can see that the kafkaJsonTemplate pushes a message to the Kafka Topic named springcavaj-topic by calling send(). The message carries the userModel object, serialized by JsonSerializer.class.

  • Testing the application – To test the producer application, the steps are defined below :
    • Extract the Apache Kafka in a local drive.
    • Open the Command Prompt (CMD) from that location.
    • Start the Zookeeper by running the below command :
      .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
    • Open a new CMD from the same folder location and start the Apache Kafka Server :
      .\bin\windows\kafka-server-start.bat .\config\server.properties
    • After that clone the application from GitHub and set up the application locally in any one of the IDEs like Spring Tool Suite (STS) or Eclipse.
    • Right click on the application
    • Click the Option Run As
    • Select the option Spring Boot App.
    • The application will start on port 7113.
    • Now you can test the endpoints in Postman.
    • To check whether the published message can be consumed, start a console consumer.
    • Open a new CMD from the same folder location and type the below command to start the Consumer :
      .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic springcavaj-topic --from-beginning

Below is the REST endpoint URL for publishing a message to the Apache Kafka Topic:

  1. Publish a message to the Apache Kafka Topic (springcavaj-topic): localhost:7113/kafka/produceJson
  2. The HTTP method is POST.
  3. Change the data type of the Body to raw -> JSON.
  4. Paste the message below into the body section.
  • Sample JSON Data – Sample JSON data to push as a message to the Apache Kafka Topic named springcavaj-topic.

{
  "firstName" : "First Name",
  "middleName" : "",
  "lastName" : "Last Name",
  "mobileNo" : "1234567890",
  "email" : "[email protected]",
  "panNo" : "ABCDE1234F"
}
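If you prefer to send the request from code instead of Postman, the same call can be sketched with the JDK's built-in HTTP client. This is only an illustration under stated assumptions: it needs Java 11 or later (the application itself only requires Java 8), it assumes the endpoint is served over plain http:// on localhost, and the class name ProduceJsonRequest is made up for this example. The body is the sample message from this post.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper class; not part of the producer repository.
class ProduceJsonRequest {

    // Endpoint from the steps in this post; the http:// scheme is an assumption.
    static final String ENDPOINT = "http://localhost:7113/kafka/produceJson";

    // Sample message from this post, written as a Java string literal.
    static final String SAMPLE_JSON = "{"
            + "\"firstName\" : \"First Name\", "
            + "\"middleName\" : \"\", "
            + "\"lastName\" : \"Last Name\", "
            + "\"mobileNo\" : \"1234567890\", "
            + "\"email\" : \"[email protected]\", "
            + "\"panNo\" : \"ABCDE1234F\""
            + "}";

    // Builds a POST request with a JSON body, like Postman's raw -> JSON option.
    static HttpRequest build(String jsonBody) {
        return HttpRequest.newBuilder()
                .uri(URI.create(ENDPOINT))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpRequest request = build(SAMPLE_JSON);
        try {
            // Sending only succeeds while the producer application is running on port 7113.
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println("Status: " + response.statusCode());
        } catch (IOException e) {
            System.out.println("Could not reach the producer application: " + e.getMessage());
        }
    }
}
```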

Discussion on spring-apache-kafka-consumer-masterclass application

  • Brief Description – The spring-apache-kafka-consumer-masterclass application uses Spring Boot, Maven, Spring Kafka and Java, with MySQL as the underlying database, and focuses mainly on the Spring Kafka implementation. It consists of a controller annotated with @RestController, a config class that defines the consumer factory and the listener container factory as @Bean methods, and a service whose @KafkaListener method uses them. To test it, publish a message from the spring-apache-kafka-producer-masterclass application using Postman; this consumer application then consumes the message and saves it in the MySQL DB.
  • Software Used – Software required to develop the above application.
    • Spring Tool Suite-4.7.0-RELEASE – Download a newer version if one is available
    • Apache Maven 3.6.3 – Download a newer version if one is available
    • Java 8 – Java 8 or later
    • Git 2.27.0 – Or the latest available version
    • Scala – Download the latest version (Scala 2.13)
    • MySQL Workbench 8.0 CE – MySQL DB Server – Community Edition
    • SQLYog – Can be used in place of MySQL Workbench
    • The list of software, with download links and installation steps, is described in the README.md file of the spring-apache-kafka-consumer-masterclass repository.
  • Project Components – The project I developed to demonstrate spring-kafka is a Maven project. Its one major dependency is spring-kafka. In addition, because the consumed message is stored in MySQL DB, I have used the spring-boot-starter-data-jpa and mysql-connector-java dependencies.
<!-- Spring Kafka Dependency -->
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

I then configured Apache Kafka in the SpringApacheKafkaConfig.java class, which is annotated with @EnableKafka and @Configuration. The configuration looks like this:

@Bean
public ConsumerFactory<String, UserModel> consumerFactory() {
	Map<String, Object> config = new HashMap<>();
	config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
	config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
	config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
	config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
	return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(UserModel.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserModel> kafkaListenerContainerFactory() {
	ConcurrentKafkaListenerContainerFactory<String, UserModel> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
	containerFactory.setConsumerFactory(consumerFactory());
	return containerFactory;
}
  • Structure of the Project – To use Spring Kafka and its libraries in your project, you need to structure it in the right way.

Spring Boot expects all source files to be located in the same package as, or in a sub-package of, the class annotated with @SpringBootApplication. In the sample project, the class SpringApacheKafkaConsumerApplication.java carries the @SpringBootApplication annotation and is the starting point of the application.

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringApacheKafkaConsumerApplication {
	public static void main(String[] args) {
		SpringApplication.run(SpringApacheKafkaConsumerApplication.class, args);
	}
}
  • Spring Kafka Implementation – To implement Spring Kafka I used a single dependency, spring-kafka. You define a config class annotated with @Configuration, declare a ConsumerFactory bean and a ConcurrentKafkaListenerContainerFactory bean in it, and reference the container factory from the listener in the desired service.
@Bean
public ConsumerFactory<String, UserModel> consumerFactory() {
	Map<String, Object> config = new HashMap<>();
	config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
	config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
	config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
	config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
	return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(UserModel.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserModel> kafkaListenerContainerFactory() {
	ConcurrentKafkaListenerContainerFactory<String, UserModel> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
	containerFactory.setConsumerFactory(consumerFactory());
	return containerFactory;
}

Here you can see that I have defined a method consumerFactory() annotated with @Bean. In this method I define the configuration needed to connect to Kafka for consuming messages:

  1. BOOTSTRAP_SERVERS_CONFIG – Points to the bootstrap server, here 127.0.0.1:9092.
  2. GROUP_ID_CONFIG – Identifies the consumer group this consumer belongs to, here group_json. It is required when a consumer subscribes to a topic. Kafka assigns each partition of the topic to one consumer in the group, so the messages of a partition are delivered in order to a single consumer, however many consumers the group has.
  3. KEY_DESERIALIZER_CLASS_CONFIG – A message consists of a key and a value, and data received over the network must be deserialized. StringDeserializer.class is used as the key deserializer here.
  4. VALUE_DESERIALIZER_CLASS_CONFIG – Similarly, JsonDeserializer.class is used for the value.

The kafkaListenerContainerFactory() method then returns an instance of ConcurrentKafkaListenerContainerFactory<String, UserModel> built on this consumer factory. UserModel is the payload; when it is received over the network it is deserialized by JsonDeserializer.class.
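Serialization on the producer side and deserialization on the consumer side are mirror images of each other. The sketch below pictures this for a String key using only plain Java. It is a conceptual stand-in, not the Kafka classes themselves: it assumes UTF-8, which is the default encoding of Kafka's StringSerializer and StringDeserializer, and the class name KeyRoundTrip and the key value are made up for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of what a String serializer/deserializer pair does.
class KeyRoundTrip {

    // Producer side: turn the key into bytes before it goes over the network.
    static byte[] serialize(String key) {
        return key == null ? null : key.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: turn the received bytes back into the original key.
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] onTheWire = serialize("user-key");
        System.out.println("Bytes on the wire: " + Arrays.toString(onTheWire));
        System.out.println("Key after deserialization: " + deserialize(onTheWire));
    }
}
```

The value follows the same pattern, except that JsonSerializer and JsonDeserializer convert between the UserModel object and JSON bytes.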

The kafkaListenerContainerFactory bean is then used in the service class:

@Autowired
private SpringApacheKafkaCrudService apacheKafkaCrudService;

@KafkaListener(topics = "springcavaj-topic", groupId = "group_json",
		containerFactory = "kafkaListenerContainerFactory")
public void consumeUserModel(UserModel userModel) {
	LOGGER.info("Consume Message from Kafka Topic is : {}", userModel.toString());
	User user = apacheKafkaCrudService.saveUser(userModel);
	LOGGER.info("User after saving in database : {}", user.toString());
}

Here you can see that I have used the annotation @KafkaListener(topics = "springcavaj-topic", groupId = "group_json", containerFactory = "kafkaListenerContainerFactory").

Let's examine the attributes of the @KafkaListener annotation:

  1. topics – The topic (springcavaj-topic) from which the consumer consumes messages.
  2. groupId – The consumer group (group_json) used for consuming the messages.
  3. containerFactory – The name of the bean (kafkaListenerContainerFactory) that holds the configuration for connecting to Kafka.
  • Testing the application – To test the consumer application, the steps are defined below :
    • Extract the Apache Kafka in a local drive.
    • Open the Command Prompt (CMD) from that location.
    • Start the Zookeeper by running the below command :
      .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
    • Open a new CMD from the same folder location and start the Apache Kafka Server :
      .\bin\windows\kafka-server-start.bat .\config\server.properties
    • Open a new CMD from the same folder location and create an Apache Kafka Topic. Here the name of the Topic is springcavaj-topic :
      .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic springcavaj-topic
    • Open a new CMD from the same folder location and start the Producer :
      .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic springcavaj-topic
    • Send the message in JSON format as :
      {"firstName" : "First Name", "middleName" : "", "lastName" : "Last Name", "mobileNo" : "1234567890", "email" : "[email protected]", "panNo" : "ABCDE1234F"}
    • After that clone the application from GitHub and set up the application locally in any one of the IDEs like Spring Tool Suite (STS) or Eclipse.
    • Right click on the application
    • Click the Option Run As
    • Select the option Spring Boot App.
    • The application will start on port 7114.
    • Once the application has started, it will consume the message from the Topic (springcavaj-topic) and save it in the MySQL DB.
    • The table in which the data is stored is named User.

Below is the data as it appears in the User table :

user_id | email | first_name | last_name | middle_name | mobile_no | pan_no
1 | [email protected] | First Name | Last Name | | 1234567890 | ABCDE1234F
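The column names in this table are the snake_case form of the camelCase keys in the JSON message. One plausible explanation, stated here as an assumption because the User entity is not shown in this post, is that the entity uses camelCase field names and relies on Spring Boot's default naming strategy, which converts them to snake_case column names. The entity could equally set the names explicitly with @Column. The conversion itself can be sketched in plain Java; the class name ColumnNames is made up for this example.

```java
import java.util.Locale;

// Hypothetical illustration of a camelCase to snake_case conversion.
class ColumnNames {

    // Inserts an underscore before each upper-case letter that follows a
    // lower-case letter or digit, then lower-cases the whole name.
    static String camelToSnake(String name) {
        return name.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        String[] fields = {"firstName", "middleName", "lastName", "mobileNo", "email", "panNo"};
        for (String field : fields) {
            System.out.println(field + " -> " + camelToSnake(field));
        }
    }
}
```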

End To End Testing running both the microservices

  • Extract the Apache Kafka in a local drive.
  • Open the Command Prompt (CMD) from that location.
  • Start the Zookeeper by running the below command :
    .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
  • Open a new CMD from the same folder location and start the Apache Kafka Server :
    .\bin\windows\kafka-server-start.bat .\config\server.properties
  • After that clone both the applications (spring-apache-kafka-producer-masterclass & spring-apache-kafka-consumer-masterclass) from GitHub and set them up locally in any one of the IDEs like Spring Tool Suite (STS) or Eclipse. Repeat the next three steps for each application.
  • Right click on the application
  • Click the Option Run As
  • Select the option Spring Boot App.
  • The spring-apache-kafka-producer-masterclass application will start on port 7113 and the spring-apache-kafka-consumer-masterclass application will start on port 7114.
  • Once the spring-apache-kafka-producer-masterclass application has started, open Postman and paste the JSON message provided below:
    {"firstName" : "First Name", "middleName" : "", "lastName" : "Last Name", "mobileNo" : "1234567890", "email" : "[email protected]", "panNo" : "ABCDE1234F"}
  • Once you hit the Send button in Postman, the message is published to the Apache Kafka Topic. It is then consumed by the spring-apache-kafka-consumer-masterclass application and stored in the MySQL DB, in the table named User.
  • Below is the data as it appears in the User table :

user_id | email | first_name | last_name | middle_name | mobile_no | pan_no
1 | [email protected] | First Name | Last Name | | 1234567890 | ABCDE1234F

GitHub Code Link

spring-apache-kafka-producer-masterclass – Download the Source Code from GitHub

spring-apache-kafka-consumer-masterclass – Download the Source Code from GitHub

Common Faced Problems

Spring Apache Kafka Problems

Interview FAQs

Spring Apache Kafka Interview FAQs

Other Useful Links

Spring Data using RDBMS (MySQL DB) and Spring REST

Spring Data using NoSQL DB and Spring REST

Spring Data using Cypher Neo4j DB and Spring REST

Spring Data using Redis and Spring REST

Spring REST

Spring RabbitMQ

Spring Kafka Confluent – Set Up

Spring Kafka Confluent – Producer & Consumer

Spring Apache Kafka Connect

Spring Mongo Kafka Connector

Spring Cloud using Google Cloud SQL & Google Cloud Storage

Spring Cloud using Google Cloud Pub-Sub

Spring Cloud using Google Cloud Spanner

Spring Reactive Web Flux Programming