博主
258
258
258
258
专辑

第二十一节 RabbitMQ-ack消息确认,confirm确认机制

亮子 2024-03-09 10:59:57 10354 0 0 0

1、RabbitMQ的确认机制

在使用 RabbitMQ 中实现 ACK(消息确认)的过程通常包括以下步骤:

  1. 消费者订阅队列: 消费者通过连接到 RabbitMQ 服务器,并订阅需要消费的队列,等待接收消息。

  2. 消费消息: 消费者从队列中获取消息进行处理。

  3. 发送 ACK: 在消息成功被处理后,消费者需要发送 ACK 给 RabbitMQ,以确认消息已经被消费。ACK 通知 RabbitMQ 可以安全地将该消息标记为已经处理。

实现 ACK 的关键是要在消息处理完成后正确发送 ACK 给 RabbitMQ。在 RabbitMQ 中,ACK 是通过 Channel 来发送的。以下是 ACK 的简单实现代码示例:

// 创建连接和信道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 订阅队列
channel.queueDeclare("my_queue", true, false, false, null);

// 设置消费者
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // 消费消息
        String message = new String(body, "UTF-8");
        System.out.println("Received message: " + message);

        // 处理消息

        // 发送 ACK
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
};

// 开始消费消息
channel.basicConsume("my_queue", false, consumer);

在上述代码中,channel.basicAck(envelope.getDeliveryTag(), false); 表示发送 ACK 给 RabbitMQ,envelope.getDeliveryTag() 获取了当前消息的唯一标识符。第二个参数 false 表示不批量确认,即只确认当前消息。

需要注意的是,ACK 机制在确保消息处理的可靠性方面至关重要。确保在消息被处理后再发送 ACK,避免因为异常或者错误导致消息重复消费或丢失。

2、springboot中rabbitmq如何实现ack功能

在 Spring Boot 中使用 RabbitMQ 实现 ACK(消息确认)功能,可以通过配置 SimpleMessageListenerContainer 或者使用 @RabbitListener 注解来实现。以下是两种方式的示例:

使用 SimpleMessageListenerContainer 配置 ACK

@Configuration
@EnableRabbit
public class RabbitConfig {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动ACK模式
        return factory;
    }
}

然后在消费者方法上应用 @RabbitListener 注解,并在方法中手动发送 ACK:

@Component
public class MyRabbitListener {

    @RabbitListener(queues = "my_queue")
    public void processMessage(Message message, Channel channel) throws IOException {
        // 处理消息
        System.out.println("Received message: " + new String(message.getBody()));

        // 发送 ACK
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

使用 @RabbitListener 注解实现 ACK

直接在消费者方法上使用 @RabbitListener 注解,并配置 ackModeAckMode.MANUAL,然后手动发送 ACK:

@Component
public class MyRabbitListener {

    @RabbitListener(queues = "my_queue", ackMode = "MANUAL")
    public void processMessage(Message message, Channel channel) throws IOException {
        // 处理消息
        System.out.println("Received message: " + new String(message.getBody()));

        // 发送 ACK
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

在以上示例中,我们设置了 ACK 模式为 MANUAL,并手动在消费者方法中发送 ACK。这样就可以在 Spring Boot 中实现 RabbitMQ 的 ACK 功能,确保消息得到正确处理并进行确认。

3、ConfirmCallback和ReturnCallback有啥区别

在 RabbitMQ 中,ConfirmCallbackReturnCallback 是用于实现消息确认和消息返回的回调接口。它们的作用和触发时机有所不同:

  1. ConfirmCallback(消息确认): 当消息被 RabbitMQ 确认接收后,会触发 ConfirmCallback 回调。这个回调可以用来确保消息是否成功发送到 RabbitMQ 服务器,并进行相应的处理。

  2. ReturnCallback(消息返回): 当消息无法被正确路由到队列时,会触发 ReturnCallback 回调。这个回调可以用来处理无法路由的消息,可以根据需要进行重发、持久化等操作。

具体区别如下:

  • ConfirmCallback:
  • 触发时机:当消息被 RabbitMQ 确认接收后触发。
  • 使用场景:用于确认消息是否成功发送到 RabbitMQ 服务器,可以进行相应的处理,如记录日志、更新数据库等。
  • 配置方式:通过 RabbitTemplatesetConfirmCallback() 方法设置。
  • 回调方法:confirm 方法,包含消息的唯一标识符和是否成功的标志。
  • ReturnCallback:

  • 触发时机:当消息无法被正确路由到队列时触发。
  • 使用场景:用于处理无法路由的消息,可以进行重发、持久化等操作。
  • 配置方式:通过 RabbitTemplatesetReturnCallback() 方法设置。
  • 回调方法:returnedMessage 方法,包含消息的相关信息(如消息体、交换器、路由键等)。

在使用时,可以根据实际需求选择性地配置 ConfirmCallbackReturnCallback。例如,如果需要确保消息成功发送,可以配置 ConfirmCallback;如果需要对无法路由的消息进行处理,可以配置 ReturnCallback

需要注意的是,当同时启用了 ConfirmCallbackReturnCallback,并且消息无法被正确路由时,会优先触发 ReturnCallback,然后再触发 ConfirmCallback

4、ConfirmCallback的使用

在 Spring Boot 项目中使用 ConfirmCallback 来实现消息确认的功能,通常需要配置 RabbitTemplate 并设置相应的 ConfirmCallback。以下是一个简单的示例代码:

  1. 配置 RabbitTemplate 和 ConfirmCallback:
@Configuration
public class RabbitConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        
        // 设置 ConfirmCallback
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("Message confirmed.");
            } else {
                System.out.println("Message not confirmed. Reason: " + cause);
            }
        });

        return rabbitTemplate;
    }
}
  1. 发送消息并处理 ConfirmCallback:
@Service
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend("exchange", "routingKey", message, correlationData);
    }
}

在上述代码中,我们配置了一个 RabbitTemplate Bean,并设置了 ConfirmCallback,当消息被确认接收时,会触发相应的回调函数。在 MessageSender 类中发送消息时,可以通过 CorrelationData 来关联消息的唯一标识符,以便在 ConfirmCallback 中进行处理。

确保 RabbitMQ 已经正确配置,并且 Exchange 和 Queue 已经创建。这样,在消息发送后,ConfirmCallback 就会根据消息的确认状态执行相应的逻辑,以实现消息发送的确认功能。

5、在SpringBoot项目中ReturnCallback如何使用

在 Spring Boot 项目中使用 ReturnCallback 来处理无法路由到队列的消息,同样需要配置 RabbitTemplate 并设置相应的 ReturnCallback。以下是一个简单的示例代码:

  1. 配置 RabbitTemplate 和 ReturnCallback:
@Configuration
public class RabbitConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        // 设置 ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("Message returned: " + message);
            System.out.println("Reply Code: " + replyCode);
            System.out.println("Reply Text: " + replyText);
            System.out.println("Exchange: " + exchange);
            System.out.println("Routing Key: " + routingKey);
        });

        return rabbitTemplate;
    }
}
  1. 发送消息并处理 ReturnCallback:
@Service
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        rabbitTemplate.setMandatory(true); // 开启 mandatory 属性,使得无法路由的消息触发 ReturnCallback

        rabbitTemplate.convertAndSend("exchange", "invalidRoutingKey", message);
    }
}

在上述代码中,我们配置了一个带有 ReturnCallbackRabbitTemplate Bean。通过设置 mandatory 属性为 true,确保当消息无法路由到队列时触发 ReturnCallback

MessageSender 类中发送消息时,如果消息无法被正确路由到队列,就会触发 ReturnCallback,我们可以在回调函数中处理这些无法路由的消息,并获取相关的信息进行日志记录或其他操作。

这样,通过配置 ReturnCallback,我们可以处理那些无法被正确路由到队列的消息,确保消息的可靠性和正确性。