Apache Kafka – Producer & Consumer using Confluent

Install a Linux Distribution in Windows

The steps to run Confluent on Windows are provided below:

  • Open PowerShell in Administrator mode and run the following command:
wsl --list --online

Here you can see the name and friendly name of each available distribution.

NAME            FRIENDLY NAME
Ubuntu          Ubuntu
Debian          Debian GNU/Linux
kali-linux      Kali Linux Rolling
openSUSE-42     openSUSE Leap 42
SLES-12         SUSE Linux Enterprise Server v12
Ubuntu-16.04    Ubuntu 16.04 LTS
Ubuntu-18.04    Ubuntu 18.04 LTS
Ubuntu-20.04    Ubuntu 20.04 LTS
  • To install the distribution of your choice, run the command below:
wsl --install -d <Distribution Name>
  • After one of the distributions is installed successfully, it will prompt you to set up a user:
    • Username – <a username of your choice>
    • Password – <a password of your choice>

The distribution is now accessible from the Windows Start menu.

Confluent Setup in WSL

In the blog post Apache Kafka Confluent, I mentioned how to install Confluent in WSL using the curl command. In this blog post, I will cover another way of installing Confluent in WSL: downloading the .tar file.

  • First, open Ubuntu on Windows from the Start menu after it has been installed successfully on your Windows machine. A snapshot of the Ubuntu icon is provided below.
  • Open the app and log in with the credentials you set during installation.
  • Next, download Confluent using the wget command:
wget https://packages.confluent.io/archive/6.1/confluent-6.1.0.tar.gz
  • After downloading, extract confluent-6.1.0.tar.gz by running:
tar -xvf confluent-6.1.0.tar.gz

It will extract the files into the same folder where the tar file was downloaded.

  • Set the CONFLUENT_HOME environment variable and add it to the PATH variable:
export CONFLUENT_HOME=~/confluent-6.1.0
export PATH=$CONFLUENT_HOME/bin:$PATH

Note that export only applies to the current shell session; add these two lines to your ~/.bashrc file to make them permanent.
  • Then check the version of Confluent installed on your Ubuntu system:
confluent --version

After you have successfully installed Confluent, you have to install the kafka-connect-datagen connector through Confluent Hub. For this installation you need Java to be installed on your Ubuntu system.

  • The commands to download and install Java are as follows:
sudo apt-get update && sudo apt-get upgrade -y
sudo apt install openjdk-8-jre-headless -y
  • To check whether Java is installed properly, run the command below to verify:
java -version
  • Now install the kafka-connect-datagen connector from Confluent Hub:
confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
  • Now start the 7 services provided by Confluent (Connect, Control Center, Kafka, Kafka REST, ksqlDB, Schema Registry, and ZooKeeper):
confluent local services start

Once all the services are up, it will show a message like the one provided below:

  • If the services fail to start, update Confluent by running:
confluent update
  • Now retry the start command above; the services should come up.
  • You can also start each service separately:
confluent local services kafka start # Starts the Kafka service
confluent local services zookeeper start # Starts the ZooKeeper service
confluent local services ksql-server start # Starts the ksqlDB service
confluent local services connect start # Starts the Connect service
confluent local services control-center start # Starts the Control Center service
confluent local services kafka-rest start # Starts the Schema Registry service first, then the Kafka REST service
  • To check the status of the services, run:
confluent local services status
  • To stop the services, run:
confluent local services stop

Once all the services are down, it will show messages like the ones provided below:

Confluent Cloud – Create Account

  • Go to the Confluent Cloud link and register yourself.
  • You can use either your Google account or your GitHub account to sign up, or you can do a fresh sign-up as well.
  • Once you sign up, a verification email is sent to your registered mail ID. Verify your account; after that you can use it. A snapshot of the sign-up page is provided below.
  • Once you have registered and logged in to Confluent Cloud with your credentials, you will be redirected to a page that prompts you to create a cluster.
  • Before creating a cluster, it prompts you to choose a plan. I used the Basic plan here. Please find below the different types of plans available.
  • Once you choose a plan, you can create the cluster on one of 3 cloud providers: AWS, Google Cloud Platform, or Microsoft Azure.
  • Select Google Cloud Platform and provide a name for the cluster.
  • Once the cluster is created, you can create a Topic.
  • By default, Confluent creates 6 partitions for a Topic. You can change that value, but remember that 1 partition is the minimum requirement for a Topic. A Topic can also be created programmatically; see the sketch after the snapshots below.
  • Once done, you can see that there is a default environment; within that environment is the cluster you created, and within that cluster is the Topic you created. A snapshot is provided below.

The Overview Page –

The Cluster Page – You can see that I have created a Google Cloud Platform (GCP) cluster named springcavaj cluster.

The Topic Page – I have created 2 topics, named springcavaj-topic and springcavaj-topic-1.
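As mentioned above, a Topic can also be created programmatically with the Kafka AdminClient that ships with the Kafka client library. Below is a minimal sketch, not taken from the project: the bootstrap server, API key, and secret are placeholders, and the replication factor of 3 reflects what Confluent Cloud expects.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicCreator {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder values; substitute your cluster's bootstrap server and API key/secret
        props.put("bootstrap.servers", "<BOOTSTRAP_SERVER>:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username='<CLUSTER_API_KEY>' password='<CLUSTER_API_SECRET>';");

        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions (the Confluent Cloud default) and replication factor 3
            NewTopic topic = new NewTopic("springcavaj-topic", 6, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}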

Connecting Confluent Cloud with Confluent Installed in WSL

Once you are done creating an account in Confluent Cloud, creating a cluster, creating a Topic, and setting up Confluent in Ubuntu on Windows, the next step is to connect the Confluent installed in Ubuntu with Confluent Cloud.

  • Open the Ubuntu on Windows app from the Windows Start menu.
  • Log in with your Ubuntu credentials.
  • Once you are in, check whether Confluent is installed by running the version command:
confluent --version
  • Once done, log in to Confluent Cloud:
confluent login --save

Provide the credentials with which you registered in Confluent Cloud. Once done, the Confluent CLI will save the credentials in a .netrc file.

  • Run the command below to see all the environments:
confluent environment list

It will list all the environments. A snapshot is provided below:

  • Now select the environment to use; the command is:
confluent environment use env-7w35o # here env-7w35o is the ID of the environment
  • To see the list of Kafka clusters inside this environment, run:
confluent kafka cluster list

It will list all the clusters present within that environment. A snapshot is provided below:

You can see that I have created only one cluster, named springcavaj cluster, which is up and running.

  • To use this Kafka cluster, run:
confluent kafka cluster use lkc-mv2ko2 # Here lkc-mv2ko2 is the ID of the cluster
  • To see the list of Topics inside the cluster, run:
confluent kafka topic list

It will list all the Topics residing in the above cluster. A snapshot is provided below:

  • After this you have to create a secret API key for your cluster, as discussed in the next section.

Create Secret API-Key for Cluster

There are 2 ways to create an API key: from the Command Line Interface (CLI), and from the Confluent Cloud GUI.

Let’s first discuss how to create an API key using the CLI.

  • To create a new API key, the command is:
confluent api-key create --resource lkc-mv2ko2 # lkc-mv2ko2 is the ID of the Cluster

Once you run the command, it will create the key. A snapshot is provided below:

From the above snapshot you can see that running the command creates the key: API Key is the name of the key, followed by the value of the Secret.

  • To store the key locally, run:
confluent api-key store --resource lkc-mv2ko2
Key: <<Provide the API-Key>>
Secret: <<Provide the Secret>>
  • To see the list of API keys, run:
confluent api-key list

It will list all the API keys that you have created for the cluster. A snapshot is provided below:

From the above snapshot one can see that I have created 2 API keys for the springcavaj cluster (lkc-mv2ko2).

  • To use one of the keys with the cluster, run:
confluent api-key use KRL3HWF5KX2MCP7H --resource lkc-mv2ko2

Now your cluster is attached to the API key, and the Topics you create inside that cluster are also covered by that API key.

Now let’s discuss creating API keys from the Confluent Cloud GUI.

  • Log in to Confluent Cloud.
  • From the dashboard page, select the default environment and then select the cluster. In my case, the name of the cluster is springcavaj cluster.
  • Once you select the cluster, you can see the menus in the left navigation pane.
  • Click on the Data Integration menu.
  • After that, click on the Clients sub-menu.

A snapshot is provided below:

  • After clicking the Clients sub-menu, you can see a button named Set up a new client.
  • Click on that Set up a new client button.
  • Once you click the button, it opens a page where you have to select the language or framework in which your application is developed. I developed the application in Spring Boot, so I chose the Spring Boot option. A snapshot is provided below:
  • Once you select the Spring Boot option, it generates properties that you have to use in your Spring Boot application’s application.properties file:
# Required connection configs for Kafka producer, consumer, and admin
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.bootstrap.servers=pkc-41p56.asia-south1.gcp.confluent.cloud:9092
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='{{ CLUSTER_API_KEY }}'   password='{{ CLUSTER_API_SECRET }}';
spring.kafka.properties.security.protocol=SASL_SSL

# Best practice for higher availability in Apache Kafka clients prior to 3.0
spring.kafka.properties.session.timeout.ms=45000

# Required connection configs for Confluent Cloud Schema Registry
spring.kafka.properties.basic.auth.credentials.source=USER_INFO
spring.kafka.properties.basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
spring.kafka.properties.schema.registry.url=https://{{ SR_ENDPOINT }}
  • On the right side there is another button named Create Kafka cluster API key.
  • Once you click the button, it generates the API key.
  • In the properties above there are two placeholder variables, {{ CLUSTER_API_KEY }} and {{ CLUSTER_API_SECRET }}; they are now replaced with the generated values:
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='OHINNNKVF65HY4RV'   password='gxgpnaDi/JQXh0IdsWP7FFisJ5Pa6u8WPATZzmcK5b+j/Q/TWiEsNRovDvglWLkG';

You can see that the {{ CLUSTER_API_KEY }} is replaced with OHINNNKVF65HY4RV and {{ CLUSTER_API_SECRET }} is replaced with gxgpnaDi/JQXh0IdsWP7FFisJ5Pa6u8WPATZzmcK5b+j/Q/TWiEsNRovDvglWLkG.

  • You can also see the list of API keys from the API keys sub-menu under the same Data Integration menu. A snapshot is provided below:
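For reference, outside Spring Boot these same settings map one-to-one onto plain Kafka client properties: Spring simply strips the spring.kafka.properties. prefix and passes the rest to the client. Below is a minimal sketch of a plain Java producer using them; the key and secret are placeholders, not real credentials.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PlainJavaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same values as the generated application.properties, without the Spring prefix
        props.put("bootstrap.servers", "pkc-41p56.asia-south1.gcp.confluent.cloud:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username='<CLUSTER_API_KEY>' password='<CLUSTER_API_SECRET>';");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("springcavaj-topic", "key-1", "hello from plain Java"));
        }
    }
}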

Spring Boot Application (Producer and Consumer)

I have developed an application that includes both the Producer and the Consumer. The Producer publishes the message to the Confluent Kafka Topic, and the Consumer consumes from that same Topic (a minimal consumer sketch is provided at the end of this section). It is a Spring Boot application named spring-apache-kafka-confluent-masterclass.

  • Project Components – The project I developed in support of this post is a Maven project. It uses 3 Spring Cloud Stream dependencies, spring-cloud-stream, spring-cloud-stream-binder-kafka, and spring-cloud-stream-binder-kafka-streams, and 1 Apache Kafka dependency, spring-kafka. A minimal producer sketch follows the dependency listing below.
<!-- Spring Cloud Stream Dependencies -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    <version>2.0.0.RELEASE</version>
</dependency>
<!-- Spring Kafka Dependency -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
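To make the flow concrete, here is a minimal sketch of what the producer side can look like with spring-kafka. The class name ConfluentProducerService is hypothetical and the actual class in the repository may differ, but the topic name matches the one created earlier.

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Hypothetical producer service; publishes a JSON payload to the Confluent topic
@Service
public class ConfluentProducerService {

    private static final String TOPIC = "springcavaj-topic";

    private final KafkaTemplate<String, String> kafkaTemplate;

    public ConfluentProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publish(String jsonPayload) {
        // KafkaTemplate picks up the spring.kafka.* properties from application.properties
        kafkaTemplate.send(TOPIC, jsonPayload);
    }
}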
  • Structure of the Project – To use the Spring Cloud Stream and Spring Kafka libraries in your project, you need to structure it in the right way.
  • Testing the application – This application has no UI component and no database integration; one can integrate the code with a UI and a database without restriction. Here I used Postman to test the REST endpoint defined in the Controller class (a sketch of such a controller appears after the sample JSON below).
  1. Install WSL for Windows. The steps required to download and install it are briefly described in the README.md file of the spring-apache-kafka-confluent-masterclass project, in the section Install WSL For Windows.
  2. After that, install Confluent, as briefly described in the section Install Confluent on WSL – Ubuntu 20.04.
  3. Once Confluent is up and running in WSL for Windows, clone the application spring-apache-kafka-confluent-masterclass from GitHub and set it up locally in one of the IDEs, such as Spring Tool Suite (STS) or Eclipse.
  4. Run the mvn clean install command; it will create the spring-apache-kafka-confluent-masterclass.jar file in the target directory.
  5. Copy the Jar manually to any of the mounted directories. You can check which directories are mounted from your local machine by running ls -lra. A snapshot is provided below.

You can see that in my case it mounts 2 local directories, .aws and .azure.

  • Running the Application – Just run the java -jar spring-apache-kafka-confluent-masterclass.jar command.
    • Open Postman.
    • Use the URL localhost:7115/confluent/produceJson.
    • Set the HTTP method to POST.
    • Copy and paste the sample JSON data below into the Body section.
    • Hit the Send button.
    • The application will generate logs like the ones provided below:
  • Sample JSON Data – I have provided the sample data below.

POST Request – JSON Data

{
  "firstName" : "First Name",
  "middleName" : "",
  "lastName" : "Last Name",
  "mobileNo" : "1234567890",
  "email" : "[email protected]",
  "panNo" : "ABCDE1234F"
}
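For reference, below is a minimal sketch of what the Controller class handling this request can look like. The names ConfluentController and PersonDetail are hypothetical (the actual classes in the repository may differ); the endpoint path matches the Postman URL above, and the sketch reuses the hypothetical ConfluentProducerService from the earlier producer sketch.

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical payload class matching the sample JSON fields
class PersonDetail {
    public String firstName;
    public String middleName;
    public String lastName;
    public String mobileNo;
    public String email;
    public String panNo;
}

// Hypothetical controller exposing the endpoint tested with Postman
@RestController
@RequestMapping("/confluent")
public class ConfluentController {

    private final ConfluentProducerService producerService;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public ConfluentController(ConfluentProducerService producerService) {
        this.producerService = producerService;
    }

    @PostMapping("/produceJson")
    public ResponseEntity<String> produceJson(@RequestBody PersonDetail person) throws Exception {
        // Serialize the payload back to JSON and publish it to the topic
        producerService.publish(objectMapper.writeValueAsString(person));
        return ResponseEntity.ok("Message published to Confluent Kafka topic");
    }
}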

  • Testing in Confluent UI – The data published to the Confluent Kafka Topic can also be viewed in the Confluent UI. A snapshot is provided below:
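On the consumer side, here is a minimal sketch of a spring-kafka listener that consumes from the same Topic. The class name and the group ID are illustrative assumptions; the actual consumer in the repository may differ.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

// Hypothetical consumer service; logs every message read from the topic
@Service
public class ConfluentConsumerService {

    // The group ID below is an illustrative value, not taken from the project
    @KafkaListener(topics = "springcavaj-topic", groupId = "springcavaj-consumer-group")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}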

GitHub Code Link

Download the Source Code from GitHub

Common Faced Problems

Spring Apache Kafka Confluent Problems

Interview FAQs

Spring Apache Kafka Confluent Interview Questions

Other Useful Links

Spring Data using RDBMS (MySQL DB) and Spring REST

Spring Data using NoSQL DB and Spring REST

Spring Data using Cypher Neo4j DB and Spring REST

Spring REST

Spring RabbitMQ

Spring Apache Kafka – Producer & Consumer

Spring Kafka Confluent – Set Up

Spring Apache Kafka Connect

Spring Mongo Kafka Connector

Spring Cloud using Google Cloud SQL & Google Cloud Storage

Spring Cloud using Google Cloud Pub-Sub
