RabbitMQ – How to Configure with Spring

RabbitMQ – How to Configure with Spring

With the growth of microservice architecture, the need for synchronization between web apps is increasing every day. Therefore the importance of the usage of Message Queue application is raised a lot too. The most commonly used Messaging apps are RabbitMQ and Apache Kafka.

In this post, we are going to learn how to configure Spring Boot app to consume RabbitMQ messages. We are going to configure the consumer with dead letter settings arranged. If we do not arrange it then the messages failed to deliver will be processed again and again. Dead-letter queues are used to keep the failed messages. Moreover, we are going to apply an IT with the aid of test containers.

Sample SpringBoot App

You can GIT/clone the sample project from the following URL.

https://github.com/tasdemirbahadir/rabbit-tutorial

Pom Configuration

We are going to use the below pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.btasdemir</groupId>
    <artifactId>rabbit-tutorial</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>rabbit-tutorial</name>
    <description>Rabbit Tutorial Project</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers</artifactId>
            <version>1.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
            <version>3.1.0</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

In the first place, In this maven project, we are using Spring Boot Starter Parent 2.0.3.RELEASE, Spring Boot Starter AMQP for RabbitMQ, Apache Commons Lang 3 for StringUtils, Test Containers to prepare RabbitMQ integration test, Awaitility to synchronize integration test, Jackson libraries for our Message Dto.

RabbitMQ Configuration

Below class shows the configurations for Rabbit MQ.

package com.btasdemir.rabbittutorial.configuration;

import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableRabbit
public class MessageQueueConfiguration implements RabbitListenerConfigurer {

    private final MessageQueueConfigurationProperties properties;

    @Autowired
    public MessageQueueConfiguration(MessageQueueConfigurationProperties properties) {
        this.properties = properties;
    }

    @Bean
    public Queue queue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", StringUtils.EMPTY);
        args.put("x-dead-letter-routing-key", properties.getDeadLetterQueueName());
        return new Queue(properties.getQueueName(), true, false, false, args);
    }

    @Bean
    public FanoutExchange exchange() {
        return new FanoutExchange(properties.getExchangeName());
    }

    @Bean
    public Binding binding(Queue queue, FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean
    public Queue deadLetterQueue() {
        return new Queue(properties.getDeadLetterQueueName(), true);
    }

    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }

    @Bean
    public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(consumerJackson2MessageConverter());
        return factory;
    }

    @Override
    public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }
}

Accordingly with the configuration, here is the step by step explanation:

  • Bean Queue -> configures our message queue with dead-letter configs.
  • Fanout exchange -> out exchange bound to the queue in use.
  • Binding -> to bind the exchange with the queue.
  • Queue Dead Letter Queue -> config the dead-letter queue.
  • Jackson Message Converter -> to make our system able to convert objects to JSON and vice versa.
  • Configure Rabbit Listeners -> to provide the Jackson converter to the Rabbit listener.

In the result of above, we simply configure a queue and an exchange bound to that queue. The dead letter queue is given as arguments to our queue in use. Likewise, we have configured dead-letter-exchange and kept it empty not to use any other exchanges because we don’t need it currently.

Message Queue Consumer

You can investigate the Message Queue Consumer class below.

package com.btasdemir.rabbittutorial.consumer;

import com.btasdemir.rabbittutorial.model.dto.UserQueueDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageQueueConsumer {

    private final static Logger logger = LoggerFactory.getLogger(MessageQueueConsumer.class);

    @RabbitListener(queues = "${mq.info.queue-name}")
    public void consume(final UserQueueDto userQueueDto) {
        logger.debug("Consumed user data: {}", userQueueDto);
    }

}

It simply listens to the queue and logs the sent data (in our tutorial case, the message contains a user id. You can investigate the details from the stash URL of the sample project.).

Local RabbitMQ for Test

With the power of Docker, it is very easy for us to create a Rabbit MQ instance locally. Please make sure your device has Docker locally first, then run the command below:

docker run -d -p 5671-5672:5671-5672 -p 15671-15672:15671-15672 --name rabbitmq rabbitmq:3-management

As a result of this, in a few seconds, your rabbit will be up and running. The username is “guest” and the password is “guest”.

Running and Seeing Results

Let’s run and send a sample message from the local Rabbit MQ to see if everything is fine.

  1. Run your app
  2. Open http://localhost:15672/ from the browser
  3. Login with username: guest password: guest
  4. See that the exchange with the name “localhost.exchange” and queue with name “localhost.queue” is ready with the dead-letter-queue.
  5. Head to the queue “localhost.queue”
  6. Click “Publish Message” below
  7. For properties type “ContentType: application/JSON”
  8. For message type “{‘UserId’: 1234}”
  9. Send message.
RabbitMQ Publish Message
RabbitMQ Publish Message

In your application console, you must see that the logger has logged the user ID sent with the message. However, if you don’t see it please make sure your logging configurations are correct. You can leave your questions in the comments section at any time.

Screen-Shot-2018-06-25-at-16.49.39
Screen-Shot-2018-06-25-at-16.49.39

Integration Test with Test Container

Description

As an example, below class contains the integration test for Rabbit MQ.

package com.btasdemir.rabbittutorial.consumer;

import com.btasdemir.rabbittutorial.model.dto.UserQueueDto;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.rule.OutputCapture;
import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.testcontainers.containers.GenericContainer;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.core.IsEqual.equalTo;

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(initializers = MessageQueueConsumerIT.Initializer.class)
public class MessageQueueConsumerIT {

    @TestConfiguration
    public static class CustomizeConfig {
        @Bean
        public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory,
                                             Jackson2JsonMessageConverter producerJackson2MessageConverter) {
            final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMessageConverter(producerJackson2MessageConverter);
            return rabbitTemplate;
        }

        @Bean
        public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }

    @ClassRule
    public static GenericContainer rabbit = new GenericContainer("rabbitmq:3-management")
            .withExposedPorts(5672, 15672);

    @Value("${mq.info.queue-name}")
    private String queueName;

    @Rule
    public OutputCapture outputCapture = new OutputCapture();

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void it_should_handle_sent_message_successfully() {
        //Given
        UserQueueDto message = new UserQueueDto();
        message.setUserId(1L);

        //When
        rabbitTemplate.convertAndSend(queueName, message);

        //Then
        await().atMost(5, TimeUnit.SECONDS).until(isUserConsumedAsync(), is(equalTo(true)));
    }

    private Callable<Boolean> isUserConsumedAsync() {
        return () -> outputCapture.toString().contains("Consumed user");
    }

    public static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {

        @Override
        public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
            TestPropertyValues values = TestPropertyValues.of(
                    "spring.rabbitmq.host=" + rabbit.getContainerIpAddress(),
                    "spring.rabbitmq.port=" + rabbit.getMappedPort(5672),
                    "spring.rabbitmq.user=" + "guest",
                    "spring.rabbitmq.password=" + "guest",
                    "spring.rabbitmq.virtual-host=" + "/"
            );
            values.applyTo(configurableApplicationContext);
        }

    }
}

We are running the test class with SpringRunner not to run it as a mock.

@ContextConfiguration(initializers = MessageQueueConsumerIT.Initializer.class)

Above line sets the initializer for the test. Therefore, this initializer simply overrides the configurations set inside the “application.yaml” configurations. Moreover, if we don’t set this, the test context won’t be able to connect to the test container because it uses a port different from the configurations.

The “CustomizeConfig” class configures the rabbit template. Why we didn’t configure it inside RabbirMQ configuration class? Because our application’s purpose is to only consume messages, not to send. This configuration class configures the rabbit template only for the test purpose.

@ClassRule
    public static GenericContainer rabbit = new GenericContainer("rabbitmq:3-management")
            .withExposedPorts(5672, 15672);

This is our test container. It provides a Docker container runs A Rabbit MQ inside.

Testing

//When 
rabbitTemplate.convertAndSend(queueName, message);

In the ‘when’ phase of our test, we simply send a message to our queue listener by our consumer. Thus, our purpose is to detect if our consumer is able to successfully process the message.

private Callable<Boolean> isUserConsumedAsync() { 
    return () -> outputCapture.toString().contains("Consumed user"); 
}

Moreover, above method is a helper that provides if the output capture captured the log in need. In the case of the result is true then the message processed successfully (You can check the consuming code to verify the logging).

//Then 
await().atMost(5, TimeUnit.SECONDS).until(isUserConsumedAsync(), is(equalTo(true)));

The code “await” waits until the method “isUserConsumedAsnyc” returned true and timeouts when the rule in need is not ready for the next 5 seconds.

Hence, if everything is correct, you must see that your test completes successfully.

 

That’s all for now. Lastly, please leave your questions in the comments section.

References

Related Post