博主
258
258
258
258
专辑

第九节 RabbitMQ消费者接收消息手动ACK

亮子 2021-08-04 11:51:34 7685 0 0 0

1、消息接收确认机制

import org.springframework.amqp.core.AcknowledgeMode.MANUAL;

消息接收的确认机制主要有三种模式:

  • 自动确认AcknowledgeMode.NONE

RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。

  • 根据情况确认AcknowledgeMode.AUTO

这也是SpringBoot集成RabbitMQ默认的消息确认情况,如果消费消息时有异常抛出,则会拒绝消息,反之如果没有捕获到异常则确认本次消费成功。

  • 手动确认AcknowledgeMode.MANUAL

这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
消费者收到消息后,手动调用basicAck/basicNack/basicReject后,RabbitMQ收到这些消息后,才认为本次投递成功。

2、配置消息接收手动确认

设置接收消息手动确认,可以使用一下两种方式:

  • 通过配置文件来配置

server.port=9000
spring.application.name=demo-rabbit-send

# rabbit mq 连接设置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

## 设置接收手动确认模式
spring.rabbitmq.listener.type=direct
spring.rabbitmq.listener.direct.acknowledge-mode=manual
#spring.rabbitmq.listener.direct.default-requeue-rejected=false
  • 通过接收的注解来配置

package com.shenmazong.demorabbitsend.listen;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

import static com.shenmazong.demorabbitsend.config.RabbitConfig.WORK_QUEUE;

/**
 * @author 军哥
 * @version 1.0
 * @description: 从队列接收消息
 * @date 2021/8/6 11:50
 */

@Component
@Slf4j
public class WorkListener {

    @RabbitListener(queues = WORK_QUEUE, ackMode = "MANUAL")
    public void onMessage(Message message, Channel channel) throws IOException {
        byte[] body = message.getBody();
        String msg = new String(body);
        log.info("onMessage=" + msg);

        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

关键的注解是 @RabbitListener(queues = WORK_QUEUE, ackMode = "MANUAL")

void basicAck(long deliveryTag, boolean multiple) throws IOException;

channel.basicAck()函数解释:

  • deliveryTag:该消息的index
  • multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。

3、交换机和队列的定义

package com.shenmazong.demorabbitsend.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 军哥
 * @version 1.0
 * @description: RabbitMQ 配置类
 * @date 2021/8/6 11:44
 */

@Configuration
public class RabbitConfig {

    public static final String WORK_QUEUE = "workQueue";
    public static final String WORK_EXCHANGE = "worExchange";

    @Bean
    public Queue workQueue() {
        return new Queue(WORK_QUEUE);
    }

    @Bean
    public DirectExchange workExchange() {
        return new DirectExchange(WORK_EXCHANGE);
    }

    @Bean
    public Binding bindWorkExchange() {
        return BindingBuilder.bind(workQueue()).to(workExchange()).with("red");
    }
}

4、接收手动确认

接收消息的手动确认,主要是通过一下一行代码来实现:

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

如果没有这一行,则消息会一直在队列中。

5、发送消息接口

package com.shenmazong.demorabbitsend.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import static com.shenmazong.demorabbitsend.config.RabbitConfig.WORK_EXCHANGE;

/**
 * @author 军哥
 * @version 1.0
 * @description: 控制层接口类
 * @date 2021/8/6 11:54
 */

@RestController
@Slf4j
public class IndexController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * @description 发送消息到交换机
     * @author 军哥
     * @date 2021/8/7 17:34
     * @version 1.0
     */
    @PostMapping(value = "/send")
    public Object send(@RequestParam("msg") String msg) {
        log.info("send:msg="+msg);

        rabbitTemplate.convertAndSend(WORK_EXCHANGE, "red", msg);

        return msg;
    }
}

参考文档