博主
258
258
258
258
专辑

第十三节 abbitmq之ConfirmCallback与ReturnCallback使用

亮子 2023-12-13 08:04:38 11116 0 0 0

前言

在工作中使用Rabbitmq传输数据时,可能会因为数据、网络等问题,导致数据发送或者接收失败;

如果对此类问题没有做好处理,就会存在丢失数据的问题,为此,引入了ConfirmCallback与ReturnCallback,来保证系统能够做到更好的数据监听、以及消费失败的数据做好相应的补偿;

ConfirmCallback与ReturnCallback也被称为Rabbitmq的消息确认机制;

有哪些问题

首先,下面为消息从生产者 ——> 消费者的流程图:

图片alt

不过如果应用到生产环境中会出现两个问题:

生产者发出的消息可能因为种种原因,并没有发送到交换器,而生产者却不知道;
交换器接收到的消息,并没有发送到队列中,而生产者却不知道;

如何解决

为了解决以上两个问题,系统引入了ConfirmCallback与ReturnCallback:

  • ConfirmCallback为发送Exchange(交换器)时回调,成功或者失败都会触发;
  • ReturnCallback为路由不到队列时触发,成功则不触发;

也就是说,前者是为了监听消息是否到达了Exchange,后者是为了监听消息是否到达了队列,如果这两个步骤遇到了问题,则生产者也好做出相应处理(例如:消息补偿,不过这并不是本篇的重点);

如果消息在消费端消费失败了怎么办?

失败就失败了,在实际场景中,数据库是需要为发送成功的消息做标记的,如果消息没有做标记(消费失败),则会采用定时任务重新发送,不过会涉及到幂等性的问题,这里会另起一篇文章:基于RabbitMQ实现最终一致性解决方案,在此不再赘述;

1、注入回调

    @PostConstruct
    public void init() {
        //消息未送达队列触发回调
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.error("消息发送失败,未送达队列,message:{},replyCode:{},replyText:{},exchange:{},exchange:{}", JSON.toJSONString(message), replyCode, replyText, exchange, routingKey);
            MqMsg msg = JSON.parseObject(new String(message.getBody()), MqMsg.class);
            // 更新数据库 设置消息的状态为发送失败
            
        });
        
        //消息进入到Exchange触发回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            String id = Objects.requireNonNull(correlationData.getId());
            if (!ack) {
                log.error("消息未发送成功,返回信息:{}", cause);
                //设置消息的状态为发送失败
            } else {
                // 更新数据库 设置消息的状态为发送成功
                
            }
        });
    }

2、生产者

@ResponseBody
@GetMapping("/send")
public String send() {
    UserVo userVo = new UserVo();
    //组装消息内容
    MessageProperties properties = new MessageProperties();
    
    //消息唯一ID,用力防止幂等性
    properties.setMessageId(userVo.getId().toString());
    Message message = new Message(JSON.toJSONString(userVo).getBytes(StandardCharsets.UTF_8), properties);
    
    // 发送消息时,需要根据业务设置唯一id,发送方确认时,还需要使用唯一id去修改数据状态
    rabbitTemplate.convertAndSend("demoExchange", "demoRoutingKey", message);
    return "发送成功";
}

3、消费者

@Slf4j
@Component
@RabbitListener(queues = "demo_data_queue")
public class HelloReceiver {

    int status = 0;

    @RabbitHandler
    public void process(JSONObject jsonObject, Channel channel, Message message) throws Exception {
        // 单条消息的大小限制,一般设为0或不设置,不限制大小
        int prefecthSize = 0;
        // 不要同时给消费端推送n条消息,一旦有n个消息还没ack,则该consumer将block掉,直到有ack 注意在自动应答下不生效
        int prefecthCount = 1;
        // 表示是否应用于channel上,即是channel级别还是consumer级别
        boolean global = false;

        channel.basicQos(prefecthSize,prefecthCount,global);

        log.info("收到消息:{}", jsonObject);
        Thread.sleep(10000);
        try {
            log.info("message:{}", message.getMessageProperties().getDeliveryTag());
        } catch (Exception e) {
            status = 1;
            e.printStackTrace();
            log.info("message:{}", message.getMessageProperties().getDeliveryTag());
        } finally {
            // 在这里执行成功或失败
            if (status == 0) {
                //成功消费消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } else if (status == 1) {
                //丢弃这条消息,如果最后一个参数设置为true的话,消息将重回队列末尾,重复消费
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            }
        }
    }
}

参考文章