refactor(mqtt): 重构MQTT服务以支持多服务器配置

重构MQTT服务,使其能够根据不同的MQTT服务器ID进行消息发布。移除了硬编码的MQTT配置,改为从数据库动态加载MQTT服务器信息。同时,更新了相关控制器和服务层代码,以支持传递cabinetId参数。
This commit is contained in:
dzq 2025-05-09 08:34:04 +08:00
parent 42df4996e4
commit a1416fa7d7
4 changed files with 29 additions and 43 deletions

View File

@ -12,6 +12,8 @@ import com.agileboot.domain.cabinet.operation.model.CabinetCellOperationModel;
import com.agileboot.domain.cabinet.operation.model.CabinetCellOperationModelFactory; import com.agileboot.domain.cabinet.operation.model.CabinetCellOperationModelFactory;
import com.agileboot.domain.cabinet.smartCabinet.SmartCabinetApplicationService; import com.agileboot.domain.cabinet.smartCabinet.SmartCabinetApplicationService;
import com.agileboot.domain.cabinet.smartCabinet.dto.CabinetDetailDTO; import com.agileboot.domain.cabinet.smartCabinet.dto.CabinetDetailDTO;
import com.agileboot.domain.cabinet.smartCabinet.model.SmartCabinetModel;
import com.agileboot.domain.cabinet.smartCabinet.model.SmartCabinetModelFactory;
import com.agileboot.domain.mqtt.MqttService; import com.agileboot.domain.mqtt.MqttService;
import com.agileboot.domain.shop.goods.model.GoodsModel; import com.agileboot.domain.shop.goods.model.GoodsModel;
import com.agileboot.domain.shop.goods.model.GoodsModelFactory; import com.agileboot.domain.shop.goods.model.GoodsModelFactory;
@ -33,14 +35,15 @@ public class CabinetCellController {
private final CabinetCellOperationModelFactory cabinetCellOperationModelFactory; private final CabinetCellOperationModelFactory cabinetCellOperationModelFactory;
private final CabinetCellModelFactory cabinetCellModelFactory; private final CabinetCellModelFactory cabinetCellModelFactory;
private final GoodsModelFactory goodsModelFactory; private final GoodsModelFactory goodsModelFactory;
private final SmartCabinetModelFactory smartCabinetModelFactory;
@GetMapping("/detail") @GetMapping("/detail")
public ResponseDTO<List<CabinetDetailDTO>> getCabinetDetail() { public ResponseDTO<List<CabinetDetailDTO>> getCabinetDetail() {
return ResponseDTO.ok(smartCabinetApplicationService.getCabinetDetail()); return ResponseDTO.ok(smartCabinetApplicationService.getCabinetDetail());
} }
@PostMapping("/openCabinet/{lockControlNo}/{pinNo}") @PostMapping("/openCabinet/{cabinetId}/{pinNo}")
public ResponseDTO<?> openCabinet(@PathVariable Integer lockControlNo, @PathVariable Integer pinNo, public ResponseDTO<?> openCabinet(@PathVariable Long cabinetId, @PathVariable Integer pinNo,
@RequestBody AddCabinetCellOperationCommand operationCommand) { @RequestBody AddCabinetCellOperationCommand operationCommand) {
if (null == operationCommand){ if (null == operationCommand){
operationCommand = new AddCabinetCellOperationCommand(); operationCommand = new AddCabinetCellOperationCommand();
@ -49,13 +52,16 @@ public class CabinetCellController {
operationCommand.setStatus(1); operationCommand.setStatus(1);
CabinetCellOperationModel cellOperationModel = cabinetCellOperationModelFactory.create(); CabinetCellOperationModel cellOperationModel = cabinetCellOperationModelFactory.create();
SmartCabinetModel smartCabinetModel = smartCabinetModelFactory.loadById(cabinetId);
Integer lockControlNo = smartCabinetModel.getLockControlNo();
// 发送指令 // 发送指令
String mqttDate = "8A"; String mqttDate = "8A";
mqttDate += String.format("%02X", lockControlNo); mqttDate += String.format("%02X", lockControlNo);
mqttDate += String.format("%02X", pinNo); mqttDate += String.format("%02X", pinNo);
mqttDate += "11"; mqttDate += "11";
try { try {
mqttService.publish(mqttDate); mqttService.publish(mqttDate, smartCabinetModel.getMqttServerId());
} catch (Exception e) { } catch (Exception e) {
log.error("mqtt publish error", e); log.error("mqtt publish error", e);
operationCommand.setStatus(2); operationCommand.setStatus(2);

View File

@ -20,9 +20,9 @@ public class MqttController {
@GetMapping("/opencell") @GetMapping("/opencell")
public String opencell(@RequestParam String data) { public String opencell(@RequestParam String data, @RequestParam Long cabinetId) {
try { try {
mqttService.publish(data); mqttService.publish(data, cabinetId);
return "success"; return "success";
} catch (Exception e) { } catch (Exception e) {
log.error("getPermanentCode error", e); log.error("getPermanentCode error", e);

View File

@ -1,9 +1,9 @@
package com.agileboot.domain.mqtt; package com.agileboot.domain.mqtt;
import com.agileboot.domain.cabinet.mqtt.db.MqttServerEntity;
import com.agileboot.domain.cabinet.mqtt.db.MqttServerService;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.Setter; import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -18,35 +18,15 @@ import java.util.List;
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
public class MqttService implements MqttCallback { public class MqttService implements MqttCallback {
private final MqttServerService mqttServerService;
private final List<ClientConfig> clientConfigs = new ArrayList<>(); private final List<ClientConfig> clientConfigs = new ArrayList<>();
@AllArgsConstructor @AllArgsConstructor
private static class ClientConfig { private static class ClientConfig {
private final MqttClient client; private final MqttClient client;
private final MqttConfig config; private final MqttServerEntity config;
} }
@AllArgsConstructor
@Getter
private enum MqttConfig {
LOCK("lock", "lock@ys#6785$", "lock/up/S4202414S", "lock/down/S4202414S"),
SELL("sell", "sell@ys#6785$", "lock/up/S5200184S", "lock/down/S5200184S");
private final String username;
private final String password;
private final String topicFilter;
private final String publishTopic;
}
private static final String SERVER_URL = "tcp://221.7.159.46:1883";
// private static final String USERNAME = "sell";
// private static final String PASSWORD = "sell@ys#6785$";
// private static final String TOPIC_FILTER = "lock/up/S5200184S";
// private static final String TOPIC = "lock/down/S5200184S";
// private static final String USERNAME = "iot";
// private static final String PASSWORD = "iot@ys#9963$";
// private static final String TOPIC_FILTER = "hongfa/2401310026C50D24FE/upload/";
// private static final String TOPIC = "hongfa/2401310026C50D24FE/download/";
// private MqttClient client; // private MqttClient client;
@Setter @Setter
@ -70,8 +50,8 @@ public class MqttService implements MqttCallback {
} }
public void connect() throws MqttException { public void connect() throws MqttException {
for (MqttConfig config : MqttConfig.values()) { for (MqttServerEntity config : mqttServerService.selectAll()) {
MqttClient client = new MqttClient(SERVER_URL, MqttClient.generateClientId(), new MemoryPersistence()); MqttClient client = new MqttClient(config.getServerUrl(), MqttClient.generateClientId(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions(); MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(config.getUsername()); options.setUserName(config.getUsername());
options.setPassword(config.getPassword().toCharArray()); options.setPassword(config.getPassword().toCharArray());
@ -89,21 +69,21 @@ public class MqttService implements MqttCallback {
client.connect(options); client.connect(options);
client.subscribe(config.getTopicFilter()); client.subscribe(config.getTopicFilter());
clientConfigs.add(new ClientConfig(client, config)); clientConfigs.add(new ClientConfig(client, config));
log.info("成功连接MQTT账号:{}", config.getUsername()); log.info("成功连接MQTT服务器:{} 账号:{}", config.getServerUrl(), config.getUsername());
} }
log.info("连接 MQTT 服务器 {}", SERVER_URL);
} }
/*public void subscribe(String topic) throws MqttException { /*public void subscribe(String topic) throws MqttException {
client.subscribe(topic); client.subscribe(topic);
}*/ }*/
public void publish(String data) throws MqttException { public void publish(String data, Long mqttServerId) throws MqttException {
// lockCmd((byte) 0x8A, (byte) 0x01, (byte) 0x01, (byte) 0x33, null); // lockCmd((byte) 0x8A, (byte) 0x01, (byte) 0x01, (byte) 0x33, null);
String bcc = BCCCalculator.calculateBCC(data); String bcc = BCCCalculator.calculateBCC(data);
MqttMessage message = new MqttMessage(BCCCalculator.hexStringToByteArray(data + bcc)); MqttMessage message = new MqttMessage(BCCCalculator.hexStringToByteArray(data + bcc));
clientConfigs.forEach(cc -> { clientConfigs.stream()
.filter(cc -> cc.config.getMqttServerId().equals(mqttServerId))
.forEach(cc -> {
try { try {
cc.client.publish(cc.config.getPublishTopic(), message); cc.client.publish(cc.config.getPublishTopic(), message);
} catch (MqttException e) { } catch (MqttException e) {

View File

@ -124,7 +124,7 @@ public class OrderApplicationService {
mqttDate += String.format("%02X", cabinetCellEntity.getPinNo()); mqttDate += String.format("%02X", cabinetCellEntity.getPinNo());
mqttDate += "11"; mqttDate += "11";
try { try {
mqttService.publish(mqttDate); mqttService.publish(mqttDate, smartCabinet.getMqttServerId());
} catch (Exception e) { } catch (Exception e) {
log.error("mqtt publish error", e); log.error("mqtt publish error", e);
operationCommand.setStatus(2); operationCommand.setStatus(2);