博主
258
258
258
258
专辑

第八节 RabbitMQ消息发送可靠性的解决方案

亮子 2021-08-04 04:12:10 6575 0 0 0

众所周知,RabbitMQ在保证消息可靠投递的实现过程中有个参数mandatory。该参数的作用是,当消息的mandatory设置为true时,消息投递到Exchange之后,如果Exchange无法将该消息路由到任何一个队列,那么该消息将返回给生产者。当设置为false,RabbitMQ将直接丢弃该消息。

  • 演示场景

定义一个Exchange,不绑定任何Queue,然后向该Exchange投递mandatory为true的消息。由于没有任何队列与之绑定,消息必然路由失败,并被返回给生产者。

1、添加依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2、配置文件

spring.application.name=server-shop-sender
server.port=8600

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.listener.simple.acknowledge-mode=manual

# 当exchange无法找到任何一个合适的queue时,将消息return给生产者
spring.rabbitmq.template.mandatory=true
# 必须设置为true,否则消息路由失败也无法触发Return回调
spring.rabbitmq.publisher-returns=true

3、发送回调

package com.shenmazong.servershopsender.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;

/**
 * @author 军哥
 * @version 1.0
 * @description: 消息发送失败的回调函数
 * @date 2021/8/4 15:17
 */

@Slf4j
@Component
public class RabbitTemplateEnhance implements BeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof RabbitTemplate) {
            log.info("增强 RabbitTemplate");
            RabbitTemplate rabbitTemplate = (RabbitTemplate) bean;
            // return回调函数
            rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
                @Override
                public void returnedMessage(ReturnedMessage returnedMessage) {
                    log.error("消息被退回:{}", returnedMessage);
                }
            });
            return rabbitTemplate;
        }
        return bean;
    }
}

4、配置类

package com.shenmazong.servershopsender.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that declares the RabbitMQ queue and direct exchange beans.
 *
 * @author 军哥
 * @version 1.0
 * @date 2021/8/4 15:30
 */
@Configuration
public class RabbitConfig {

    // NOTE(review): both constants hold the same value, and "exchage" looks like a
    // misspelling of "exchange". Left unchanged because these are runtime identifiers.
    public static final String EXCHANGE_NAME = "ack_exchage";
    public static final String QUEUE_NAME = "ack_exchage";

    /**
     * Declares the queue named {@link #QUEUE_NAME} as durable.
     *
     * @return the queue definition
     */
    @Bean
    public Queue ackQueue() {
        final boolean durable = true;
        return new Queue(QUEUE_NAME, durable);
    }

    /**
     * Declares the direct exchange named {@link #EXCHANGE_NAME} as durable and not auto-deleted.
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange ackExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(EXCHANGE_NAME, durable, autoDelete);
    }

    // The binding below is left disabled, so this class declares no binding
    // between the queue and the exchange.
//    @Bean
//    public Binding ackBinding() {
//        return BindingBuilder.bind(ackQueue()).to(ackExchange()).with("");
//    }

}

5、消息发送

package com.shenmazong.servershopsender.controller;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.shenmazong.servershopsender.pojo.TbUserInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;

import static com.shenmazong.servershopsender.config.RabbitConfig.EXCHANGE_NAME;

/**
 * REST controller exposing an endpoint that publishes a sample user record to RabbitMQ.
 *
 * @author 军哥
 * @version 1.0
 * @date 2021/8/4 15:18
 */
@RestController
@Slf4j
public class IndexController {

    /** Shared JSON serializer, created once instead of on every request. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Builds a fixed sample user, serializes it to JSON and publishes it to
     * {@code EXCHANGE_NAME} with an empty routing key.
     *
     * @param msg request parameter required by the mapping; it is not used when building the message
     * @return the literal string {@code "OK"} after the message has been handed to the template
     * @throws JsonProcessingException if the sample user cannot be serialized to JSON
     */
    @PostMapping(value = "/send")
    public Object send(@RequestParam("msg") String msg) throws JsonProcessingException {
        log.info("准备发送发送消息");
        TbUserInfo tbUserInfo = new TbUserInfo();
        tbUserInfo.setUserId(666);
        tbUserInfo.setUserName("赵云");
        tbUserInfo.setUserAge(18);
        tbUserInfo.setNickName("赵子龙");

        // Step 1: convert the object to JSON.
        String user = MAPPER.writeValueAsString(tbUserInfo);

        // Step 2: wrap the UTF-8 bytes in an AMQP message. The content-encoding header
        // carries the canonical charset name rather than the display name.
        Message message = MessageBuilder
                .withBody(user.getBytes(StandardCharsets.UTF_8))
                .setContentEncoding(StandardCharsets.UTF_8.name())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .build();

        rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);

        return "OK";
    }
}

6、消息接收

package com.shenmazong.demorabbitsend.listen;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import static com.shenmazong.demorabbitsend.config.RabbitConfig.WORK_QUEUE;

/**
 * Listener that receives messages from a queue and acknowledges them manually.
 *
 * @author 军哥
 * @version 1.0
 * @date 2021/8/6 11:50
 */
@Component
@Slf4j
public class WorkListener {

    /**
     * Logs the body of a received message and acknowledges the delivery.
     *
     * <p>The listener is bound to the queue named by the statically imported
     * {@code WORK_QUEUE} constant and uses manual acknowledgement mode.
     *
     * @param message the received AMQP message
     * @param channel the channel the message was delivered on, used to send the acknowledgement
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = WORK_QUEUE, ackMode = "MANUAL")
    public void onMessage(Message message, Channel channel) throws IOException {
        // Decode as UTF-8 explicitly instead of relying on the platform default charset.
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("onMessage={}", msg);

        // multiple = false: acknowledge only this delivery tag.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

参考文档