ActiveMQ是Apache软件基金会所研发开源的消息中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。
现在的消息队列有不少,RabbitMQ、Kafka、RocketMQ,ZeroMQ等等,而ActiveMQ作为拥有十多年历史的产品,有着许许多多的成功案例,活跃的社区,让它在消息中间件市场占有稳定的一席,据说ActiveMQ的下一代apollo拥有更好的性能。
ActiveMQ由java基于JMS1.1规范的实现,但是支持多种编程语言,如java、C、C++、C#、Python、PHP、Ruby等。
先看看ActiveMQ的架构图:
抛开网络服务(Network Services)不说,我们应该知道的是ActiveMQ的三个重要组成部分:连接(Connectors)、消息模式(Topic,Queue)、消息持久化方式。
ActiveMQ提供了多种应用协议,如OpenWire、StompREST、WSNotification、XMPP、AMQP等,不同的协议有不同的特点,OpenWire用的比较多。
我们要在应用中使用ActiveMQ,根据JSM规范,需要获得一个JMS connection factory,然后再去创建connection,这个时候往往需要指定所使用的的协议。
ActiveMQ提供了两种消息模式:点对点模式(Queue)、发布订阅模式(Topic),这两种模式基本上可以覆盖大部分的需求了。
点对点模式使用队列作为中间媒介,这里的队列就是我们所理解的一个先进先出的一个结构,一个或者多个生产者将消息发送到队列,然后多个消费者从队列安装消息的先后顺序去消费,注意, 队列中的消息只会被一个消费者接受所消费
。
发布订阅模式使用topic作为中间媒介,而topic可以认为就是一个广播站,当一个或者多个生产者将消息发布到topic广播站之后,topic广播站会往当前已经注册订阅的每一个消费者广播消息(这里的消费者我们称为订阅者),注意, 这里每一个订阅者都会收到消息
。
在发布订阅模式中,topic在接收到消息之后,只会给当前已经注册订阅了的订阅者广播消息,那问题来了,如果因为某些原因,如网络问题,导致订阅者断开连接一段时间,而在这段时间内有接收新的消息,那对于那些暂时断开的订阅者,消息是不是就丢了呢?基于这种原因,ActiveMQ将订阅者分为持久化订阅者和非持久化订阅者。非持久化订阅者就是不接受离线时生产的消息,而持久化订阅者则是通过往ActiveMQ中注册一个表明自己身份的ClientId(每个订阅者都有一个ClientId,或自动生成,或自行指定),topic收到消息时,会为处于离线状态的持久化订阅者根据它的ClientId保存消息,当下次相同ClientId的订阅者连接时就可以得到它离线状态下topic收到的消息了。
死信队列(Dead Letter Queue)用来保存处理失败或者过期的消息,简称DLQ。根据apache ActiveMQ官网的说明,当发生以下操作时,消息将会被重新发送给消费者:
1、在一个事务机制的会话中,调用会话的回滚rollback方法
2、在一个事务机制的会话中,未调用提交commit方法就关闭会话
3、会话使用CLIENT_ACKNOWLEDGE客户端签收模式时,调用会话的恢复recover方法
4、客户端连接超时(可能正在执行的代码花费的时间超过配置的超时时间)
当重发达到一定次数后(默认是6次,可配置),broker就会收到一个“Poison ACK”,俗称毒丸,接着broker就会将消息统一发送到一个队列,这个队列就是死信队列。
Advisory是ActiveMQ提供的对生产者、消费者、队列、Topic、消息等等提供的一套消息监控和管理机制,它在一系列的事件发生后,将事件特点以消息的形式发送到特定的Queue或者Topic,监听这些Queue和Topic的消费者就可以收到消息,以此来完成自定义的消息监控及管理。
其实Advisory很常见,默认情况下,当你发送消息到ActiveMQ时,你就会发现ActiveMQ自动创建一些Topic,这些Topic就是Advisory,当然,你也可以配置使用Queue,以下是几个常见的Advisory:
ActiveMQ.Advisory.Connection 当连接建立或者断开时ActiveMQ.Advisory.Producer.Queue 生产者往队列发送消息时ActiveMQ.Advisory.Producer.Topic 生产者往Topic发送消息时ActiveMQ.Advisory.Consumer.Queue 消费者从队列消费消息时ActiveMQ.Advisory.Consumer.Topic 消费者从Topic消费消息时
消息的持久化一般是为了避免消息丢失,即使服务器宕机重启之后,消息能自动恢复,而不是像内存数据一样被清除,当然,ActiveMQ也允许你使用内存来存储消息。
当前ActiveMQ默认的持久化方式是采用Kahadb,而目前主流讨论的持久化方案主要是:AMQ、Kahadb、JDBC、Leveldb、ReplicatedLeveldb。
持久化方式 说明
AMQ ActiveMQ5.3之前版本默认持久化方式,采用日志文件的存储方式,写入和恢复速度都很快
Kahadb Kahadb是一个专门针对消息持久化的解决方案,在ActiveMQ5.4及之后的版本默认采用的存储化方式,性能等各方面比AMQ更优
JDBC JDBC方式是将消息数据写入到数据库中,但是频繁的从数据库读取写入是一件很耗性能的事,于是在此方式的基础上又提出了Journal优化方案,使用高速缓存写入技术,当消费者消费速度跟不上生产者的生产速度时才写入数据库,这样大大提高了性能
Leveldb ActiveMQ5.8之后提出来持久化方式,和Kahadb很相似,但是性能比Kahadb更优,至于为什么不将Leveldb作为默认持久化方式,各种说法都有,还有,好像以后Leveldb以后将不会被支持了
ReplicatedLeveldb ReplicatedLeveldb是ActiveMQ和zookeeper整合时采用的持久化方式,一般在ActiveMQ做集群部署时用到
ActiveMQ与java的JDK是有版本对应匹配的。必须匹配版本,才能正常运行。
根据上面的对照表,咱们可以下载 ActiveMQ 5.15.15 Release 这个版本。
进入管理地址,需要提供用户名和密码:
默认用户和密码都是 admin
package com.bw.controller;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Objects;
@RestController
@RequestMapping("activemq")
public class ActivemqController {
@Autowired
private JmsTemplate jmsTemplate;
/***
* @description 消息发送
* @params
* @return java.lang.String
* @author 军哥
* @date 2023/10/18 20:38
*/
@GetMapping(value = "/sendMessage")
public String sendMessage() {
jmsTemplate.convertAndSend(new ActiveMQQueue("mytest"), "hello,world");
return "OK";
}
/***
* @description 消息接收
* @params
* @return java.lang.String
* @author 军哥
* @date 2023/10/18 20:43
*/
public String recvMessage() {
Object myTest = jmsTemplate.receiveAndConvert("mytest");
if (!Objects.isNull(myTest)) {
System.out.println(myTest);
}
return myTest.toString();
}
}
/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/8/5
* @description AMQ 监听消息
*/
@Component
public class MyMQListener {
@Autowired
private JmsTemplate jmsTemplate;
@JmsListener(destination = "mytest")
// 可监听多个队列
// @JmsListeners(value = {@JmsListener(destination = "T1"), @JmsListener(destination = "T2")})
public void listener(String msg) {
if (!Objects.isNull(msg)) {
System.out.println(msg);
}
}
}
1)、添加依赖
<!--activemq -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.12.2</version>
</dependency>
2)、创建监听类
package com.qsboss.demo.mq;
import com.alibaba.fastjson2.JSON;
import com.qsboss.demo.pojo.TbPrompt;
import com.qsboss.demo.service.TbPromptService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import java.util.concurrent.*;
/**
* @author 军哥
* @version 1.0
* @description: ActiveMQ 消息接收
* @date 2023/10/28 22:25
*/
public class MyMessageListener implements MessageListener{
@Autowired
JavaMailSender javaMailSender;
@Autowired
TbPromptService tbPromptService;
private ThreadPoolExecutor threadPoolExecutor;
public MyMessageListener() {
//创建线程池
BlockingDeque<Runnable> blockingDeque = new LinkedBlockingDeque<>();
threadPoolExecutor = new ThreadPoolExecutor(2, 4, 3, TimeUnit.MINUTES, blockingDeque);
threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
}
@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String content = textMessage.getText();
TbPrompt tbPrompt = JSON.parseObject(content, TbPrompt.class);
System.out.println("发送催款邮件: =========================");
System.out.println("发送催款邮件: " + content);
System.out.println("发送催款邮件: =========================");
// 存储到数据库
tbPromptService.addPrompt(tbPrompt);
// 使用线程池发送邮件
Email email = new Email(javaMailSender, tbPrompt);
Future<Boolean> res = threadPoolExecutor.submit(email);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
class Email implements Callable<Boolean> {
private TbPrompt tbPrompt;
private JavaMailSender javaMailSender;
public Email(JavaMailSender sender, TbPrompt tbPrompt) {
this.tbPrompt = tbPrompt;
this.javaMailSender = sender;
}
@Override
public Boolean call() throws Exception {
Boolean res = send();//调用发送邮件的方法
return res;
}
private Boolean send() {
System.out.println(tbPrompt.getBillName());
//-- 邮件发送密码
try {
//-- 创建邮件对象
MimeMessage message = javaMailSender.createMimeMessage();
//-- 创建helper
MimeMessageHelper helper = new MimeMessageHelper(message);
//-- 设置邮件的内容
helper.setFrom("3350996729@qq.com");
helper.setTo(tbPrompt.getCustomerMail());
String title = "尊敬的" + tbPrompt.getCustomerName() + "用户,您的账单请查收";
helper.setSubject(title);
String body = "费用说明:" + tbPrompt.getBillName() + "<br>您的费用是:" + tbPrompt.getPayAmount() + "元。请尽快支付账单。<br>";
helper.setText(body);
//-- 发送邮件
javaMailSender.send(message);
} catch (MessagingException e) {
e.printStackTrace();
System.out.println("邮件发送失败,稍后再次发送");
// 存储发送失败邮件信息
// TODO
} finally {
}
System.out.println("催收邮件发送成功~~~~~~~~~~~~~~~~~~~~~");
return true;
}
}
3)、定义xml文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">
<context:property-placeholder location="classpath:/conf/spring.properties"/>
<!--声明消息工厂-->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<!--链接消息对了服务器-->
<property name="brokerURL" value="tcp://${activemq.host}:${activemq.port}"/>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
<!--声明目的地 队列-->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="${activemq.name}"/>
</bean>
<!-- Spring的JMS模版操作activeMQ工具 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory"/>
<property name="defaultDestination" ref="destinationQueue"/>
<!-- 消息转化器 -->
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
<bean id="messageListener" class="com.qsboss.demo.mq.MyMessageListener" />
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsFactory" />
<property name="destinationName" value="bossBill" />
<property name="messageListener" ref="messageListener" />
</bean>
</beans>
package com.bw.controller;
import com.alibaba.fastjson.JSON;
import com.bw.pojo.TbBill;
import com.bw.service.TbBillService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import java.util.List;
/**
* @author 军哥
* @version 1.0
* @description: TODO
* @date 2023/11/18 8:44
*/
@Controller
@Slf4j
@RequestMapping(value = "/mq")
public class ActiveMqControler {
@Autowired
JmsTemplate jmsTemplate;
@Autowired
TbBillService tbBillService;
/***
* @description 生产者发送消息
* @return java.lang.String
* @author 军哥
* @date 2023/11/18 8:59
*/
@GetMapping(value = "/send")
@ResponseBody
public String send() {
// 发送消息
jmsTemplate.convertAndSend("2108a", "hello,world!!!");
return "OK";
}
/***
* @description 消费者消费消息
* @return java.lang.String
* @author 军哥
* @date 2023/11/18 9:06
*/
@GetMapping(value = "/recv")
@ResponseBody
public String recv() throws JMSException {
// 接收消息
Message receive = jmsTemplate.receive("2108a");
// 转换为文本消息
TextMessage textMessage = (TextMessage)receive;
String text = textMessage.getText();
System.out.println("activemq 接收消息为:" + text);
return text;
}
/***
* @description 生产者发送账单对象
* @return java.lang.String
* @author 军哥
* @date 2023/11/18 9:09
*/
@GetMapping(value = "/sendBill")
@ResponseBody
public String sendBill() {
List<TbBill> list = tbBillService.list();
for (TbBill tbBill : list) {
String s = JSON.toJSONString(tbBill);
// 发送消息
try {
jmsTemplate.convertAndSend("user2108a", s);
} catch (JmsException e) {
e.printStackTrace();
System.out.println("send:"+s);
}
}
return "OK";
}
/***
* @description 接收消息对象
* @return java.lang.String
* @author 军哥
* @date 2023/11/18 16:34
*/
@GetMapping(value = "/recvBill")
@ResponseBody
public String recvBill() throws JMSException {
// 设置超时时间:单位为毫秒
jmsTemplate.setReceiveTimeout(1000);
// 循环接收消息
while (true) {
// 接收消息
Message receive = jmsTemplate.receive("user2108a");
if(receive == null) {
break;
}
// 转换为文本消息
TextMessage textMessage = (TextMessage)receive;
String text = textMessage.getText();
// 反序列化为对象
TbBill tbBill = JSON.parseObject(text, TbBill.class);
System.out.println("recv:"+tbBill);
}
return "OK";
}
}