Apache Kafka Connector Initial Concept
A brief discussion of the initial concept of the Apache Kafka Connector, along with an architecture diagram, is available on the springcavaj – Apache Kafka Connect page.
This guide will help you understand the Apache Kafka Connector with MongoDB. A small Spring Boot application has been developed to serve the purpose.
Use of Mongo Kafka Connector
Kafka Connect supports the concept of a data-centric pipeline: one can easily push data to and pull data from Kafka using a connector. It is flexible, meaning it works with both streaming and batch-oriented systems; it is scalable, running either on a single node (standalone) or distributed as an organization-wide service; and it is reusable and extensible. In general there are two types of connectors: Source Connectors and Sink Connectors. A Source Connector pulls data from a database or file system and pushes it into a Kafka topic. A Sink Connector pulls data from a Kafka topic and pushes it to a database or file system. I have created an application in support of the above description using MongoDB as the underlying database. The application consists of a Producer that pushes data into a Kafka topic, a sink connector that reads the data from that topic and persists it in MongoDB, and a Consumer that reads the data pushed into another Kafka topic by the source connector from MongoDB.
Discussion on spring-connect-kafka-mongodb-masterclass application
Here I have used the Mongo Kafka Connector with a Spring Boot application and MongoDB as the underlying database. The demo application, named spring-connect-kafka-mongodb-masterclass, is available in my personal GitHub account. REST endpoints are provided to test the application.
- Brief Description – In this application I have used Java, Spring Boot, Maven, Apache Kafka, the Mongo Kafka Connector, and MongoDB. The application consists of two parts: a Producer part and a Consumer part. The Producer part, through a REST endpoint, pushes data into a topic named kafka-mongo-sink-topic. From this topic, a sink connector defined in mongo_sink.json pushes the data into a MongoDB collection named kafka_mongo_sink. The second part, the Consumer part, pulls data from the kafka-mongo-source-topic, into which the source connector defined in mongo_source.json publishes data read from another MongoDB collection named kafka_mongo_source. The Consumer part is not attached to any REST endpoint; it is activated as soon as the Spring Boot application starts. A brief description of these two files, mongo_sink.json and mongo_source.json, is provided further down.
- Software Used – Software required to develop the application
- Spring Tool Suite 4.7.0.RELEASE – If a later version is available, download that one
- Apache Maven 3.6.3 – If a later version is available, download that one
- Java 8 – Not less than Java 8
- Git 2.27.0 – Or the latest version available
- Scala – Download the latest version (Scala 2.13 at the time of writing)
- MongoDB 4.2.8 – MongoDB Server
- Robo 3T 1.3.1 – MongoDB client to view the data persisted in the DB. You can use any other client available in the market
- Mongo Kafka Connector – Latest version as available
- Postman v8.3.0 – To test the REST service
- The list of software, their download links, and the installation steps are briefly described in the README.md file of the spring-connect-kafka-mongodb-masterclass repository.
- Project Components – The project I developed in support of the Mongo Kafka Connector concept is a Maven project. It uses three dependencies: spring-boot-starter-data-mongodb, spring-kafka, and mongo-kafka-connect, listed below (a rough Kafka configuration sketch follows the dependency listing).
<!-- Spring Mongo Dependency -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<!-- Spring Kafka Dependency -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- Mongo Kafka Connector Dependency -->
<dependency>
    <groupId>org.mongodb.kafka</groupId>
    <artifactId>mongo-kafka-connect</artifactId>
    <version>1.5.1</version>
</dependency>
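With these dependencies in place, the producer side of the application needs a KafkaTemplate. The snippet below is only a rough, hypothetical sketch of that wiring (the actual application may configure Kafka entirely through application.properties; the class and package names here are assumptions, not taken from the repository):
// Hypothetical sketch - the real application may use application.properties instead.
package com.example.kafkamongo.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    // Assumption: a local Kafka broker on the default port
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Keys are plain strings; values are serialized as JSON
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}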
Now that we are using the Mongo Kafka Connector, we introduce the concept of Source and Sink Connectors through the two .json files mentioned earlier: mongo_source.json and mongo_sink.json.
mongo_source.json -> Connects to MongoDB, pulls the data from there, and pushes it into a Kafka topic. Some of the properties mentioned in this file are as follows:
{
    "name": "mongo-source",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "tasks.max": "1",
        "topics": "kafka-mongo-source-topic",
        "connection.uri": "mongodb://localhost:27017",
        "database": "spring",
        "collection": "kafka_mongo_source",
        "publish.full.document.only": true
    }
}
Let’s briefly discuss the use of these properties:
- name – A unique name for the connector
- connector.class – The fully qualified class name of the Mongo Kafka Source Connector
- tasks.max – The maximum number of tasks that can be created for the connector. It is set to 1 in the above example
- topics – The name of the Kafka topic to which the connector publishes the data. In the above example it is kafka-mongo-source-topic
- connection.uri – The URI for connecting to the MongoDB instance. If the connection requires a username and password, they also need to be provided in the connection URI
- database – The name of the source database from which the data will be pulled and published to the above topic. In the above example it is spring
- collection – The name of the source collection from which the data will be pulled. In the above example it is kafka_mongo_source
- publish.full.document.only – Whether to publish only the full document from the source collection to the Kafka topic, rather than the entire change event. It is a boolean, either true or false. In the above example it is set to true, meaning only the full document will be published
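Once the source connector publishes documents from the kafka_mongo_source collection into kafka-mongo-source-topic, the Consumer part of the application picks them up. The snippet below is only an illustrative sketch of such a listener (the class name, group id, and String payload handling are assumptions relying on Spring Boot's default String deserialization), not the exact code from the repository:
// Hypothetical listener sketch - names and payload type are illustrative assumptions.
package com.example.kafkamongo.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class MongoSourceTopicConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(MongoSourceTopicConsumer.class);

    // Starts consuming as soon as the Spring Boot application starts;
    // no REST endpoint is involved on this side.
    @KafkaListener(topics = "kafka-mongo-source-topic", groupId = "kafka-mongo-source-group")
    public void consume(String message) {
        // With publish.full.document.only = true, each record value is the
        // MongoDB document as a JSON string, so it is simply logged here.
        LOGGER.info("Consumed document from kafka-mongo-source-topic: {}", message);
    }
}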
mongo_sink.json -> Pulls the data from a Kafka topic and, after establishing the connection, persists it in MongoDB. Some of the properties mentioned in this file are as follows:
{
    "name": "mongo-sink",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "tasks.max": "1",
        "topics": "kafka-mongo-sink-topic",
        "connection.uri": "mongodb://localhost:27017/spring",
        "database": "spring",
        "collection": "kafka_mongo_sink",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "schema.ignore": "true",
        "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy",
        "document.id.strategy.overwrite.existing": "true",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true"
    }
}
Let’s briefly discuss the use of these properties:
- name – A unique name for the connector
- connector.class – The fully qualified class name of the Mongo Kafka Sink Connector
- tasks.max – The maximum number of tasks that can be created for the connector. This is set to 1 in this example
- topics – The name of the Kafka topic from which data should be consumed. Here in this example the name of the topic is kafka-mongo-sink-topic
- connection.uri – The URI for connecting to the MongoDB instance. If the connection requires a username and password, they also need to be provided in the connection URI
- database – The name of the sink database in which the data will be persisted. The name of the database as mentioned in the above example is spring
- collection – The name of the sink collection in which the data will be persisted. The name of the collection in the above example is kafka_mongo_sink
- key.converter – The class name of the converter used to deserialize the Kafka message key
- value.converter – The class name of the converter used to deserialize the Kafka message value
- value.converter.schemas.enable – Whether the message value includes embedded schema information. This should be set to false for plain JSON messages
- schema.ignore – Whether or not to ignore the schema information in the Kafka message. This should be set to true for JSON messages
- document.id.strategy – The strategy for generating the _id field in the MongoDB document. The ProvidedInValueStrategy strategy uses the _id field in the message value as the document ID
- document.id.strategy.overwrite.existing – Whether or not to overwrite the _id field if it already exists in the MongoDB document
- errors.log.enable – Whether or not to log errors in the connector’s error log
- errors.log.include.messages – Whether or not to include error messages in the connector’s error log
These are the properties used in the source and sink JSON files. Some of them are common to both files and some are not.
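These JSON files are typically registered with a running Kafka Connect worker through its REST API, which by default listens on port 8083, by POSTing each file to the /connectors endpoint. The snippet below is only an illustrative sketch of that step using Spring's RestTemplate; the worker URL and file locations are assumptions, and in practice you may simply use curl or follow the steps given in the README.md:
// Hypothetical sketch - assumes a Kafka Connect worker on localhost:8083 and that
// mongo_source.json / mongo_sink.json are available on the local file system.
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

public class RegisterConnectors {

    public static void main(String[] args) throws Exception {
        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);

        for (String file : new String[] {"mongo_source.json", "mongo_sink.json"}) {
            String body = new String(Files.readAllBytes(Paths.get(file)), StandardCharsets.UTF_8);
            HttpEntity<String> request = new HttpEntity<>(body, headers);
            // POST /connectors registers a new connector with the Connect worker;
            // a 201 Created response means the connector was accepted.
            ResponseEntity<String> response =
                    restTemplate.postForEntity("http://localhost:8083/connectors", request, String.class);
            System.out.println(file + " -> " + response.getStatusCode());
        }
    }
}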
- Structure of the Project – The structure of a project using the Mongo Kafka Connector with a Spring Boot application can be seen in the spring-connect-kafka-mongodb-masterclass repository.
- Testing the application – This application covers both the Source Connector and the Sink Connector concepts. The required software, the initial Kafka Connector setup, and the application setup needed for testing are all described in the README.md file of the application.
- Run Zookeeper
- Run Apache Kafka Server
- Create 2 topics named kafka-mongo-source-topic and kafka-mongo-sink-topic (see the sketch after these setup steps)
- Install MongoDB
- Run MongoDB as a daemon
- Create a database named spring
- Create 2 collections named kafka_mongo_source and kafka_mongo_sink
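As referenced in the topic-creation step above, the two topics can be created with the kafka-topics script that ships with Kafka (as described in the README.md) or programmatically. The snippet below is an illustrative sketch using Kafka's AdminClient; the broker address, partition count, and replication factor are assumptions:
// Hypothetical sketch - creates the two topics used by the application.
// Broker address, partition count and replication factor are assumptions.
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopics {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            NewTopic sourceTopic = new NewTopic("kafka-mongo-source-topic", 1, (short) 1);
            NewTopic sinkTopic = new NewTopic("kafka-mongo-sink-topic", 1, (short) 1);
            // Blocks until the broker acknowledges the topic creation
            adminClient.createTopics(Arrays.asList(sourceTopic, sinkTopic)).all().get();
        }
    }
}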
- Testing the Source Connector – Persist data in the MongoDB collection kafka_mongo_source of the database spring, create a topic named kafka-mongo-source-topic, and then run the Spring Boot application. In the logs, you will see that the data from the kafka_mongo_source collection is published to the topic kafka-mongo-source-topic and consumed by the Consumer.
- Testing the Sink Connector – Create a collection named kafka_mongo_sink and a topic named kafka-mongo-sink-topic, and run the application as a Spring Boot application. Open Postman and hit the REST API endpoint localhost:7116/kafkaMongoJson. It will publish the data to the kafka-mongo-sink-topic; the connector will then consume the data from that topic and persist it in the collection named kafka_mongo_sink (a rough sketch of this endpoint is shown after the sample JSON below).
- Running the application – Run the Spring Boot application by right-clicking on the project and selecting Run As, then open Postman to hit the REST API endpoint URL
- In Postman, paste localhost:7116/kafkaMongoJson and change the method type to POST
- In the body, provide a JSON payload and hit Send
- Sample JSON Data
POST Request JSON Data
{
    "firstName" : "First",
    "lastName" : "Name",
    "mobileNo" : "1234567890"
}
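For completeness, here is a rough sketch of how the Producer side of the /kafkaMongoJson endpoint might look; the class, package, and model names are assumptions based on the sample payload above, not the exact code from the repository (the endpoint runs on port 7116, presumably set through server.port in the application's configuration):
// Hypothetical sketch of the Producer REST endpoint - names are illustrative
// assumptions based on the sample JSON payload above.
package com.example.kafkamongo.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaMongoController {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Accepts the JSON payload from Postman and publishes it to the sink topic;
    // the sink connector then persists it in the kafka_mongo_sink collection.
    @PostMapping("/kafkaMongoJson")
    public String publish(@RequestBody Person person) {
        kafkaTemplate.send("kafka-mongo-sink-topic", person);
        return "Data published to kafka-mongo-sink-topic";
    }

    // Simple model matching the sample JSON payload (firstName, lastName, mobileNo)
    public static class Person {
        private String firstName;
        private String lastName;
        private String mobileNo;

        public String getFirstName() { return firstName; }
        public void setFirstName(String firstName) { this.firstName = firstName; }
        public String getLastName() { return lastName; }
        public void setLastName(String lastName) { this.lastName = lastName; }
        public String getMobileNo() { return mobileNo; }
        public void setMobileNo(String mobileNo) { this.mobileNo = mobileNo; }
    }
}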
GitHub Code Link
Download the Source Code from GitHub
Common Faced Problems
Spring Mongo Kafka Connect Common Problems
Interview FAQs
Spring Mongo Kafka Connect Interview Questions
Other Useful Links
Spring Data using RDBMS (MySQL DB) and Spring REST
Spring Data using NoSQL DB and Spring REST
Spring Data using Cypher Neo4j DB and Spring REST
Spring Data using Redis and Spring REST
Spring Apache Kafka – Producer & Consumer
Spring Kafka Confluent – Set Up
Spring Kafka Confluent – Producer & Consumer
Spring Cloud using Google Cloud SQL & Google Cloud Storage
Spring Cloud using Google Cloud Pub-Sub