注册用户之间发送私信,私信不能丢失,也不允许重复发送。
用户可以通过搜索,找到感兴趣的用户,然后关注。只要关注了某个用户,就可以发送私信。任何一个用户也可接收到别人发来的私信。
在Spring Boot项目中使用RabbitMQ时,可以采取以下几种方式来保障消息的不丢失:
持久化队列和消息: 在声明队列和发布消息时,通过设置durable
参数为true
,使队列和消息变为持久化。这样即使RabbitMQ服务器重启或者发生故障,数据也能够被保存下来,不会丢失。
持久化交换器: 如果使用了自定义的交换器,同样可以设置交换器为持久化的。这样即使RabbitMQ服务器重启,交换器的配置信息也能够被保留下来,不会丢失。
确认机制(ACK机制): 在消费者接收到消息后,可以手动向RabbitMQ发送ACK(确认)来告知服务器已成功处理消息。如果消费者在处理消息过程中发生异常或者网络问题导致无法正常确认消息,RabbitMQ会将该消息重新分发给其他消费者,确保消息不丢失。
消息持久化投递模式: 在消息发布时,可以设置消息的投递模式为MessageDeliveryMode.PERSISTENT
,以确保消息被持久化。这样即使RabbitMQ服务器重启,消息也能够被保存下来。
备份交换器(Alternate Exchange): 可以设置备份交换器来处理无法路由的消息,以防止消息丢失。当消息无法被路由时,可以将消息发送到备份交换器中进行处理,例如将消息存储到数据库或者日志系统中。
集群模式和镜像队列: 可以使用RabbitMQ的集群模式和镜像队列来实现高可用性和故障容错。通过在多个节点上部署RabbitMQ服务器,并使用镜像队列配置,可以保证消息的复制和分发,避免单点故障导致消息丢失。
综合使用上述方式,能够有效地保障消息不丢失。根据具体的业务需求和系统设计,可以选择适合的方式或者采取多种方式的组合来提高消息的可靠性。
<!--引入rabbitMQ的依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 192.168.80.192
publisher-confirm-type: correlated #开启异步消息确认
publisher-returns: true #开启publisher-return的机制
template:
mandatory: true #消息路由失败,调用ReturnCallback
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
acknowledge-mode: manual # 开启手动确认模式
direct: # 路由模式
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
acknowledge-mode: manual # 开启手动确认模式
前言
在工作中使用Rabbitmq传输数据时,可能会因为数据、网络等问题,导致数据发送或者接收失败;
如果对此类问题没有做好处理,就会存在丢失数据的问题,为此,引入了ConfirmCallback与ReturnCallback,来保证系统能够做到更好的数据监听、以及消费失败的数据做好相应的补偿;
ConfirmCallback与ReturnCallback也被称为Rabbitmq的消息确认机制;
有哪些问题
首先,下面为消息从生产者 ——> 消费者的流程图:
不过如果应用到生产环境中会出现两个问题:
生产者发出的消息可能因为种种原因,并没有发送到交换器,而生产者却不知道;
交换器接收到的消息,并没有发送到队列中,而生产者却不知道;
为了解决以上两个问题,系统引入了ConfirmCallback与ReturnCallback:
@Autowired
RabbitTemplate rabbitTemplate;
/***
* @description 设置RabbitMQ消息的发送回调
* @PostConstruct 这个注解的作用:对象创建完成后,第一个执行的函数
* @return void
* @author 军哥
* @date 2024/2/29 8:31
*/
@PostConstruct
public void init() {
// ReturnCallback为路由不到队列时触发,成功则不触发;
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
// 只要消息发送失败了,就会调用这个函数
@Override
public void returnedMessage(Message message, int replyCode,
String replyText, String exchange, String routingKey) {
// 消息发送失败:消息重发
String msg = new String(message.getBody());
// 路由模式,需要使用3个参数
String correlationId = message.getMessageProperties().getCorrelationId();
CorrelationData correlationData = new CorrelationData(correlationId);
rabbitTemplate.convertAndSend(exchange, routingKey, msg, correlationData);
}
});
// 消息发送到Exchange(交换器)时回调,成功或者失败都会触发;
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData.getId();
log.warn("消息ID:" + id);
if(!ack) {
// 发送失败
log.error("消息发送失败了");
Message returnedMessage = correlationData.getReturnedMessage();
// 把消息保存到数据库或者Redis中,通过定时任务,再继续发送
// 根据ID,从redis中获取消息内容,然后再重新发送
}
else {
// 发送成功
log.info("消息发送成功了");
}
}
});
}
package com.bwie.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 军哥
* @version 1.0
* @description: RabbitMQ 配置类
* @date 2024/2/28 16:00
*/
@Configuration
public class RabbitConfig {
/***
* @description 创建队列
* @return org.springframework.amqp.core.Queue
* @author 军哥
* @date 2024/2/28 16:03
*/
@Bean
public Queue msgQueue() {
// 第一个参数表示队列的名字,第二参数为true表示开启持久化(保障消息不丢失)
return new Queue("msgQueue", true);
}
/***
* @description 用户私信队列
* @return org.springframework.amqp.core.Queue
* @author 军哥
* @date 2024/2/29 10:28
*/
@Bean
public Queue userMessageQueue() {
// 第一个参数表示队列的名字,第二参数为true表示开启持久化(保障消息不丢失)
return new Queue("userMessageQueue", true);
}
/***
* @description 创建路由模式的交换机
* @return org.springframework.amqp.core.DirectExchange
* @author 军哥
* @date 2024/3/1 10:41
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange("directExchange");
}
/***
* @description 把登录日志的队列,绑定到路由交换机
* @return org.springframework.amqp.core.Binding
* @author 军哥
* @date 2024/3/1 10:43
*/
@Bean
public Binding msgQueueBinding() {
return BindingBuilder.bind(msgQueue()).to(directExchange()).with("log");
}
/***
* @description 发送私信:私信队列绑定到路由交换机上
* @return org.springframework.amqp.core.Binding
* @author 军哥
* @date 2024/3/1 10:45
*/
@Bean
public Binding userQueueBinding() {
return BindingBuilder.bind(userMessageQueue()).to(directExchange()).with("msg");
}
}
/***
* @description 用户发送私信
* @return com.bwie.utils.R
* @author 军哥
* @date 2024/3/1 11:02
*/
@Override
public R send(MessageVo messageVo) {
// 给消息添加唯一的ID,保障消息不重复消费
String msgId = "MSG-" + UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(msgId);
// 把消息对象转换为字符串
String message = JSON.toJSONString(messageVo);
// 把消息的唯一ID存入Redis中,防止重复消费
stringRedisTemplate.opsForValue().set(msgId, message);
// 记住:发送的消息一定要转换为字符串
rabbitTemplate.convertAndSend("directExchange", "msg", message, correlationData);
return R.SUCCESS();
}
/***
* @description 消息的接收
* @return void
* @author 军哥
* @date 2024/2/29 8:11
*/
@RabbitListener(queues = {"msgQueue"})
public void recvMessage(Message message, Channel channel) {
try {
// 解析消息
String msg = new String(message.getBody());
// 把消息字符串转换为消息对象
MessageVo messageVo = JSON.parseObject(msg, MessageVo.class);
// 获取消息ID,并判断ID是否存在,如果存在则消费,如果不存在,则消息删除,防止重复消费
String msgId = message.getMessageProperties().getCorrelationId();
System.out.println("接收:correlationId=" + msgId);
if(!stringRedisTemplate.hasKey(msgId)) {
// key不存在,说明消息已经被消费了,因此,直接删除消息即可,防止重复消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
// 直接返回
return;
}
// 消费消息:消费日志消息
System.out.println("接收到消息:" + messageVo);
String content = messageVo.getMsgBody();
TbLog tbLog = JSON.parseObject(content, TbLog.class);
tbLog.setCreateTime(new Date());
tbLogMapper.insert(tbLog);
System.out.println("保存日志成功:" + tbLog.getLogId());
// 手动确认(删除消息)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
// 删除Redis中消息的唯一ID,防止重复消费
stringRedisTemplate.delete(msgId);
} catch (IOException e) {
System.out.println("消息消费失败了");
e.printStackTrace();
}
}