refactor(mqtt): 重构MQTT服务以支持多客户端配置
重构MqttService类,引入ClientConfig和MqttConfig枚举类,支持多客户端连接和配置管理。移除硬编码的MQTT配置,改为通过枚举动态管理多个客户端的连接和订阅。提升代码的可扩展性和可维护性。
This commit is contained in:
parent
ca01d17e9a
commit
455cdb2f1a
|
@ -1,6 +1,9 @@
|
|||
package com.agileboot.domain.mqtt;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
@ -8,25 +11,46 @@ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|||
import org.springframework.stereotype.Service;
|
||||
import org.eclipse.paho.client.mqttv3.*;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class MqttService implements MqttCallback {
|
||||
private final List<ClientConfig> clientConfigs = new ArrayList<>();
|
||||
|
||||
@AllArgsConstructor
|
||||
private static class ClientConfig {
|
||||
private final MqttClient client;
|
||||
private final MqttConfig config;
|
||||
}
|
||||
|
||||
@AllArgsConstructor
|
||||
@Getter
|
||||
private enum MqttConfig {
|
||||
LOCK("lock", "lock@ys#6785$", "lock/up/S4202414S", "lock/down/S4202414S"),
|
||||
SELL("sell", "sell@ys#6785$", "lock/up/S5200184S", "lock/down/S5200184S");
|
||||
|
||||
private final String username;
|
||||
private final String password;
|
||||
private final String topicFilter;
|
||||
private final String publishTopic;
|
||||
}
|
||||
|
||||
private static final String SERVER_URL = "tcp://221.7.159.46:1883";
|
||||
// private static final String USERNAME = "lock";
|
||||
// private static final String PASSWORD = "lock@ys#6785$";
|
||||
// private static final String TOPIC_FILTER = "lock/up/S4202414S";
|
||||
// private static final String TOPIC = "lock/down/S4202414S";
|
||||
private static final String USERNAME = "sell";
|
||||
private static final String PASSWORD = "sell@ys#6785$";
|
||||
private static final String TOPIC_FILTER = "lock/up/S5200184S";
|
||||
private static final String TOPIC = "lock/down/S5200184S";
|
||||
// private static final String USERNAME = "sell";
|
||||
// private static final String PASSWORD = "sell@ys#6785$";
|
||||
// private static final String TOPIC_FILTER = "lock/up/S5200184S";
|
||||
// private static final String TOPIC = "lock/down/S5200184S";
|
||||
// private static final String USERNAME = "iot";
|
||||
// private static final String PASSWORD = "iot@ys#9963$";
|
||||
// private static final String TOPIC_FILTER = "hongfa/2401310026C50D24FE/upload/";
|
||||
// private static final String TOPIC = "hongfa/2401310026C50D24FE/download/";
|
||||
|
||||
private MqttClient client;
|
||||
// private MqttClient client;
|
||||
@Setter
|
||||
private List<String> activeTopics = new ArrayList<>();
|
||||
// 设置自定义消息处理器
|
||||
@Setter
|
||||
private MessageHandler messageHandler; // 自定义消息处理器接口
|
||||
|
@ -38,7 +62,7 @@ public class MqttService implements MqttCallback {
|
|||
@PostConstruct
|
||||
public void init() throws MqttException {
|
||||
connect();
|
||||
subscribe(TOPIC_FILTER);
|
||||
// subscribe(TOPIC_FILTER);
|
||||
|
||||
setMessageHandler((String topic, String hexPayload) -> {
|
||||
log.info("收到消息 topic: {}, hexPayload: {}", topic, hexPayload);
|
||||
|
@ -46,31 +70,54 @@ public class MqttService implements MqttCallback {
|
|||
}
|
||||
|
||||
public void connect() throws MqttException {
|
||||
client = new MqttClient(SERVER_URL, MqttClient.generateClientId(), new MemoryPersistence());
|
||||
MqttConnectOptions options = new MqttConnectOptions();
|
||||
options.setUserName(USERNAME);
|
||||
options.setPassword(PASSWORD.toCharArray());
|
||||
options.setCleanSession(true);
|
||||
options.setAutomaticReconnect(true);
|
||||
for (MqttConfig config : MqttConfig.values()) {
|
||||
MqttClient client = new MqttClient(SERVER_URL, MqttClient.generateClientId(), new MemoryPersistence());
|
||||
MqttConnectOptions options = new MqttConnectOptions();
|
||||
options.setUserName(config.getUsername());
|
||||
options.setPassword(config.getPassword().toCharArray());
|
||||
options.setCleanSession(true);
|
||||
options.setAutomaticReconnect(true);
|
||||
|
||||
client.setCallback(new MqttCallback() {
|
||||
@Override
|
||||
public void messageArrived(String topic, MqttMessage message) {
|
||||
MqttService.this.messageArrived(topic, message);
|
||||
}
|
||||
@Override public void connectionLost(Throwable cause) {}
|
||||
@Override public void deliveryComplete(IMqttDeliveryToken token) {}
|
||||
});
|
||||
client.connect(options);
|
||||
client.subscribe(config.getTopicFilter());
|
||||
clientConfigs.add(new ClientConfig(client, config));
|
||||
log.info("成功连接MQTT账号:{}", config.getUsername());
|
||||
}
|
||||
|
||||
client.setCallback(this);
|
||||
client.connect(options);
|
||||
log.info("连接 MQTT 服务器 {}", SERVER_URL);
|
||||
}
|
||||
|
||||
public void subscribe(String topic) throws MqttException {
|
||||
/*public void subscribe(String topic) throws MqttException {
|
||||
client.subscribe(topic);
|
||||
}
|
||||
}*/
|
||||
|
||||
public void publish(String data) throws MqttException {
|
||||
// lockCmd((byte) 0x8A, (byte) 0x01, (byte) 0x01, (byte) 0x33, null);
|
||||
String bcc = BCCCalculator.calculateBCC(data);
|
||||
MqttMessage message = new MqttMessage(BCCCalculator.hexStringToByteArray(data + bcc));
|
||||
client.publish(TOPIC, message);
|
||||
clientConfigs.forEach(cc -> {
|
||||
try {
|
||||
cc.client.publish(cc.config.getPublishTopic(), message);
|
||||
} catch (MqttException e) {
|
||||
log.error("消息发送失败", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void disconnect() throws MqttException {
|
||||
client.disconnect();
|
||||
for (ClientConfig cc : clientConfigs) {
|
||||
MqttClient client = cc.client;
|
||||
client.disconnect();
|
||||
client.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -108,7 +155,7 @@ public class MqttService implements MqttCallback {
|
|||
指令0x9E:读取所有锁状态
|
||||
指令中未标注cmdSub指令参数的固定为0x33
|
||||
*******************************************************/
|
||||
private boolean lockCmd(byte cmdNo, byte boardNo, byte lockNo, byte cmdSub, String[] rsMsg)
|
||||
/*private boolean lockCmd(byte cmdNo, byte boardNo, byte lockNo, byte cmdSub, String[] rsMsg)
|
||||
{
|
||||
try
|
||||
{
|
||||
|
@ -121,12 +168,18 @@ public class MqttService implements MqttCallback {
|
|||
|
||||
//发送数据并获取返回值
|
||||
MqttMessage message = new MqttMessage(sendData);
|
||||
client.publish(TOPIC, message);
|
||||
clientConfigs.forEach(cc -> {
|
||||
try {
|
||||
cc.client.publish(cc.config.getPublishTopic(), message);
|
||||
} catch (MqttException e) {
|
||||
log.error("消息发送失败", e);
|
||||
}
|
||||
});
|
||||
|
||||
} catch(Exception e) {
|
||||
log.error("lockCmd", e);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}*/
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue