Spring Boot RabbitMQ Headers Exchange Example

In this tutorial, we’ll go through the step by step guide to implement messaging using RabbitMQ in a Spring Boot Application and will see how to publish and consume messages to Header Exchange and queues using RabbitMQ.

Headers Exchange Example

When the producer publishes messages to RabbitMQ server , they are not published directly to a queue, instead first they goes to exchange. An exchange is in charge of routing messages to different queues using bindings and routing keys. A binding connects a queue and an exchange.

There are 5 types of exchanges

1.Direct Exchange

2.Topic Exchange

3.Fanout Exchange

4.Headers Exchange

5.Default Exchange

In this blog post I will show you how to use Header Exchange in Spring Boot applications.

This is the fourth post in this series.

First part covers about Direct Exchange

Second part covers about Topic Exchange

Third part covers about Fanout Exchange

Headers Exchange

A Headers exchange routes messages based on attributes containing in headers and optional values. Headers exchanges are very similar to topic exchanges, but routes messages based on header values instead of routing keys. A message matches if the value of the header equals the value specified upon binding.

A special argument named “x-match” which specifies criteria matching, added in the binding between exchange and queue, specifies

  • if all headers must match or just one.
  • Either any common header between the message and the binding count as a match
  • or all the headers referenced in the binding need to be present in the message for it to match.

The “x-match” property can have two different values:

“any” or “all”

all – is the default value. A value of “all” means all header pairs (key, value) must match

any – means at least one of the header pairs must match.

Headers can contain wider range of data types, integer string or boolean. The headers exchange type (used with the binding argument “any”) is useful for directing messages which contain a subset of known (unordered) criteria.

Setting Up Project

Go to Spring Initializr and add the following starter dependencies to a project.

  • Spring Web
  • Spring for RabbitMQ

Change the Name to “springboot-rabbitmq-producer” and then click on “Generate” button. A .zip will download. Unzip it. Inside you’ll find a simple, Maven-based project including a pom.xml build file (NOTE: You can use Gradle. The examples in this tutorial will be Maven-based.)

Import the project to your favorite IDE.

Configuring RabbitMQ connection

To integrate RabbitMQ in your Spring-powered web applications, all you need to do is configure Spring to use RabbitMQ. Spring for RabbitMQ provides a convenient class called RabbitTemplate to send and receive messages

By default RabbitTemplate class uses following configuration to connect to RabbitMQ instance.

host: localhost

port: 5672

username: guest

password: guest

Using application.properties

You can also configure connection to RabbitMQ instance using application.properties.

In real world application you need configure using application properties only.

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guestCode language: plaintext (plaintext)

Let’s see how to setup the Headers exchange ,queues and bindings programmatically in Spring Boot application.

We create a configuration class to create exchange ,queues and bindings

package dev.fullstackcode.sb.rabbitmq.producer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfiguration {

    @Bean
    Queue queueA() {
        return new Queue("queue.A", false);
    }

    @Bean
    Queue queueB() {
        return new Queue("queue.B", false);
    }

    @Bean
    Queue queueC() {
        return new Queue("queue.C", false);
    }


    @Bean
    HeadersExchange exchange() {
        return new HeadersExchange("exchange.reports");
    }

    @Bean
    Binding bindingA(Queue queueA, HeadersExchange exchange) {
        Map<String,Object> map = new HashMap<>();
        map.put("type","report");
        map.put("format","pdf");
        return BindingBuilder.bind(queueA).to(exchange).where("type").matches("report");
    }

    @Bean
    Binding bindingB(Queue queueB, HeadersExchange exchange) {
        Map<String,Object> map = new HashMap<>();
        map.put("type","report");
        map.put("format","excel");
        return BindingBuilder.bind(queueB).to(exchange).whereAny(map).match();
    }

    @Bean
    Binding bindingC(Queue queueC, HeadersExchange exchange) {
        Map<String,Object> map = new HashMap<>();
        map.put("type","report");
        map.put("format","excel");
        map.put("period","monthly");
        return BindingBuilder.bind(queueC).to(exchange).whereAll(map).match();
    }
    @Bean
    MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

    @Bean
    ApplicationRunner runner(ConnectionFactory cf) {
        return args -> cf.createConnection().close();
    }
   
}
Code language: Java (java)

Queue – beans are used to create queue

HeadersExchange – is used to create Header exchange

Binding – is used to create bindings between exchange and Queue.

MessageConverter – is used to convert object to Json format

RabbitTemplate – is used to configure the RabbitTemplate.

In above config class , you can observe that

Queue A – > one header key should match the condition

Queue B -> any one of the headers should match the condition (whereAny(map).match())

Queue C -> all of the headers should match the condition (whereAll(map).match())

Note

By default RabbitTemplate uses SimpleMessageConverter class. SimpleMessageConverter class can be used to send data in string and byte format. You can also use SimpleMessageConverter to send json data but using Jackson2JsonMessageConverter class simplifies the work

Creating Exchange and Queues on Startup

Spring Boot application connects to RabbitMQ server instance and creates Exchange and Queues when first message is published . If you want your application to create Exchange and Queues on application startup , you should use following method.

 @Bean
    ApplicationRunner runner(ConnectionFactory cf) {
        return args -> cf.createConnection().close();
    }Code language: Java (java)

Publishing Messages

Once RabbitTemplate is configured , it is very easy publish messages. We can use overloaded send/convertAndSend method to publish messages.

Let’s develop a controller class which publishes messages.

package dev.fullstackcode.sb.rabbitmq.producer.controller;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import dev.fullstackcode.sb.rabbitmq.producer.model.Event;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.JsonbMessageConverter;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;

import java.util.HashMap;
import java.util.Map;

@RestController
@RequestMapping(value ="rabbitmq/event")
public class RabbitMQProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private HeadersExchange headersExchange;


    @Autowired
    private Jackson2JsonMessageConverter jsonMessageConverter;


    @PostMapping
    public String  send(@RequestBody Event event) {


        if( event.getName().equalsIgnoreCase("Event A")) {

            MessageProperties properties = new MessageProperties();
            properties.setHeader("type","report");
            properties.setHeader("format","pdf");


            Message message = jsonMessageConverter.toMessage(event, properties);
            rabbitTemplate.convertAndSend(headersExchange.getName(), "", message);
        } else if (event.getName().equalsIgnoreCase("Event B")) {

            MessageProperties properties = new MessageProperties();
            properties.setHeader("type","report");
            properties.setHeader("format","excel");

            Message message = jsonMessageConverter.toMessage(event, properties);
            rabbitTemplate.convertAndSend(headersExchange.getName(), "", message);
        } else if (event.getName().equalsIgnoreCase("Event C")) {
            Map<String,Object> map = new HashMap<>();
            map.put("type","report");
            map.put("format","excel");
            map.put("period","monthly");

            MessageProperties properties = new MessageProperties();
            properties.setHeader("type","report");
            properties.setHeader("format","excel");
            properties.setHeader("period","monthly");

            Message message = jsonMessageConverter.toMessage(event, properties);
            rabbitTemplate.convertAndSend(headersExchange.getName(), "", message);
        } else {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST,"unknown event");
        }
        return "message sent successfully";
    }

}
Code language: Java (java)

Starting the RabbitMQ

Let’s use following docker compose file for starting RabbitMQ server in local development for our testing.

version: '3'
services:
  rabbitmq:
    container_name: rabbitmq
    hostname: my-rabbitmq
    image: rabbitmq:3.10.6-management-alpine
    ports:
      - '5672:5672'
      - '15672:15672'
Code language: Java (java)
docker-compose upCode language: plaintext (plaintext)

Starting Spring Boot RabbitMQ producer Application

Let’s start the springboot-raabitmq-producer application and publish some messages.

As we are connecting to RabbitMQ server on startup, it should create Headers Exchange and Queues and bindings in RabbitMQ instance.

We can check this by going to management console of RabbitMQ. It is available at

http://localhost:15672

username/password -> guest/guest

Below images confirm that exchange, queue and bindings created successfully on startup.

Publishing Messages

Let’s publish messages by sending request to controller class.

Let’s look at the management console.

QueueA -> has 3 messages as it matches the header key,value “type” : “report” of all messages

QueueB -> has 3 messages as it matches one of the header key,value “type” : “report” of all messages

QueueC -> has 1 messages as it matches the all header key,values of only one message

Consuming Messages

Now let’s develop a consumer application which consumes messages from RabbitMQ server.

First define MessageConverter to Convert Json String back to Object

package com.fullstackcode.sb.rabbitmq.consumer.config;


import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfiguration {

    @Bean
    MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }



}
Code language: Java (java)

@RabbitListener annotation is used to receive messages from RabbitMQ queues.

Spring Boot will take care of converting message to corresponding Object

package com.fullstackcode.sb.rabbitmq.consumer.listener;

import com.fullstackcode.sb.rabbitmq.consumer.model.Event;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class RabbitMQConsummer {


    @RabbitListener(queues = "queue.A")
    private void receiveQueueA(Event event) {
       log.info("Event received from queue A -> {}",event.toString());

    }


    @RabbitListener(queues = "queue.B")
    private void receiveQueueB(Event event) {
        log.info("Event received from queue B -> {}",event);
    }

    @RabbitListener(queues = "queue.C")
    private void receiveQueueC(Event event) {
        log.info("Event received from queue C -> {}",event);
    }
}

Code language: Java (java)

Once we start the consumer application, we can see following output.

INFO  : Event received from queue A -> Event(id=1, name=Event A)
INFO  : Event received from queue A -> Event(id=2, name=Event B)
INFO  : Event received from queue A -> Event(id=2, name=Event C)
INFO  : Event received from queue C -> Event(id=3, name=Event C)
INFO  : Event received from queue B -> Event(id=1, name=Event A)
INFO  : Event received from queue B -> Event(id=2, name=Event B)
INFO  : Event received from queue B -> Event(id=3, name=Event C)
Code language: Java (java)

Connecting to Remote Host

The above consumer example connects to RabbitMQ instance running in local

If you want to connect to instance running on server, you need specify the server address in application.properties

You can configureaddress like below

For SSL connection

spring.rabbitmq.addresses=amqps://<username>:<password>@<host>/<virtual-host>Code language: Java (java)

For non-SSL connection

spring.rabbitmq.addresses=amqp://<username>:<password>@<host>/<virtual-host>Code language: Java (java)

If you prefer individual properties, you can also configure like below

spring.rabbitmq.host=<host>
spring.rabbitmq.virtual-host=<virtaul-host>
spring.rabbitmq.port=5671 (SSL) or 5672 ( non - SSL)
spring.rabbitmq.username=<username>
spring.rabbitmq.password=<password>Code language: Java (java)

Note

RabbitMQ in general listens on port 5672. If RabbitMQ instance is running on SSL, it will listen on port 5671

You can download the source code from GitHub for this blog

producer – sureshgadupu/sb-rabbitmq-headersexchange (github.com)

consumer – sureshgadupu/sb-rabbitmq-headersexchange-consumer (github.com)

You might be also interested in

Similar Posts