博主
258
258
258
258
专辑

第七节 RabbitMQ利用死信队列实现延迟队列

亮子 2021-08-02 09:58:30 8060 1 1 0

1、什么是死信队列

死信,在官网中对应的单词为“Dead Letter”,可以看出翻译确实非常的简单粗暴。那么死信是个什么东西呢?

死信 是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:

  • 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。

  • 消息在队列的存活时间超过设置的TTL时间。

  • 消息队列的消息数量已经超过最大队列长度。

只要符合上述三个条件之一,那么该消息将成为“死信”。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

2、如何配置死信队列

这一部分将是本文的关键,如何配置死信队列呢?其实很简单,大概可以分为以下步骤:

  • 配置业务队列,绑定到业务交换机上
  • 为业务队列配置死信交换机和路由key
  • 为死信交换机配置死信队列

注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。

有了死信交换机和路由key后,接下来,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】。一般来说,会为每个业务队列分配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。

3、SpringBoot集成死信队列

1)、添加依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2)、配置文件

# 应用名称
spring.application.name=server-shop-dead

# 应用服务 WEB 访问端口
server.port=8080

# rabbit mq
spring.rabbitmq.host=8.142.83.78
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=my_vhost
spring.rabbitmq.username=admin
spring.rabbitmq.password=shengban@1907a

# 手动确认模式
spring.rabbitmq.listener.type=direct
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.direct.default-requeue-rejected=false

其中

  • acknowledge-mode

该配置项是用来表示消息确认方式,其有三种配置方式,分别是none、manual和auto。

none意味着没有任何的应答会被发送。

manual意味着监听者必须通过调用Channel.basicAck()来告知所有的消息。

auto意味着容器会自动应答,除非MessageListener抛出异常,这是默认配置方式。

  • default-requeue-rejected

该配置项是决定由于监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为true。

我一开始对于这个属性有个误解,我以为rejected是表示拒绝,所以将requeue-rejected连起来是拒绝重新放回队列,后来查了资料明白这个属性的功能才想起来rejected是个形容词,其表示的应该是被拒绝的消息

所以如果该属性配置为true表示会重新放回队列,如果配置为false表示不会放回队列。

3)、配置类

package com.shenmazong.dead.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author 军哥
 * @version 1.0
 * @description: 死信队列配置类
 * @date 2022/2/9 18:50
 */

@Configuration
public class RabbitMqConfig {
    // 死信队列
    @Bean
    public Queue deadQueue() {
        return new Queue("dead");
    }

    // 死信交换机
    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange("deadExchange");
    }

    // 把死信队列绑定到死信交换机上
    @Bean
    public Binding bindDeadExchange() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("error");
    }

    // 创建业务交换机
    @Bean
    public DirectExchange businessExchange() {
        return new DirectExchange("businessExchange");
    }

    // 创建业务队列
    @Bean
    public Queue businessQueue() {
        Map<String, Object> args = new HashMap<>(2);
        //--1 过期时间20s
        args.put("x-message-ttl", 20000);

        //--2 x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", "deadExchange");

        //--3x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", "error");

        Queue businessQueue = QueueBuilder.durable("businessQueue").withArguments(args).build();
        return businessQueue;
    }

    // 把业务队列绑定到业务交换机上:routingkey是什么,无所谓,因为没有消费者
    @Bean
    public Binding businessBinding() {
        return BindingBuilder.bind(businessQueue()).to(businessExchange()).with("");
    }
}

4)、死信消息接收

package com.shenmazong.dead.recv;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;


/**
 * @author 军哥
 * @version 1.0
 * @description: 死信消息接收
 * @date 2022/2/9 19:09
 */

@Component
@Slf4j
public class RecvMessage {

    @RabbitListener(queues = "dead")
    public void handleMessage(Message message, Channel channel) throws IOException {

        // 获取消息ID
        String messageId = message.getMessageProperties().getMessageId();
        log.warn("messageId:"+messageId);

        // 获取消息内容
        String body = new String(message.getBody());
        log.warn("body:"+body);

        // 手动确认消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

5)、消息发送

package com.shenmazong.dead.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * @author 军哥
 * @version 1.0
 * @description: 发送延迟消息
 * @date 2022/2/9 19:36
 */

@RestController
@Slf4j
public class SendMessageController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/send")
    public String send(@RequestParam("msg") String msg) {

        String messageId = UUID.randomUUID().toString();
        log.warn("send messageId:"+messageId);

        Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).setMessageId(messageId).build();

        rabbitTemplate.convertAndSend("businessExchange", "", message);

        return msg;
    }
}

参考文档