第六节 SpringBoot整合RabbitMQ之Topic Exchange

亮子 2021-07-07 12:57:18 18344 0 0 0

1、简介

图片alt

Topic Exchange交换机也叫通配符交换机,我们在发送消息到Topic Exchange的时候不能随意指定route key(应该是由一系列点号连接的字符串,一般会与binding key有关联,route key的长度一般不能超过255个字节)。同理,交换机与队列之间的binding key也应该是点号连接成的字符串,当消息发送者发送信息到Topic Exchage交换机的时候,这时候发送消息的route key会与binding key进行通配符匹配,所有匹配成功的消息都会发送到消息接受者。

Topic Exchange主要有两种通配符:# 和 *

  • *(星号):可以(只能)匹配一个单词
  • #(井号):可以匹配多个单词(或者零个)

下面我们根据一张图来理解一下Topic Exchange是怎么匹配的:

图片alt

【a】一条以“com.register.mail”为routing key的消息将会匹配到Register Queue与SaveMail Queue两个队列上,所以消息会发送到消息接收者1和消息接收者2。routing key为“email.register.test”的消息同样会被推送到Register Queue与SaveMail Queue两个队列。

【b】如果routing key为“com.register.wsh”的话,消息只会被推送到Register Queue上;routing key为“email.com.wsh”的消息会被推送到SaveMail Queue上,routing key为“email.com.test”的消息也会被推送到SaveMail Queue上,但同一条消息只会被推送到SaveMail Queue上一次。

注意:如果在发送消息的时候没有匹配到符合条件的binding key,那么这条消息将会被废弃。如:com.register.wsh.test 消息不会被推送到Register Queue上,但是注意 email.com.wsh.test则可以推送到SaveMail Queue上。

  • 当一个队列被绑定为binding key为“#”时,它将会接收所有的消息,这类似于广播形式的交换机模式。
  • 当binding key不包含"*“和”#"时,这类似于我们上一章说的Direct Exchange直连交换机模式。

2、集成开发

1)、配置类

package com.shenmazong.demomq0707.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicMqConfig {

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topicEx");
    }

    @Bean
    public Queue queueRed() {
        return new Queue("queueRed");
    }

    @Bean
    public Queue queueGreed() {
        return new Queue("queueGreed");
    }

    @Bean
    public Binding bindingQueueRed() {
        return BindingBuilder.bind(queueRed()).to(topicExchange()).with("red.*");
    }

    @Bean
    public Binding bindingQueueGreen() {
        return BindingBuilder.bind(queueGreed()).to(topicExchange()).with("green.*");
    }
}

2)、消费者

package com.shenmazong.demomq0707.listenner;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class TopicReceiver {

    @RabbitListener(queues = "queueRed")
    public void onRedMessage(String message) {
        log.info("queueRed:"+message);
    }

    @RabbitListener(queues = "queueGreed")
    public void onGreenMessage(String message) {
        log.info("queueGreen:"+message);
    }
}

3)、生产者

    @GetMapping(value = "/topicMessage")
    public Object topicMessage() {
        String msg = count + ":topicMessage";
        log.info("msg="+msg);
        if(count%2 == 0) {
            amqpTemplate.convertAndSend("topicEx", "red.aaa", msg);
        }
        else {
            amqpTemplate.convertAndSend("topicEx", "green.bbb", msg);
        }
        count ++;
        return "ok";
    }

4)、运行效果

图片alt