diff --git a/agileboot-domain/src/main/java/com/agileboot/domain/mqtt/MqttService.java b/agileboot-domain/src/main/java/com/agileboot/domain/mqtt/MqttService.java index c41c56a..8c9d2d0 100644 --- a/agileboot-domain/src/main/java/com/agileboot/domain/mqtt/MqttService.java +++ b/agileboot-domain/src/main/java/com/agileboot/domain/mqtt/MqttService.java @@ -1,6 +1,9 @@ package com.agileboot.domain.mqtt; import javax.annotation.PostConstruct; + +import lombok.AllArgsConstructor; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -8,25 +11,46 @@ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.stereotype.Service; import org.eclipse.paho.client.mqttv3.*; +import java.util.ArrayList; +import java.util.List; + @Slf4j @Service @RequiredArgsConstructor public class MqttService implements MqttCallback { + private final List clientConfigs = new ArrayList<>(); + + @AllArgsConstructor + private static class ClientConfig { + private final MqttClient client; + private final MqttConfig config; + } + + @AllArgsConstructor + @Getter + private enum MqttConfig { + LOCK("lock", "lock@ys#6785$", "lock/up/S4202414S", "lock/down/S4202414S"), + SELL("sell", "sell@ys#6785$", "lock/up/S5200184S", "lock/down/S5200184S"); + + private final String username; + private final String password; + private final String topicFilter; + private final String publishTopic; + } + private static final String SERVER_URL = "tcp://221.7.159.46:1883"; -// private static final String USERNAME = "lock"; -// private static final String PASSWORD = "lock@ys#6785$"; -// private static final String TOPIC_FILTER = "lock/up/S4202414S"; -// private static final String TOPIC = "lock/down/S4202414S"; - private static final String USERNAME = "sell"; - private static final String PASSWORD = "sell@ys#6785$"; - private static final String TOPIC_FILTER = "lock/up/S5200184S"; - private static final String TOPIC = "lock/down/S5200184S"; +// private static final String USERNAME = "sell"; +// private static final String PASSWORD = "sell@ys#6785$"; +// private static final String TOPIC_FILTER = "lock/up/S5200184S"; +// private static final String TOPIC = "lock/down/S5200184S"; // private static final String USERNAME = "iot"; // private static final String PASSWORD = "iot@ys#9963$"; // private static final String TOPIC_FILTER = "hongfa/2401310026C50D24FE/upload/"; // private static final String TOPIC = "hongfa/2401310026C50D24FE/download/"; - private MqttClient client; +// private MqttClient client; + @Setter + private List activeTopics = new ArrayList<>(); // 设置自定义消息处理器 @Setter private MessageHandler messageHandler; // 自定义消息处理器接口 @@ -38,7 +62,7 @@ public class MqttService implements MqttCallback { @PostConstruct public void init() throws MqttException { connect(); - subscribe(TOPIC_FILTER); +// subscribe(TOPIC_FILTER); setMessageHandler((String topic, String hexPayload) -> { log.info("收到消息 topic: {}, hexPayload: {}", topic, hexPayload); @@ -46,31 +70,54 @@ public class MqttService implements MqttCallback { } public void connect() throws MqttException { - client = new MqttClient(SERVER_URL, MqttClient.generateClientId(), new MemoryPersistence()); - MqttConnectOptions options = new MqttConnectOptions(); - options.setUserName(USERNAME); - options.setPassword(PASSWORD.toCharArray()); - options.setCleanSession(true); - options.setAutomaticReconnect(true); + for (MqttConfig config : MqttConfig.values()) { + MqttClient client = new MqttClient(SERVER_URL, MqttClient.generateClientId(), new MemoryPersistence()); + MqttConnectOptions options = new MqttConnectOptions(); + options.setUserName(config.getUsername()); + options.setPassword(config.getPassword().toCharArray()); + options.setCleanSession(true); + options.setAutomaticReconnect(true); + + client.setCallback(new MqttCallback() { + @Override + public void messageArrived(String topic, MqttMessage message) { + MqttService.this.messageArrived(topic, message); + } + @Override public void connectionLost(Throwable cause) {} + @Override public void deliveryComplete(IMqttDeliveryToken token) {} + }); + client.connect(options); + client.subscribe(config.getTopicFilter()); + clientConfigs.add(new ClientConfig(client, config)); + log.info("成功连接MQTT账号:{}", config.getUsername()); + } - client.setCallback(this); - client.connect(options); log.info("连接 MQTT 服务器 {}", SERVER_URL); } - public void subscribe(String topic) throws MqttException { + /*public void subscribe(String topic) throws MqttException { client.subscribe(topic); - } + }*/ public void publish(String data) throws MqttException { // lockCmd((byte) 0x8A, (byte) 0x01, (byte) 0x01, (byte) 0x33, null); String bcc = BCCCalculator.calculateBCC(data); MqttMessage message = new MqttMessage(BCCCalculator.hexStringToByteArray(data + bcc)); - client.publish(TOPIC, message); + clientConfigs.forEach(cc -> { + try { + cc.client.publish(cc.config.getPublishTopic(), message); + } catch (MqttException e) { + log.error("消息发送失败", e); + } + }); } public void disconnect() throws MqttException { - client.disconnect(); + for (ClientConfig cc : clientConfigs) { + MqttClient client = cc.client; + client.disconnect(); + client.close(); + } } @Override @@ -108,7 +155,7 @@ public class MqttService implements MqttCallback { 指令0x9E:读取所有锁状态 指令中未标注cmdSub指令参数的固定为0x33 *******************************************************/ - private boolean lockCmd(byte cmdNo, byte boardNo, byte lockNo, byte cmdSub, String[] rsMsg) + /*private boolean lockCmd(byte cmdNo, byte boardNo, byte lockNo, byte cmdSub, String[] rsMsg) { try { @@ -121,12 +168,18 @@ public class MqttService implements MqttCallback { //发送数据并获取返回值 MqttMessage message = new MqttMessage(sendData); - client.publish(TOPIC, message); + clientConfigs.forEach(cc -> { + try { + cc.client.publish(cc.config.getPublishTopic(), message); + } catch (MqttException e) { + log.error("消息发送失败", e); + } + }); } catch(Exception e) { log.error("lockCmd", e); } return false; - } + }*/ }