Topic Exchange交换机也叫通配符交换机,我们在发送消息到Topic Exchange的时候不能随意指定route key(应该是由一系列点号连接的字符串,一般会与binding key有关联,route key的长度一般不能超过255
个字节)。同理,交换机与队列之间的binding key也应该是点号连接成的字符串,当消息发送者发送信息到Topic Exchage交换机的时候,这时候发送消息的route key会与binding key进行通配符匹配,所有匹配成功的消息都会发送到消息接受者。
Topic Exchange主要有两种通配符:# 和 *
下面我们根据一张图来理解一下Topic Exchange是怎么匹配的:
【a】一条以“com.register.mail”为routing key的消息将会匹配到Register Queue与SaveMail Queue两个队列上,所以消息会发送到消息接收者1和消息接收者2。routing key为“email.register.test”的消息同样会被推送到Register Queue与SaveMail Queue两个队列。
【b】如果routing key为“com.register.wsh”的话,消息只会被推送到Register Queue上;routing key为“email.com.wsh”的消息会被推送到SaveMail Queue上,routing key为“email.com.test”的消息也会被推送到SaveMail Queue上,但同一条消息只会被推送到SaveMail Queue上一次。
注意:如果在发送消息的时候没有匹配到符合条件的binding key,那么这条消息将会被废弃。如:com.register.wsh.test 消息不会被推送到Register Queue上,但是注意 email.com.wsh.test则可以推送到SaveMail Queue上。
package com.shenmazong.demomq0707.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicMqConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topicEx");
}
@Bean
public Queue queueRed() {
return new Queue("queueRed");
}
@Bean
public Queue queueGreed() {
return new Queue("queueGreed");
}
@Bean
public Binding bindingQueueRed() {
return BindingBuilder.bind(queueRed()).to(topicExchange()).with("red.*");
}
@Bean
public Binding bindingQueueGreen() {
return BindingBuilder.bind(queueGreed()).to(topicExchange()).with("green.*");
}
}
package com.shenmazong.demomq0707.listenner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class TopicReceiver {
@RabbitListener(queues = "queueRed")
public void onRedMessage(String message) {
log.info("queueRed:"+message);
}
@RabbitListener(queues = "queueGreed")
public void onGreenMessage(String message) {
log.info("queueGreen:"+message);
}
}
@GetMapping(value = "/topicMessage")
public Object topicMessage() {
String msg = count + ":topicMessage";
log.info("msg="+msg);
if(count%2 == 0) {
amqpTemplate.convertAndSend("topicEx", "red.aaa", msg);
}
else {
amqpTemplate.convertAndSend("topicEx", "green.bbb", msg);
}
count ++;
return "ok";
}