From a1416fa7d700c023f06c54aa3ca33fde1c7ab5b7 Mon Sep 17 00:00:00 2001
From: dzq <dzq@ys.com>
Date: Fri, 9 May 2025 08:34:04 +0800
Subject: [PATCH] =?UTF-8?q?refactor(mqtt):=20=E9=87=8D=E6=9E=84MQTT?=
 =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E4=BB=A5=E6=94=AF=E6=8C=81=E5=A4=9A=E6=9C=8D?=
 =?UTF-8?q?=E5=8A=A1=E5=99=A8=E9=85=8D=E7=BD=AE?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

重构MQTT服务,使其能够根据不同的MQTT服务器ID进行消息发布。移除了硬编码的MQTT配置,改为从数据库动态加载MQTT服务器信息。同时,更新了相关控制器和服务层代码,以支持传递cabinetId参数。
---
 .../api/controller/CabinetCellController.java | 12 +++--
 .../api/controller/MqttController.java        |  4 +-
 .../agileboot/domain/mqtt/MqttService.java    | 54 ++++++-------------
 .../shop/order/OrderApplicationService.java   |  2 +-
 4 files changed, 29 insertions(+), 43 deletions(-)

diff --git a/agileboot-api/src/main/java/com/agileboot/api/controller/CabinetCellController.java b/agileboot-api/src/main/java/com/agileboot/api/controller/CabinetCellController.java
index 4ae16fa..dab58c7 100644
--- a/agileboot-api/src/main/java/com/agileboot/api/controller/CabinetCellController.java
+++ b/agileboot-api/src/main/java/com/agileboot/api/controller/CabinetCellController.java
@@ -12,6 +12,8 @@ import com.agileboot.domain.cabinet.operation.model.CabinetCellOperationModel;
 import com.agileboot.domain.cabinet.operation.model.CabinetCellOperationModelFactory;
 import com.agileboot.domain.cabinet.smartCabinet.SmartCabinetApplicationService;
 import com.agileboot.domain.cabinet.smartCabinet.dto.CabinetDetailDTO;
+import com.agileboot.domain.cabinet.smartCabinet.model.SmartCabinetModel;
+import com.agileboot.domain.cabinet.smartCabinet.model.SmartCabinetModelFactory;
 import com.agileboot.domain.mqtt.MqttService;
 import com.agileboot.domain.shop.goods.model.GoodsModel;
 import com.agileboot.domain.shop.goods.model.GoodsModelFactory;
@@ -33,14 +35,15 @@ public class CabinetCellController {
     private final CabinetCellOperationModelFactory cabinetCellOperationModelFactory;
     private final CabinetCellModelFactory cabinetCellModelFactory;
     private final GoodsModelFactory goodsModelFactory;
+    private final SmartCabinetModelFactory smartCabinetModelFactory;
 
     @GetMapping("/detail")
     public ResponseDTO<List<CabinetDetailDTO>> getCabinetDetail() {
         return ResponseDTO.ok(smartCabinetApplicationService.getCabinetDetail());
     }
 
-    @PostMapping("/openCabinet/{lockControlNo}/{pinNo}")
-    public ResponseDTO<?> openCabinet(@PathVariable Integer lockControlNo, @PathVariable Integer pinNo,
+    @PostMapping("/openCabinet/{cabinetId}/{pinNo}")
+    public ResponseDTO<?> openCabinet(@PathVariable Long cabinetId, @PathVariable Integer pinNo,
                                       @RequestBody AddCabinetCellOperationCommand operationCommand) {
         if (null == operationCommand){
             operationCommand = new AddCabinetCellOperationCommand();
@@ -49,13 +52,16 @@ public class CabinetCellController {
         operationCommand.setStatus(1);
 
         CabinetCellOperationModel cellOperationModel = cabinetCellOperationModelFactory.create();
+
+        SmartCabinetModel smartCabinetModel = smartCabinetModelFactory.loadById(cabinetId);
+        Integer lockControlNo = smartCabinetModel.getLockControlNo();
         // 发送指令
         String mqttDate = "8A";
         mqttDate += String.format("%02X", lockControlNo);
         mqttDate += String.format("%02X", pinNo);
         mqttDate += "11";
         try {
-            mqttService.publish(mqttDate);
+            mqttService.publish(mqttDate, smartCabinetModel.getMqttServerId());
         } catch (Exception e) {
             log.error("mqtt publish error", e);
             operationCommand.setStatus(2);
diff --git a/agileboot-api/src/main/java/com/agileboot/api/controller/MqttController.java b/agileboot-api/src/main/java/com/agileboot/api/controller/MqttController.java
index 8eb2b91..0a6eead 100644
--- a/agileboot-api/src/main/java/com/agileboot/api/controller/MqttController.java
+++ b/agileboot-api/src/main/java/com/agileboot/api/controller/MqttController.java
@@ -20,9 +20,9 @@ public class MqttController {
 
 
     @GetMapping("/opencell")
-    public String opencell(@RequestParam String data) {
+    public String opencell(@RequestParam String data, @RequestParam Long cabinetId) {
         try {
-            mqttService.publish(data);
+            mqttService.publish(data, cabinetId);
             return "success";
         } catch (Exception e) {
             log.error("getPermanentCode error", e);
diff --git a/agileboot-domain/src/main/java/com/agileboot/domain/mqtt/MqttService.java b/agileboot-domain/src/main/java/com/agileboot/domain/mqtt/MqttService.java
index 8c9d2d0..953b29f 100644
--- a/agileboot-domain/src/main/java/com/agileboot/domain/mqtt/MqttService.java
+++ b/agileboot-domain/src/main/java/com/agileboot/domain/mqtt/MqttService.java
@@ -1,9 +1,9 @@
 package com.agileboot.domain.mqtt;
 
+import com.agileboot.domain.cabinet.mqtt.db.MqttServerEntity;
+import com.agileboot.domain.cabinet.mqtt.db.MqttServerService;
 import javax.annotation.PostConstruct;
-
 import lombok.AllArgsConstructor;
-import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
@@ -18,35 +18,15 @@ import java.util.List;
 @Service
 @RequiredArgsConstructor
 public class MqttService implements MqttCallback {
+    private final MqttServerService mqttServerService;
     private final List<ClientConfig> clientConfigs = new ArrayList<>();
 
     @AllArgsConstructor
     private static class ClientConfig {
         private final MqttClient client;
-        private final MqttConfig config;
-    }
-    
-    @AllArgsConstructor
-    @Getter
-    private enum MqttConfig {
-        LOCK("lock", "lock@ys#6785$", "lock/up/S4202414S", "lock/down/S4202414S"),
-        SELL("sell", "sell@ys#6785$", "lock/up/S5200184S", "lock/down/S5200184S");
-
-        private final String username;
-        private final String password;
-        private final String topicFilter;
-        private final String publishTopic;
+        private final MqttServerEntity config;
     }
 
-    private static final String SERVER_URL = "tcp://221.7.159.46:1883";
-//    private static final String USERNAME = "sell";
-//    private static final String PASSWORD = "sell@ys#6785$";
-//    private static final String TOPIC_FILTER = "lock/up/S5200184S";
-//    private static final String TOPIC = "lock/down/S5200184S";
-//    private static final String USERNAME = "iot";
-//    private static final String PASSWORD = "iot@ys#9963$";
-//    private static final String TOPIC_FILTER = "hongfa/2401310026C50D24FE/upload/";
-//    private static final String TOPIC = "hongfa/2401310026C50D24FE/download/";
 
 //    private MqttClient client;
     @Setter
@@ -70,8 +50,8 @@ public class MqttService implements MqttCallback {
     }
 
     public void connect() throws MqttException {
-        for (MqttConfig config : MqttConfig.values()) {
-            MqttClient client = new MqttClient(SERVER_URL, MqttClient.generateClientId(), new MemoryPersistence());
+        for (MqttServerEntity config : mqttServerService.selectAll()) {
+            MqttClient client = new MqttClient(config.getServerUrl(), MqttClient.generateClientId(), new MemoryPersistence());
             MqttConnectOptions options = new MqttConnectOptions();
             options.setUserName(config.getUsername());
             options.setPassword(config.getPassword().toCharArray());
@@ -89,27 +69,27 @@ public class MqttService implements MqttCallback {
             client.connect(options);
             client.subscribe(config.getTopicFilter());
             clientConfigs.add(new ClientConfig(client, config));
-            log.info("成功连接MQTT账号:{}", config.getUsername());
+            log.info("成功连接MQTT服务器:{} 账号:{}", config.getServerUrl(), config.getUsername());
         }
-
-        log.info("连接 MQTT 服务器 {}", SERVER_URL);
     }
 
     /*public void subscribe(String topic) throws MqttException {
         client.subscribe(topic);
     }*/
 
-    public void publish(String data) throws MqttException {
+    public void publish(String data, Long mqttServerId) throws MqttException {
 //        lockCmd((byte) 0x8A, (byte) 0x01, (byte) 0x01, (byte) 0x33, null);
         String bcc = BCCCalculator.calculateBCC(data);
         MqttMessage message = new MqttMessage(BCCCalculator.hexStringToByteArray(data + bcc));
-        clientConfigs.forEach(cc -> {
-            try {
-                cc.client.publish(cc.config.getPublishTopic(), message);
-            } catch (MqttException e) {
-                log.error("消息发送失败", e);
-            }
-        });
+        clientConfigs.stream()
+            .filter(cc -> cc.config.getMqttServerId().equals(mqttServerId))
+            .forEach(cc -> {
+                try {
+                    cc.client.publish(cc.config.getPublishTopic(), message);
+                } catch (MqttException e) {
+                    log.error("消息发送失败", e);
+                }
+            });
     }
 
     public void disconnect() throws MqttException {
diff --git a/agileboot-domain/src/main/java/com/agileboot/domain/shop/order/OrderApplicationService.java b/agileboot-domain/src/main/java/com/agileboot/domain/shop/order/OrderApplicationService.java
index d8a4243..d800d5b 100644
--- a/agileboot-domain/src/main/java/com/agileboot/domain/shop/order/OrderApplicationService.java
+++ b/agileboot-domain/src/main/java/com/agileboot/domain/shop/order/OrderApplicationService.java
@@ -124,7 +124,7 @@ public class OrderApplicationService {
         mqttDate += String.format("%02X", cabinetCellEntity.getPinNo());
         mqttDate += "11";
         try {
-            mqttService.publish(mqttDate);
+            mqttService.publish(mqttDate, smartCabinet.getMqttServerId());
         } catch (Exception e) {
             log.error("mqtt publish error", e);
             operationCommand.setStatus(2);