博主
258
258
258
258
专辑

第十节 RabbitMQ重复消费如何解决

亮子 2021-08-07 19:44:19 7927 0 0 0

1、MQ消息为什么重复

网络不可达造成消息重复。只要通过网络交换数据,就无法避免这个问题

图片alt

AMQP 消费者确认机制
AMQP 定义了消费者确认机制(message ack),如果一个消费者应用B崩溃掉(此时连接会断掉),但是 broker 尚未获得 ACK(6.ACK确认),那么消息会被重新放入队列。所以 AMQP 提供的是“至少一次交付”(at-least-once delivery),异常情况下,消息会被重复消费,此时业务要实现幂等性(重复消息处理)。

2、重复消息如何处理?

1.消费端处理消息的业务逻辑保持幂等性。
2.保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。
第 1 条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第 2 条原理就是利用一张日志表来记录已经处理成功的消息的 ID,如果新到的消息 ID 已经在日志表中,那么就不再处理这条消息。
第 1 条解决方案,很明显应该在消费端实现,不属于消息系统要实现的功能。第 2 条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是 RabbitMQ 不解决消息重复的问题的原因。

  • 演示代码

if(!redisTemplate.hasKey(messageId)){
    ...
     exeBizCode();
     ...
    redisTemplate.opsForValue().setIfAbsent(messageId, messageId, expireTime,
        TimeUnit.SECONDS);
}

3、具体流程

让每个消息携带一个全局的唯一ID,即可保证消息的幂等性,具体消费过程为:

  • 消费者获取到消息后先根据id去查询redis/db是否存在该消息
  • 如果不存在,则正常消费,消费完毕后写入redis/db
  • 如果存在,则证明消息被消费过,直接丢弃。

1)、设置手动接收

server.port=9000
spring.application.name=demo-rabbit-send
# rabbit mq 连接设置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
## 设置接收手动确认模式
spring.rabbitmq.listener.type=direct
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.direct.default-requeue-rejected=false

2)、生产者

@PostMapping("/send")
public void sendMessage(){

    JSONObject jsonObject = new JSONObject();
    jsonObject.put("message","Java旅途");
    String json = jsonObject.toJSONString();
    Message message = MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID()+"").build();
    amqpTemplate.convertAndSend("javatrip",message);
}

3)、消费者

@Component
@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
public class Consumer {

    @RabbitHandler
    public void receiveMessage(Message message) throws Exception {

        Jedis jedis = new Jedis("localhost", 6379);

        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(),"UTF-8");
        System.out.println("接收到的消息为:"+msg+"==消息id为:"+messageId);

        String messageIdRedis = jedis.get("messageId");

        if(messageId == messageIdRedis){
            return;
        }
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String email = jsonObject.getString("message");
        jedis.set("messageId",messageId);
    }
}