博主
258
258
258
258
专辑

第十六节 SSM框架使用ActiveMQ消息队列

亮子 2023-10-18 12:03:38 5080 0 0 0

1、ActiveMQ简介

ActiveMQ是Apache软件基金会所研发开源的消息中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。

  现在的消息队列有不少,RabbitMQ、Kafka、RocketMQ,ZeroMQ等等,而ActiveMQ作为拥有十多年历史的产品,有着许许多多的成功案例,活跃的社区,让它在消息中间件市场占有稳定的一席,据说ActiveMQ的下一代apollo拥有更好的性能。

官网地址
https://activemq.apache.org/

图片alt

  ActiveMQ由java基于JMS1.1规范的实现,但是支持多种编程语言,如java、C、C++、C#、Python、PHP、Ruby等。

  先看看ActiveMQ的架构图:

图片alt

抛开网络服务(Network Services)不说,我们应该知道的是ActiveMQ的三个重要组成部分:连接(Connectors)、消息模式(Topic,Queue)、消息持久化方式。

2、连接(Connectors)

  ActiveMQ提供了多种应用协议,如OpenWire、StompREST、WSNotification、XMPP、AMQP等,不同的协议有不同的特点,OpenWire用的比较多。

  我们要在应用中使用ActiveMQ,根据JSM规范,需要获得一个JMS connection factory,然后再去创建connection,这个时候往往需要指定所使用的的协议。

3、消息模式

  ActiveMQ提供了两种消息模式:点对点模式(Queue)、发布订阅模式(Topic),这两种模式基本上可以覆盖大部分的需求了。

1)、点对点模式(Queue)

  点对点模式使用队列作为中间媒介,这里的队列就是我们所理解的一个先进先出的一个结构,一个或者多个生产者将消息发送到队列,然后多个消费者从队列安装消息的先后顺序去消费,注意, 队列中的消息只会被一个消费者接受所消费

图片alt

2)、发布订阅模式(Topic)

  发布订阅模式使用topic作为中间媒介,而topic可以认为就是一个广播站,当一个或者多个生产者将消息发布到topic广播站之后,topic广播站会往当前已经注册订阅的每一个消费者广播消息(这里的消费者我们称为订阅者),注意, 这里每一个订阅者都会收到消息

图片alt

4、持久化订阅

  在发布订阅模式中,topic在接收到消息之后,只会给当前已经注册订阅了的订阅者广播消息,那问题来了,如果因为某些原因,如网络问题,导致订阅者断开连接一段时间,而在这段时间内有接收新的消息,那对于那些暂时断开的订阅者,消息是不是就丢了呢?基于这种原因,ActiveMQ将订阅者分为持久化订阅者和非持久化订阅者。非持久化订阅者就是不接受离线时生产的消息,而持久化订阅者则是通过往ActiveMQ中注册一个表明自己身份的ClientId(每个订阅者都有一个ClientId,或自动生成,或自行指定),topic收到消息时,会为处于离线状态的持久化订阅者根据它的ClientId保存消息,当下次相同ClientId的订阅者连接时就可以得到它离线状态下topic收到的消息了。

5、死信队列

  死信队列(Dead Letter Queue)用来保存处理失败或者过期的消息,简称DLQ。根据apache ActiveMQ官网的说明,当发生以下操作时,消息将会被重新发送给消费者:

  1、在一个事务机制的会话中,调用会话的回滚rollback方法

  2、在一个事务机制的会话中,未调用提交commit方法就关闭会话

  3、会话使用CLIENT_ACKNOWLEDGE客户端签收模式时,调用会话的恢复recover方法

  4、客户端连接超时(可能正在执行的代码花费的时间超过配置的超时时间)

  当重发达到一定次数后(默认是6次,可配置),broker就会收到一个“Poison ACK”,俗称毒丸,接着broker就会将消息统一发送到一个队列,这个队列就是死信队列。

6、Advisory

  Advisory是ActiveMQ提供的对生产者、消费者、队列、Topic、消息等等提供的一套消息监控和管理机制,它在一系列的事件发生后,将事件特点以消息的形式发送到特定的Queue或者Topic,监听这些Queue和Topic的消费者就可以收到消息,以此来完成自定义的消息监控及管理。

  其实Advisory很常见,默认情况下,当你发送消息到ActiveMQ时,你就会发现ActiveMQ自动创建一些Topic,这些Topic就是Advisory,当然,你也可以配置使用Queue,以下是几个常见的Advisory:

ActiveMQ.Advisory.Connection 当连接建立或者断开时ActiveMQ.Advisory.Producer.Queue 生产者往队列发送消息时ActiveMQ.Advisory.Producer.Topic 生产者往Topic发送消息时ActiveMQ.Advisory.Consumer.Queue 消费者从队列消费消息时ActiveMQ.Advisory.Consumer.Topic 消费者从Topic消费消息时

7、消息持久化方式

  消息的持久化一般是为了避免消息丢失,即使服务器宕机重启之后,消息能自动恢复,而不是像内存数据一样被清除,当然,ActiveMQ也允许你使用内存来存储消息。

  当前ActiveMQ默认的持久化方式是采用Kahadb,而目前主流讨论的持久化方案主要是:AMQ、Kahadb、JDBC、Leveldb、ReplicatedLeveldb。

持久化方式 说明

AMQ ActiveMQ5.3之前版本默认持久化方式,采用日志文件的存储方式,写入和恢复速度都很快

Kahadb Kahadb是一个专门针对消息持久化的解决方案,在ActiveMQ5.4及之后的版本默认采用的存储化方式,性能等各方面比AMQ更优

JDBC JDBC方式是将消息数据写入到数据库中,但是频繁的从数据库读取写入是一件很耗性能的事,于是在此方式的基础上又提出了Journal优化方案,使用高速缓存写入技术,当消费者消费速度跟不上生产者的生产速度时才写入数据库,这样大大提高了性能

Leveldb ActiveMQ5.8之后提出来持久化方式,和Kahadb很相似,但是性能比Kahadb更优,至于为什么不将Leveldb作为默认持久化方式,各种说法都有,还有,好像以后Leveldb以后将不会被支持了

ReplicatedLeveldb ReplicatedLeveldb是ActiveMQ和zookeeper整合时采用的持久化方式,一般在ActiveMQ做集群部署时用到

8、下载与运行

ActiveMQ与java的JDK是有版本对应匹配的。必须匹配版本,才能正常运行。

  • 版本匹配图

图片alt

  • 下载地址

官网各个版本下载目录
http://activemq.apache.org/download-archives.html

根据上面的对照表,咱们可以下载 ActiveMQ 5.15.15 Release 这个版本。

图片alt

  • 运行ActiveMQ

图片alt

图片alt

  • 进入ActiveMQ

管理地址:
http://localhost:8161/index.html

进入管理地址,需要提供用户名和密码:

图片alt

默认用户和密码都是 admin

图片alt

  • 显示已经存在的队列

图片alt

8、ActiveMQ消息的发送与接收

  • ActiveMQ消息发送和接收
package com.bw.controller;

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Objects;

@RestController
@RequestMapping("activemq")
public class ActivemqController {

    @Autowired
    private JmsTemplate jmsTemplate;

    /***
     * @description 消息发送
     * @params
     * @return java.lang.String
     * @author 军哥
     * @date 2023/10/18 20:38
     */
    @GetMapping(value = "/sendMessage")
    public String sendMessage() {
        jmsTemplate.convertAndSend(new ActiveMQQueue("mytest"), "hello,world");
        return "OK";
    }

    /***
     * @description 消息接收
     * @params
     * @return java.lang.String
     * @author 军哥
     * @date 2023/10/18 20:43
     */
    public String recvMessage() {
        Object myTest = jmsTemplate.receiveAndConvert("mytest");

        if (!Objects.isNull(myTest)) {
            System.out.println(myTest);
        }

        return myTest.toString();
    }
}
  • 监听消息
/**
 * Created with IntelliJ IDEA.
 *
 * @author Demo_Null
 * @date 2020/8/5
 * @description AMQ 监听消息
 */
@Component
public class MyMQListener {

    @Autowired
    private JmsTemplate jmsTemplate;

    @JmsListener(destination = "mytest")
    // 可监听多个队列
    // @JmsListeners(value = {@JmsListener(destination = "T1"), @JmsListener(destination = "T2")})
    public void listener(String msg) {
        if (!Objects.isNull(msg)) {
            System.out.println(msg);
       }
    }
}

9、SSM框架如何实现消息的监听

1)、添加依赖

        <!--activemq  -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.12.2</version>
        </dependency>

2)、创建监听类

package com.qsboss.demo.mq;

import com.alibaba.fastjson2.JSON;
import com.qsboss.demo.pojo.TbPrompt;
import com.qsboss.demo.service.TbPromptService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import java.util.concurrent.*;

/**
 * @author 军哥
 * @version 1.0
 * @description: ActiveMQ 消息接收
 * @date 2023/10/28 22:25
 */

public class MyMessageListener implements MessageListener{

    @Autowired
    JavaMailSender javaMailSender;

    @Autowired
    TbPromptService tbPromptService;

    private ThreadPoolExecutor threadPoolExecutor;

    public MyMessageListener() {
        //创建线程池
        BlockingDeque<Runnable> blockingDeque = new LinkedBlockingDeque<>();
        threadPoolExecutor = new ThreadPoolExecutor(2, 4, 3, TimeUnit.MINUTES, blockingDeque);
        threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    }

    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String content = textMessage.getText();
                TbPrompt tbPrompt = JSON.parseObject(content, TbPrompt.class);

                System.out.println("发送催款邮件: =========================");
                System.out.println("发送催款邮件: " + content);
                System.out.println("发送催款邮件: =========================");

                // 存储到数据库
                tbPromptService.addPrompt(tbPrompt);

                // 使用线程池发送邮件
                Email email = new Email(javaMailSender, tbPrompt);
                Future<Boolean> res = threadPoolExecutor.submit(email);

            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


class Email implements Callable<Boolean> {

    private TbPrompt tbPrompt;
    private JavaMailSender javaMailSender;

    public Email(JavaMailSender sender, TbPrompt tbPrompt) {
        this.tbPrompt = tbPrompt;
        this.javaMailSender = sender;
    }

    @Override
    public Boolean call() throws Exception {
        Boolean res = send();//调用发送邮件的方法
        return res;
    }

    private Boolean send() {

        System.out.println(tbPrompt.getBillName());

        //-- 邮件发送密码
        try {
            //-- 创建邮件对象
            MimeMessage message = javaMailSender.createMimeMessage();

            //-- 创建helper
            MimeMessageHelper helper = new MimeMessageHelper(message);

            //-- 设置邮件的内容
            helper.setFrom("3350996729@qq.com");
            helper.setTo(tbPrompt.getCustomerMail());
            String title = "尊敬的" + tbPrompt.getCustomerName() + "用户,您的账单请查收";
            helper.setSubject(title);

            String body = "费用说明:" + tbPrompt.getBillName() + "<br>您的费用是:" + tbPrompt.getPayAmount() + "元。请尽快支付账单。<br>";

            helper.setText(body);

            //-- 发送邮件
            javaMailSender.send(message);
        } catch (MessagingException e) {
            e.printStackTrace();
            System.out.println("邮件发送失败,稍后再次发送");
            // 存储发送失败邮件信息
            // TODO
        } finally {
        }

        System.out.println("催收邮件发送成功~~~~~~~~~~~~~~~~~~~~~");

        return true;
    }


}

3)、定义xml文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

    <context:property-placeholder location="classpath:/conf/spring.properties"/>

    <!--声明消息工厂-->
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <!--链接消息对了服务器-->
                <property name="brokerURL" value="tcp://${activemq.host}:${activemq.port}"/>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

    <!--声明目的地 队列-->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="${activemq.name}"/>
    </bean>

    <!-- Spring的JMS模版操作activeMQ工具 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="defaultDestination" ref="destinationQueue"/>
        <!-- 消息转化器 -->
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>

    <bean id="messageListener" class="com.qsboss.demo.mq.MyMessageListener" />

    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsFactory" />
        <property name="destinationName" value="bossBill" />
        <property name="messageListener" ref="messageListener" />
    </bean>
</beans>

10、发送和接收对象

package com.bw.controller;

import com.alibaba.fastjson.JSON;
import com.bw.pojo.TbBill;
import com.bw.service.TbBillService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import java.util.List;

/**
 * @author 军哥
 * @version 1.0
 * @description: TODO
 * @date 2023/11/18 8:44
 */

@Controller
@Slf4j
@RequestMapping(value = "/mq")
public class ActiveMqControler {

    @Autowired
    JmsTemplate jmsTemplate;

    @Autowired
    TbBillService tbBillService;

    /***
     * @description 生产者发送消息
     * @return java.lang.String
     * @author 军哥
     * @date 2023/11/18 8:59
     */
    @GetMapping(value = "/send")
    @ResponseBody
    public String send() {
        // 发送消息
        jmsTemplate.convertAndSend("2108a", "hello,world!!!");
        return "OK";
    }

    /***
     * @description 消费者消费消息
     * @return java.lang.String
     * @author 军哥
     * @date 2023/11/18 9:06
     */
    @GetMapping(value = "/recv")
    @ResponseBody
    public String recv() throws JMSException {
        // 接收消息
        Message receive = jmsTemplate.receive("2108a");

        // 转换为文本消息
        TextMessage textMessage = (TextMessage)receive;

        String text = textMessage.getText();
        System.out.println("activemq 接收消息为:" + text);

        return text;
    }

    /***
     * @description 生产者发送账单对象
     * @return java.lang.String
     * @author 军哥
     * @date 2023/11/18 9:09
     */
    @GetMapping(value = "/sendBill")
    @ResponseBody
    public String sendBill() {

        List<TbBill> list = tbBillService.list();

        for (TbBill tbBill : list) {
            String s = JSON.toJSONString(tbBill);

            // 发送消息
            try {
                jmsTemplate.convertAndSend("user2108a", s);
            } catch (JmsException e) {
                e.printStackTrace();
                System.out.println("send:"+s);
            }
        }

        return "OK";
    }

    /***
     * @description 接收消息对象
     * @return java.lang.String
     * @author 军哥
     * @date 2023/11/18 16:34
     */
    @GetMapping(value = "/recvBill")
    @ResponseBody
    public String recvBill() throws JMSException {

        // 设置超时时间:单位为毫秒
        jmsTemplate.setReceiveTimeout(1000);

        // 循环接收消息
        while (true) {
            // 接收消息
            Message receive = jmsTemplate.receive("user2108a");
            if(receive == null) {
                break;
            }

            // 转换为文本消息
            TextMessage textMessage = (TextMessage)receive;

            String text = textMessage.getText();

            // 反序列化为对象
            TbBill tbBill = JSON.parseObject(text, TbBill.class);
            System.out.println("recv:"+tbBill);
        }

        return "OK";
    }
}

11、ActiveMQ手动确认收消息

图片alt

参考文章