第十节 SpringBoot项目使用EMQX发送和接收消息

亮子 | 2026-02-25 10:44:19 | 37 | 0 | 0 | 0

1、引入依赖

pom.xml 中添加Paho客户端依赖:

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

2. 配置文件

application.yml 中添加EMQX连接配置:

mqtt:
  host: tcp://localhost:1883           # EMQX连接地址
  client-id: service-backend-1          # 客户端ID(需唯一)
  username: admin                        # EMQX认证用户名
  password: public                        # EMQX认证密码
  topic:
    upstream: /vehicle/+/position        # 订阅的上行Topic(车机上报)
    downstream: /cloud/{vin}/command     # 下发的Topic模板
  qos: 1                                    # 默认服务质量
  timeout: 30                               # 连接超时(秒)
  keepalive: 60                              # 心跳间隔(秒)

3. 配置类与核心客户端

@Configuration
@Slf4j
public class MqttPahoConfig {

    @Value("${mqtt.host}")
    private String host;

    @Value("${mqtt.client-id}")
    private String clientId;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.timeout}")
    private int timeout;

    @Value("${mqtt.keepalive}")
    private int keepalive;

    @Bean
    public MqttClient mqttClient() throws MqttException {
        MqttClient client = new MqttClient(host, clientId);
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setConnectionTimeout(timeout);
        options.setKeepAliveInterval(keepalive);
        options.setAutomaticReconnect(true);       // 自动重连(车联网必备)
        options.setCleanSession(true);              // 是否清除会话
        
        client.connect(options);
        log.info("MQTT客户端连接成功,clientId: {}", clientId);
        
        // 设置回调处理器
        client.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                log.info("MQTT连接完成,重连状态:{}", reconnect);
                // 连接成功后订阅Topic(可在此处订阅)
            }

            @Override
            public void connectionLost(Throwable cause) {
                log.error("MQTT连接丢失,原因:", cause);
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                // 消息到达处理(交由业务Service处理)
                String payload = new String(message.getPayload());
                log.info("收到消息,topic: {}, payload: {}", topic, payload);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                log.debug("消息发送完成,messageId: {}", token.getMessageId());
            }
        });
        
        return client;
    }
}

4. 消息发送与接收服务

@Service
@Slf4j
public class MqttPahoService {

    @Autowired
    private MqttClient mqttClient;

    /**
     * 发送消息到指定Topic
     */
    public void publish(String topic, String payload, int qos) throws MqttException {
        MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
        message.setQos(qos);
        mqttClient.publish(topic, message);
        log.debug("消息发布成功,topic: {}, qos: {}", topic, qos);
    }

    /**
     * 订阅Topic(可在连接成功后调用)
     */
    public void subscribe(String topic, int qos) throws MqttException {
        mqttClient.subscribe(topic, qos);
        log.info("订阅Topic成功,topic: {}, qos: {}", topic, qos);
    }

    /**
     * 取消订阅
     */
    public void unsubscribe(String topic) throws MqttException {
        mqttClient.unsubscribe(topic);
    }
}

5、示例代码仓库

https://gitee.com/ywbingchuan/demo-boot.git