import org.springframework.amqp.core.AcknowledgeMode.MANUAL;
消息接收的确认机制主要有三种模式:
RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。
这也是SpringBoot集成RabbitMQ默认的消息确认情况,如果消费消息时有异常抛出,则会拒绝消息,反之如果没有捕获到异常则确认本次消费成功。
这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
消费者收到消息后,手动调用basicAck/basicNack/basicReject后,RabbitMQ收到这些消息后,才认为本次投递成功。
设置接收消息手动确认,可以使用一下两种方式:
server.port=9000
spring.application.name=demo-rabbit-send
# rabbit mq 连接设置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
## 设置接收手动确认模式
spring.rabbitmq.listener.type=direct
spring.rabbitmq.listener.direct.acknowledge-mode=manual
#spring.rabbitmq.listener.direct.default-requeue-rejected=false
package com.shenmazong.demorabbitsend.listen;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import static com.shenmazong.demorabbitsend.config.RabbitConfig.WORK_QUEUE;
/**
* @author 军哥
* @version 1.0
* @description: 从队列接收消息
* @date 2021/8/6 11:50
*/
@Component
@Slf4j
public class WorkListener {
@RabbitListener(queues = WORK_QUEUE, ackMode = "MANUAL")
public void onMessage(Message message, Channel channel) throws IOException {
byte[] body = message.getBody();
String msg = new String(body);
log.info("onMessage=" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
关键的注解是 @RabbitListener(queues = WORK_QUEUE, ackMode = "MANUAL")
void basicAck(long deliveryTag, boolean multiple) throws IOException;
channel.basicAck()
函数解释:
package com.shenmazong.demorabbitsend.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 军哥
* @version 1.0
* @description: RabbitMQ 配置类
* @date 2021/8/6 11:44
*/
@Configuration
public class RabbitConfig {
public static final String WORK_QUEUE = "workQueue";
public static final String WORK_EXCHANGE = "worExchange";
@Bean
public Queue workQueue() {
return new Queue(WORK_QUEUE);
}
@Bean
public DirectExchange workExchange() {
return new DirectExchange(WORK_EXCHANGE);
}
@Bean
public Binding bindWorkExchange() {
return BindingBuilder.bind(workQueue()).to(workExchange()).with("red");
}
}
接收消息的手动确认,主要是通过一下一行代码来实现:
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
如果没有这一行,则消息会一直在队列中。
package com.shenmazong.demorabbitsend.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import static com.shenmazong.demorabbitsend.config.RabbitConfig.WORK_EXCHANGE;
/**
* @author 军哥
* @version 1.0
* @description: 控制层接口类
* @date 2021/8/6 11:54
*/
@RestController
@Slf4j
public class IndexController {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* @description 发送消息到交换机
* @author 军哥
* @date 2021/8/7 17:34
* @version 1.0
*/
@PostMapping(value = "/send")
public Object send(@RequestParam("msg") String msg) {
log.info("send:msg="+msg);
rabbitTemplate.convertAndSend(WORK_EXCHANGE, "red", msg);
return msg;
}
}