Spring RabbitMQ

What is Messaging?

Messaging is a communication mechanism technique to connect between various applications. As for example – when we talk over mobile, or sending money to a beneficiary, etc. In short we can say that, a client can send messages to, and receive messages from, any other client.

Message – A message consists of 2 parts: one is Header and another is Body.

There are 2 types of process by which one can send message and receive message.

  1. Synchronous Messaging – In this process the client sends a message, waits for the response and based on that response, execute another task. It should be remember that in this process, client will not be able to execute another task unless and until the response is received.
  2. Asynchronous Messaging – In this process the client sends a message and doesn’t wait for the response immediately. It means that the client can execute next task without waiting for the response.

In this blog, I have used AMQP (Advance Messaging Queuing Protocol) to communicate with a AMQP Broker named as RabbitMQ.

Let’s discuss in brief regarding AMQP.

What is AMQP?

AMQP is a protocol and its full form is Advance Messaging Queuing Protocol. It is an open standard specification for asynchronous messaging communication. It provides a description to construct a message.

Difference with JMS

AMQP is platform-neutral binaries can be written in different programming languages and can run on different environments.

JMS is standard Java API for communicating with MOM (Message Oriented Middleware). It is a part of Java J2EE. Different Java applications can communicate with each other if those applications uses JMS Clients.

AMQP Entities

AMQP Entities are Exchanges, Queues and Bindings.

  • Exchanges – They are acting like a post offices where the clients publish a message to an AMQP Exchange. There are four types of exchanges:
    • Direct Exchange – Routes messages to the Queue by matching a complete routing key
    • Fanout Exchange – Routes messages to all the queues bound to it
    • Topic Exchange – Routes messages to multiple queues by matching a routing key to a pattern
    • Headers Exchange – Routes messages based on message headers
  • Queues – They are bound to an exchange using a routing key
  • Messages – Messages are the information or data sent to an exchange with a routing key.

For more details click on this links – AMQP Concepts and Routing Mechanism.

Spring AMQP

Spring AMQP consists of 2 modules as spring-amqp and spring-rabbit.

  • AMQP Entities
  • Connection Management
  • Message Publishing
  • Message Consuming

Installation of RabbitMQ

RabbitMQ installation – To install RabbitMQ first you need to install Erlang in your system.

Once Erlang is successfully installed, install RabbitMQ.

To test and verify whether RabbitMQ is successfully installed in local system, just type the URL as http://localhost:15672. Once you type this URL it will opt to enter username and password.

Default username and password for accessing RabbitMQ Home page is guest. All are in lowercase letters. Please find below the screenshot.

Once you successfully login you will redirect to the home page of RabbitMQ. Please find below the screenshot.

In this blog I have provided both the publisher and consumer example.

Details of Application

  1. spring-rabbitmq-producer-masterclass – This application is the producer using Spring Boot and Spring AMQP. It will publish 5 messages in the RabbitMQ running in port no 15672 of localhost. The 5 messages are pushed using the Spring Boot main application class.
  2. spring-rabbitmq-consumer-masterclass – This application is the consumer which will read those 5 messages from the queue and display in the console using Logger. I have not use any DB to save those 5 messages. If anybody wants to implement the DB concept then go for that. I have blogs written in Spring Data using various databases which will help for the implementation.

Let’s first go through the RabbitMQ Publisher application, spring-rabbitmq-producer-masterclass.

Using Spring AMQP with Spring Boot for the RabbitMQ Publisher Application

Whenever you will start working with RabbitMQ, you need a RabbitMQ Broker.

You verify the successful installation of RabbitMQ in your system by hitting the http://localhost:15672 URL in your browser. It will open the login page where you need to provide the credentials and enter into the home page of RabbitMQ. At this point you can say that a perfect running RabbitMQ Broker is running in your local system.

Now we can proceed to publish message in the RabbitMQ and for that you need an application to do that. I have developed a Spring Boot application using Java as the primary language. You can use any other languages as AMQP is platform neutral.

I have created a Demo project named as spring-rabbitmq-producer-masterclass uploaded in my personal GitHub account. One can clone the project and can test locally in their system. All the steps that are required to download the project from GitHub and running it locally are mentioned in the README.md file of spring-rabbitmq-producer-masterclass repository.

  • Brief Description – I have used Spring Boot, Maven, RabbitMQ, Java in this spring-rabbitmq-producer-masterclass project. The main purpose of this project is to focus on implementing RabbitMQ using Spring Boot to publish a message in RabbitMQ Broker. To make it simple and understandable for the viewers, I have used the SpringBoot Main application class to publish messages in the RabbitMQ Broker.
  • Software Used – Software required to develop this project.
    • Spring Tool Suite-4.7.0-RELEASE – If latest version is available then download that one
    • Apache Maven 3.6.3 – If latest version is available then download that one
    • Java 8 – Not less than Java8
    • Git 2.27.0 – Latest version as available
    • Erlang (SMPASYNC_THREADS) (BEAM) emulator version 12.0 – Latest version as available
    • RabbitMQ – 3.9.8 – Latest version as available

Steps required to install the software Erlang and RabbitMQ are clearly provided in the README.md file of the spring-rabbitmq-producer-masterclass repository.

  • Project Components – The project that I develop in support to the Spring-AMQP concept is a Maven project. Here I have used one dependency named as spring-boot-starter-amqp dependency. This dependency provides the support of Spring-AMQP libraries.
<!-- Spring AMQP Dependency -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • Structure of the Project – To support and use Spring AMQP and its libraries you need to structure the project in a right way as provided below:

I have used Spring Boot in this application. So I have a starting class named as SpringRabbitmqProducerMasterclassApplication.java annotated with @SpringBootApplication.

In the above screenshot you can see there is a class named as RabbitMQConfig.java of config package. The configurations of RabbitMQ are defined in this class.

This class is annotated with @Configuration annotation. This class is used to configure the Queue, Exchange and Binding. And an AMQPTemplate is used to publish the messages in the Queue. Let’s dig into details of this configuration class.

@Configuration
public class RabbitMQConfig {
        @Value("${rabbit.mq.queue.name}")
        private String queueName;
        @Value("${rabbit.mq.exchange}")
        private String exchange;
        @Value("${rabbit.mq.routing.key}")
        private String routingKey;

        @Bean
        Queue queue() {
              return new Queue(queueName, true);
        }
        @Bean
        DirectExchange exchange() {
              return new DirectExchange(exchange);
        }
        @Bean
        Binding binding(Queue queue, DirectExchange exchange) {
              return BindingBuilder.bind(queue).to(exchange).with(routingKey);
        }
        @Bean
        public MessageConverter jsonMessageConverter() {
              return new Jackson2JsonMessageConverter();
        }
        public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
              final RabbitTemplate rabbitTemplate = new  RabbitTemplate(connectionFactory);
              rabbitTemplate.setMessageConverter(jsonMessageConverter());
              return rabbitTemplate;
        }
}

In the above configuration class, I have defined 3 Beans as queue, exchange and binding.

Queue – Initialize the Queue by calling the Queue constructor and passing the queueName as the 1st argument. Queue name is springcavaj. And the 2nd argument is whether the Queue is Durable or not. If you pass true then the Queue is durable means that on server restart the queue definition will survive but not the messages in it. And if it is false, then the queue definition will not survive in server restart. If you make any Queue as durable then in RabbitMQ Management UI, you will be able to see that Queue is marked with the letter “D” to make ensure that the queue is durable. If you want to explore more in Queue, please go through this link ‘Queue Biography‘.

Exchange – Exchanges are the message routing agents defined per virtual host within RabbitMQ. An exchange routes the messages to different queues. It accepts the messages from producer/publisher application and routes them to message queues with the help of headers, attributes, bindings and routing keys. Here I have used a Direct Exchange. The routing algorithm of direct exchange is that messages will goes to the queue whose binding key exactly matches with the routing key of the message. Exchanges are of different types. If you want to explore more in exchange, please go through this link ‘Different Types of Exchanges and its functionalities‘.

The queue springcavaj that I have used is durable. Please find below the screenshot.

Binding – In RabbitMQ, binding is a connection which is used to configure a relation between a Queue and an Exchange. In simple words we can say that binding is a relationship between Queue and Exchange. If you want to explore more in Bindings, please go through this link ‘Bindings Definition and Functionalities‘.

Now let’s go through the RabbitMQPublisher.java class which will helps to produce/publish the desired message in RabbitMQ using exchange and routing key.

@Service
public class RabbitMQPublisher {
       @Autowired
       private AmqpTemplate amqpTemplate;
       public void publish(Car car) {
    	   amqpTemplate.convertAndSend(exchange, routingKey, car);
       }
}

Here I have used AmqpTemplate to send the message in Queue from the application by invoking the convertAndSend(exchange, routingKey, car) method. The parameters used to pass are as exchange (springcavaj.direct), routingKey (springcavaj.pub) and the object/model car that will be published in RabbitMQ. I have used a model named Car.java as the message to publish in RabbitMQ (springcavaj). Please find below the screenshot from Rabbit Management UI for the information regarding Exchange and Routing Key of the springcavaj queue.

5 records have been published in the RabbitMQ (springcavaj) from the Publisher application. Please refer to the below screenshot.

Now let’s focus on the Consumer application which will consume the messages from the above said RabbitMQ (springcavaj).

Using Spring AMQP with Spring Boot for the RabbitMQ Consumer Application

I have created a Demo project named as spring-rabbitmq-consumer-masterclass uploaded in my personal GitHub account. One can clone the project and can test locally in their system. All the steps that are required to download the project from GitHub and running it locally are mentioned in the README.md file of spring-rabbitmq-consumer-masterclass repository.

  • Brief Description – I have used Spring Boot, Maven, RabbitMQ, Java in this spring-rabbitmq-consumer-masterclass project. The main purpose of this project is to consume messages from RabbitMQ (springcavaj) using Spring Boot. After consuming the messages I have just print those messages in Console using org.slf4j.Logger. While one can persist those messages permanently in any of the database. I have blogs regarding Spring Data, one can go through those blogs and perform necessary actions.
  • Software Used – Software required to develop this project. You need the same lists of software what I have used in the spring-rabbitmq-producer-masterclass application.

Although the steps required to install the software are clearly provided in the README.md file of the spring-rabbitmq-consumer-masterclass repository.

  • Project Components – The project that I develop in support to the Spring-AMQP concept is a Maven project. Here I have used one dependency named as spring-boot-starter-amqp dependency. This dependency provides the support of Spring-AMQP libraries.
<!-- Spring AMQP Dependency -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • Structure of the Project – To support and use Spring AMQP and its libraries you need to structure the project in a right way as provided below:

I have used Spring Boot. There is a class annotated as @SpringBootApplication and has the main() method. The name of the class is SpringRabbitmqConsumerMasterclassApplication.java and it is the starting point of the application.

In the Consumer application, I have defined the same RabbitMQ Configurations in RabbitMQConfig.java class as I have defined in the Producer application.

This is a Consumer application having a class named as RabbitMQConsumer.java annotated with @RabbitListener(queues = “${rabbit.mq.queue.name}”). Once this class is annotated with @RabbitListener annotation one need to override a method named receiveMessage() annotated with @RabbitHandler annotation.

Please have a look of the snippet of the RabbitMQConsumer.java.

@RabbitListener(queues = "${rabbit.mq.queue.name}")
public class RabbitMQConsumer {
       @RabbitHandler
       public void receiveMessage(@NonNull Car car, Channel channel, Message message) throws IOException {
              try {
			if(null != car) {
				LOGGER.info("Message payload : {}", car.toString());
                        }
                  } catch(Exception e) {
    	        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    	          } finally {
                 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                  }
         }
}

Here I have just log the object car in the console. Once the consumer consume the messages from the RabbitMQ those messages will no longer available in the Queue. So it’s one of the responsibility of the consumer application that to send an acknowledgement back to the Queue. If the consumer successfully reads the messages from the queue then it will send a positive acknowledgement and if there occurs any error or exception while consuming the messages from the Queue then it will send a negative acknowledgement to the Queue.

For positive acknowledgement I have used the basicAck() method of the Channel class. In basicAck() method one need to pass 2 arguments, 1st argument is the deliveryTag and the 2nd argument is boolean. 1st argument is to get the deliveryTag from the Message class as message.getMessage.getDeliveryTag(). 2nd argument signifies that if the value is false then it will only acknowledge the deliveryTag and if it is true then it acknowledges all messages up to and the deliveryTag.

For negative acknowledgement I have used basicNack() method of same Channel class. This method needs 3 arguments. The first 2 arguments are same as that of basicAck() method. The last argument is again a boolean which signifies that whether the message will be re-queued or not. If it is false then it will not be re-queued but if is true then it will be.

What is the significance of Acknowledging a message?

If anyone doesn’t acknowledge a message after reading from the Queue, then those messages will be in Unacked section of the Queue. Please find below the screenshot.

As I have acknowledged the message after consuming from the Queue, so there are no counts of messages in Unacked section (highlighted in Yellow color). If you don’t send acknowledgement to the broker, then in RabbitMQ Management Console one will able to see the count of consumed messages in the Unacked section. It means that the messages are consumed by the consumer but those messages are not acknowledged.

GitHub Code Link

Download the Publisher Source Code from GitHub

Download the Consumer Source Code from GitHub

Common Faced Problems

Spring RabbitMQ Problems

Interview FAQs

Spring RabbitMQ Interview FAQs

Other Useful Links

Spring Data using RDBMS (MySQL DB) and Spring REST

Spring Data using NoSQL DB and Spring REST

Spring Data using Cypher Neo4j DB and Spring REST

Spring Data using Redis and Spring REST

Spring REST

Spring Apache Kafka – Producer & Consumer

Spring Kafka Confluent – Set Up

Spring Kafka Confluent – Producer & Consumer

Spring Apache Kafka Connect

Spring Mongo Kafka Connector

Spring Cloud using Google Cloud SQL & Google Cloud Storage

Spring Cloud using Google Cloud Pub-Sub

Spring Cloud using Google Cloud Spanner