博主
258
258
258
258
专辑

第四节 在SpringBoot中集成RabbitMQ(一)

亮子 2021-07-06 13:20:16 5761 0 0 0

1、创建工程

图片alt

  • 添加依赖

可以选择手动添加依赖如下:

        <!-- Spring Boot AMQP starter. NOTE(review): the org.springframework.amqp classes
             imported by the Java examples are presumably provided through this dependency;
             no version is given here, so it is expected to be managed by the parent POM. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2、配置文件

# Application name and HTTP port of the demo service
spring.application.name=demo-rabbitmq-0706
server.port=9000

## RabbitMQ connection settings (local broker, default guest account)
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# Publisher confirms: confirm that a message was delivered to the exchange
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.virtual-host=/
# Publisher returns. The original note says this confirms delivery to the queue.
# NOTE(review): returns normally report messages that could NOT be routed to any queue - confirm against the Spring Boot reference
spring.rabbitmq.publisher-returns=true

3、简单模式(simple)

1)、配置类

package com.shenmazong.demorabbitmq0706.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Configuration class that declares the queue used by the simple-mode example.
 */
@Configuration
public class RabbitmqConfig {

    /** Name given to the queue created by {@link #helloQueue()}. */
    private static final String HELLO_QUEUE_NAME = "hello";

    /**
     * Exposes the simple-mode queue as a bean.
     *
     * @return a queue object created with the name {@code "hello"}
     */
    @Bean
    public Queue helloQueue() {
        final Queue queue = new Queue(HELLO_QUEUE_NAME);
        return queue;
    }
}

2)、消费者(接收端)

package com.shenmazong.demorabbitmq0706.listenner;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener attached to the {@code "hello"} queue that prints every text payload it receives.
 */
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    /**
     * Writes the received message to standard output as one line.
     *
     * @param message text payload handed to this handler
     */
    @RabbitHandler
    public void onMessage(String message) {
        final String received = message;
        System.out.println(received);
    }
}

3)、生产者(发送端)

package com.shenmazong.demorabbitmq0706.controller;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST endpoint for the simple-mode example: every request publishes one fixed text message.
 */
@RestController
public class SampleController {

    /** Template used to publish messages; injected by Spring. */
    @Autowired
    AmqpTemplate amqpTemplate;

    /**
     * Handles {@code GET /sendMessage} by sending {@code "hello,world"} with the
     * routing key {@code "hello"}.
     *
     * @return the literal string {@code "ok"}
     */
    @GetMapping(value = "/sendMessage")
    public Object sendMessage() {
        final String routingKey = "hello";
        final String payload = "hello,world";
        amqpTemplate.convertAndSend(routingKey, payload);
        return "ok";
    }
}

4、工作模式(work)

1)、配置类

    /**
     * Exposes the queue used by the work-mode example as a bean.
     *
     * @return a queue object created with the name {@code "work"}
     */
    @Bean
    public Queue workQueue() {
        final String queueName = "work";
        return new Queue(queueName);
    }

2)、两个消费者

  • 第一个消费者
package com.shenmazong.demorabbitmq0706.listenner;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * First listener attached to the {@code "work"} queue; prints what it receives
 * with a {@code work1:} prefix.
 */
@Component
@RabbitListener(queues = "work")
public class WorkReceiver1 {

    /**
     * Prints the received message to standard output, prefixed with {@code work1:}.
     *
     * @param message text payload handed to this handler
     */
    @RabbitHandler
    public void onMessage(String message) {
        final String line = "work1:" + message;
        System.out.println(line);
    }
}
  • 第二个消费者
package com.shenmazong.demorabbitmq0706.listenner;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Second listener attached to the {@code "work"} queue; prints what it receives
 * with a {@code work2:} prefix.
 */
@Component
@RabbitListener(queues = "work")
public class WorkReceiver2 {

    /**
     * Prints the received message to standard output, prefixed with {@code work2:}.
     *
     * @param message text payload handed to this handler
     */
    @RabbitHandler
    public void onMessage(String message) {
        final String line = "work2:" + message;
        System.out.println(line);
    }
}

3)、一个生产者

package com.shenmazong.demorabbitmq0706.controller;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST endpoint for the work-mode example: each request publishes one numbered
 * message with the routing key {@code "work"}.
 */
@RestController
public class SampleController {

    /**
     * Sequence number of the next message to publish. Read and incremented only
     * while holding the monitor of {@code this}, because the same controller
     * instance may serve several requests at once.
     */
    private int idx = 0;

    /** Template used to publish messages; injected by Spring. */
    @Autowired
    AmqpTemplate amqpTemplate;

    /**
     * Handles {@code GET /workMessage}: takes the next sequence number and sends
     * {@code "<n>,work message"}.
     *
     * @return the sequence number following the one just sent (boxed as an {@code Integer})
     */
    @GetMapping(value = "/workMessage")
    public Object workMessage() {
        final int sequence;
        // Take the number atomically; the send itself stays outside the lock
        // so a slow broker call does not block other requests.
        synchronized (this) {
            sequence = idx++;
        }
        amqpTemplate.convertAndSend("work", sequence + ",work message");
        // Derive the result from the local value so it always matches the message
        // sent by this request, even if other requests have advanced idx meanwhile.
        return sequence + 1;
    }
}

4)、运行效果

图片alt

5、订阅模式(fanout)

1)、配置类

package com.shenmazong.demorabbitmq0706.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Configuration for the fanout example: three queues, one fanout exchange, and
 * one binding from each queue to that exchange.
 */
@Configuration
public class FanoutMqConfig {

    /** Name given to the exchange created by {@link #fanoutExchange()}. */
    private static final String EXCHANGE_NAME = "fanoutExchange";

    /** @return a queue object created with the name {@code "fanout.A"} */
    @Bean
    public Queue AMessage() {
        return namedQueue("fanout.A");
    }

    /** @return a queue object created with the name {@code "fanout.B"} */
    @Bean
    public Queue BMessage() {
        return namedQueue("fanout.B");
    }

    /** @return a queue object created with the name {@code "fanout.C"} */
    @Bean
    public Queue CMessage() {
        return namedQueue("fanout.C");
    }

    /** @return a fanout exchange object created with the name {@code "fanoutExchange"} */
    @Bean
    FanoutExchange fanoutExchange() {
        final FanoutExchange exchange = new FanoutExchange(EXCHANGE_NAME);
        return exchange;
    }

    // NOTE(review): the Queue parameter names below match the bean method names above;
    // presumably that is what selects the intended queue among several Queue beans.
    // Confirm before renaming any of them.

    /**
     * Binds the first queue to the fanout exchange.
     *
     * @param AMessage       queue to bind
     * @param fanoutExchange exchange the queue is bound to
     * @return the resulting binding
     */
    @Bean
    Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        return bindToFanout(AMessage, fanoutExchange);
    }

    /**
     * Binds the second queue to the fanout exchange.
     *
     * @param BMessage       queue to bind
     * @param fanoutExchange exchange the queue is bound to
     * @return the resulting binding
     */
    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return bindToFanout(BMessage, fanoutExchange);
    }

    /**
     * Binds the third queue to the fanout exchange.
     *
     * @param CMessage       queue to bind
     * @param fanoutExchange exchange the queue is bound to
     * @return the resulting binding
     */
    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return bindToFanout(CMessage, fanoutExchange);
    }

    /** Creates a queue object with the given name. */
    private static Queue namedQueue(String name) {
        return new Queue(name);
    }

    /** Builds the binding of {@code queue} to {@code exchange}. */
    private static Binding bindToFanout(Queue queue, FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }
}

2)、消费者

  • 消费者1
package com.shenmazong.demorabbitmq0706.listenner;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener attached to the {@code "fanout.A"} queue that logs each received text payload.
 */
@Component
@RabbitListener(queues = "fanout.A")
@Slf4j
public class FanoutReceiver1 {

    /**
     * Logs the received message at INFO level, prefixed with {@code FanoutReceiver1:}.
     *
     * @param message text payload handed to this handler
     */
    @RabbitHandler
    public void onMessage(String message) {
        // Parameterized form: the logger builds the final string itself,
        // and only when INFO is enabled. The emitted text is unchanged.
        log.info("FanoutReceiver1:{}", message);
    }
}
  • 消费者2
package com.shenmazong.demorabbitmq0706.listenner;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener attached to the {@code "fanout.B"} queue that logs each received text payload.
 */
@Component
@RabbitListener(queues = "fanout.B")
@Slf4j
public class FanoutReceiver2 {

    /**
     * Logs the received message at INFO level, prefixed with {@code FanoutReceiver2:}.
     *
     * @param message text payload handed to this handler
     */
    @RabbitHandler
    public void onMessage(String message) {
        // Parameterized form: the logger builds the final string itself,
        // and only when INFO is enabled. The emitted text is unchanged.
        log.info("FanoutReceiver2:{}", message);
    }
}
  • 消费者3
package com.shenmazong.demorabbitmq0706.listenner;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener attached to the {@code "fanout.C"} queue that logs each received text payload.
 */
@Component
@RabbitListener(queues = "fanout.C")
@Slf4j
public class FanoutReceiver3 {

    /**
     * Logs the received message at INFO level, prefixed with {@code FanoutReceiver3:}.
     *
     * @param message text payload handed to this handler
     */
    @RabbitHandler
    public void onMessage(String message) {
        // Parameterized form: the logger builds the final string itself,
        // and only when INFO is enabled. The emitted text is unchanged.
        log.info("FanoutReceiver3:{}", message);
    }
}

3)、生产者

    /**
     * Handles {@code GET /fanoutMessage}: takes the next sequence number and sends
     * {@code "<n>,fanout message"} to the exchange {@code "fanoutExchange"} with an
     * empty routing key.
     *
     * @return the sequence number following the one just sent (boxed as an {@code Integer})
     */
    @GetMapping(value = "/fanoutMessage")
    public Object fanoutMessage() {
        final int sequence;
        // idx is shared by every request handled by this controller instance, so the
        // read-and-increment is done atomically; the send stays outside the lock.
        synchronized (this) {
            sequence = idx++;
        }
        amqpTemplate.convertAndSend("fanoutExchange", "", sequence + ",fanout message");
        // Derive the result from the local value so it always matches the message
        // sent by this request, even if other requests have advanced idx meanwhile.
        return sequence + 1;
    }

4)、运行效果

图片alt

参考网址: