From 805d3c9daa7cf31bac87158a84a58be7d537ef80 Mon Sep 17 00:00:00 2001 From: Looly Date: Sat, 3 Jan 2026 20:32:03 +0800 Subject: [PATCH] add mica --- hutool-extra/pom.xml | 15 +- .../mq/engine/mica/MicaMqttConsumer.java | 107 ++++++++++++++ .../extra/mq/engine/mica/MicaMqttEngine.java | 135 ++++++++++++++++++ .../mq/engine/mica/MicaMqttProducer.java | 65 +++++++++ .../v7/extra/mq/engine/mica/package-info.java | 20 +++ .../cn.hutool.v7.extra.mq.engine.MQEngine | 1 + .../extra/mq/engine/MicaMqttEngineTest.java | 92 ++++++++++++ 7 files changed, 431 insertions(+), 4 deletions(-) create mode 100644 hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/MicaMqttConsumer.java create mode 100644 hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/MicaMqttEngine.java create mode 100644 hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/MicaMqttProducer.java create mode 100644 hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/package-info.java create mode 100644 hutool-extra/src/test/java/cn/hutool/v7/extra/mq/engine/MicaMqttEngineTest.java diff --git a/hutool-extra/pom.xml b/hutool-extra/pom.xml index 7551fb3e57..2b4da6c8aa 100755 --- a/hutool-extra/pom.xml +++ b/hutool-extra/pom.xml @@ -60,7 +60,7 @@ 3.11.1 5.1.1 - 3.5.3 + 4.0.1 6.9.0 1.17.8 1.28.0 @@ -68,10 +68,11 @@ 4.0.6 - 4.1.0 + 4.1.1 5.26.0 - 5.3.3 - 6.1.7 + 5.4.0 + 6.2.0 + 2.5.11 @@ -558,5 +559,11 @@ ${activemq.version} provided + + org.dromara.mica-mqtt + mica-mqtt-client + ${mica.version} + provided + diff --git a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/MicaMqttConsumer.java b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/MicaMqttConsumer.java new file mode 100644 index 0000000000..6a8feb539f --- /dev/null +++ b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/MicaMqttConsumer.java @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2026 Hutool Team. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hutool.v7.extra.mq.engine.mica; + +import cn.hutool.v7.extra.mq.Consumer; +import cn.hutool.v7.extra.mq.MQException; +import cn.hutool.v7.extra.mq.Message; +import cn.hutool.v7.extra.mq.MessageHandler; +import org.dromara.mica.mqtt.codec.MqttQoS; +import org.dromara.mica.mqtt.core.client.MqttClient; + +import java.io.IOException; + +/** + * mica-mqtt协议消息消费者 + * + * @author Looly + * @since 7.0.0 + */ +public class MicaMqttConsumer implements Consumer { + + private final MqttClient mqttClient; + private String topic; + private MqttQoS mqttQoS; + + /** + * 构造 + * + * @param mqttClient MQTT客户端 + */ + public MicaMqttConsumer(final MqttClient mqttClient) { + this.mqttClient = mqttClient; + // 默认使用QOS0 + this.mqttQoS = MqttQoS.QOS0; + } + + /** + * 设置消费的Topic + * + * @param topic Topic + * @return this + */ + public MicaMqttConsumer setTopic(final String topic) { + this.topic = topic; + return this; + } + + /** + * 设置MQTT消息质量 + * + * @param mqttQoS MQTT消息质量 + * @return this + */ + public MicaMqttConsumer setMqttQoS(final MqttQoS mqttQoS) { + this.mqttQoS = mqttQoS; + return this; + } + + @Override + public void subscribe(final MessageHandler messageHandler) { + if (null == this.topic) { + throw new MQException("Topic must be set before subscription"); + } + + // 订阅topic并设置消息处理器 + this.mqttClient.subscribe(this.topic, mqttQoS, (context, topic, message, payload) -> { + messageHandler.handle(new MicaMqttMessage(topic, payload)); + }); + } + + @Override + public void close() throws IOException { + if (null != this.mqttClient) { + try { + if (this.topic != null) { + this.mqttClient.unSubscribe(this.topic); + } + this.mqttClient.disconnect(); + } catch (final Exception e) { + throw new IOException("Failed to close MQTT consumer", e); + } + } + } + + /** + * MQTT消息包装类 + * + * @author Looly + * @since 7.0.0 + */ + private record MicaMqttMessage(String topic, byte[] content) implements Message { + } +} diff --git a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/MicaMqttEngine.java b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/MicaMqttEngine.java new file mode 100644 index 0000000000..2eb38bfd13 --- /dev/null +++ b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/MicaMqttEngine.java @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2026 Hutool Team. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hutool.v7.extra.mq.engine.mica; + +import cn.hutool.v7.core.lang.Assert; +import cn.hutool.v7.core.map.MapUtil; +import cn.hutool.v7.extra.mq.Consumer; +import cn.hutool.v7.extra.mq.MQConfig; +import cn.hutool.v7.extra.mq.Producer; +import cn.hutool.v7.extra.mq.engine.MQEngine; +import cn.hutool.v7.setting.props.Props; +import org.dromara.mica.mqtt.codec.MqttVersion; +import org.dromara.mica.mqtt.core.client.MqttClient; +import org.dromara.mica.mqtt.core.client.MqttClientCreator; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Properties; + +/** + * MQTT协议消息队列引擎实现 + * + * @author Looly + * @since 7.0.0 + */ +public class MicaMqttEngine implements MQEngine, Closeable { + + private MqttClient mqttClient; + + /** + * 默认构造 + */ + public MicaMqttEngine() { + // SPI方式加载时检查库是否引入 + Assert.notNull(MqttClient.class); + } + + @Override + public MicaMqttEngine init(final MQConfig config) { + final MqttClientCreator creator = MqttClient.create() + .version(MqttVersion.MQTT_5) + .ip(config.getBrokerUrl()); + + // 其他参数配置 + final Properties properties = config.getProperties(); + if(MapUtil.isNotEmpty(properties)) { + final Props props = new Props(properties); + + final Integer port = props.getInt("port"); + if (null != port) { + creator.port(port); + } + + final String username = props.getStr("username"); + if (null != username) { + creator.username(username); + } + + final String password = props.getStr("password"); + if (null != password) { + creator.password(password); + } + + final String clientId = props.getStr("clientId"); + if (null != clientId) { + creator.clientId(clientId); + } + + final Integer readBufferSize = props.getInt("readBufferSize"); + if (null != readBufferSize) { + creator.readBufferSize(readBufferSize); + } + + final Integer maxBytesInMessage = props.getInt("maxBytesInMessage"); + if (null != maxBytesInMessage) { + creator.maxBytesInMessage(maxBytesInMessage); + } + + final Integer keepAliveSecs = props.getInt("keepAliveSecs"); + if (null != keepAliveSecs) { + creator.keepAliveSecs(keepAliveSecs); + } + + final Integer timeout = props.getInt("timeout"); + if (null != timeout) { + creator.timeout(timeout); + } + + final Boolean reconnect = props.getBool("reconnect"); + if (null != reconnect) { + creator.reconnect(reconnect); + } + + final Integer reInterval = props.getInt("reInterval"); + if (null != reInterval) { + creator.reInterval(reInterval); + } + } + + this.mqttClient = creator.connect(); + return this; + } + + @Override + public Producer getProducer() { + return new MicaMqttProducer(mqttClient); + } + + @Override + public Consumer getConsumer() { + return new MicaMqttConsumer(mqttClient); + } + + @Override + public void close() throws IOException { + if (null != mqttClient && mqttClient.isConnected()) { + mqttClient.disconnect(); + mqttClient.stop(); + } + } +} diff --git a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/MicaMqttProducer.java b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/MicaMqttProducer.java new file mode 100644 index 0000000000..f3c7795f4a --- /dev/null +++ b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/MicaMqttProducer.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2026 Hutool Team. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hutool.v7.extra.mq.engine.mica; + +import cn.hutool.v7.extra.mq.MQException; +import cn.hutool.v7.extra.mq.Message; +import cn.hutool.v7.extra.mq.Producer; +import org.dromara.mica.mqtt.core.client.MqttClient; + +import java.io.IOException; + +/** + * MQTT协议消息生产者 + * + * @author Looly + * @since 6.0.0 + */ +public class MicaMqttProducer implements Producer { + + private final MqttClient mqttClient; + + /** + * 构造 + * + * @param mqttClient MQTT客户端 + */ + public MicaMqttProducer(final MqttClient mqttClient) { + this.mqttClient = mqttClient; + } + + @Override + public void send(final Message message) { + try { + // 使用MQTT协议发送消息,QoS级别设置为1(至少一次),retained设置为false + this.mqttClient.publish(message.topic(), message.content(), false); + } catch (final Exception e) { + throw new MQException(e); + } + } + + @Override + public void close() throws IOException { + if (null != this.mqttClient) { + try { + this.mqttClient.disconnect(); + } catch (final Exception e) { + throw new IOException("Failed to close MQTT client", e); + } + } + } +} diff --git a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/package-info.java b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/package-info.java new file mode 100644 index 0000000000..86f6038408 --- /dev/null +++ b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2026 Hutool Team. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * mica-mqtt引擎,见:https://github.com/dromara/mica-mqtt + */ +package cn.hutool.v7.extra.mq.engine.mica; diff --git a/hutool-extra/src/main/resources/META-INF/services/cn.hutool.v7.extra.mq.engine.MQEngine b/hutool-extra/src/main/resources/META-INF/services/cn.hutool.v7.extra.mq.engine.MQEngine index af4022a506..ccb6ce15e2 100644 --- a/hutool-extra/src/main/resources/META-INF/services/cn.hutool.v7.extra.mq.engine.MQEngine +++ b/hutool-extra/src/main/resources/META-INF/services/cn.hutool.v7.extra.mq.engine.MQEngine @@ -17,3 +17,4 @@ cn.hutool.v7.extra.mq.engine.kafka.KafkaEngine cn.hutool.v7.extra.mq.engine.rabbitmq.RabbitMQEngine cn.hutool.v7.extra.mq.engine.rocketmq.RocketMQEngine +cn.hutool.v7.extra.mq.engine.mica.MicaMqttEngine diff --git a/hutool-extra/src/test/java/cn/hutool/v7/extra/mq/engine/MicaMqttEngineTest.java b/hutool-extra/src/test/java/cn/hutool/v7/extra/mq/engine/MicaMqttEngineTest.java new file mode 100644 index 0000000000..76c6955c38 --- /dev/null +++ b/hutool-extra/src/test/java/cn/hutool/v7/extra/mq/engine/MicaMqttEngineTest.java @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2026 Hutool Team. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hutool.v7.extra.mq.engine; + +import cn.hutool.v7.extra.mq.Consumer; +import cn.hutool.v7.extra.mq.MQConfig; +import cn.hutool.v7.extra.mq.Message; +import cn.hutool.v7.extra.mq.Producer; +import cn.hutool.v7.extra.mq.engine.mica.MicaMqttEngine; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +/** + * MicaMqttEngine 测试 + * + * @author Looly + * @since 6.0.0 + */ +class MicaMqttEngineTest { + + @Test + void testMicaMqttEngineCreation() { + // 创建MQTT引擎 + final MicaMqttEngine engine = new MicaMqttEngine(); + + // 初始化配置 + final MQConfig config = MQConfig.of("tcp://localhost:1883"); + final MQEngine initializedEngine = engine.init(config); + + // 验证引擎初始化成功 + assertNotNull(initializedEngine); + assertNotNull(initializedEngine.getProducer()); + assertNotNull(initializedEngine.getConsumer()); + } + + @Test + void testMicaMqttProducer() { + final MicaMqttEngine engine = new MicaMqttEngine(); + final MQConfig config = MQConfig.of("tcp://localhost:1883"); + engine.init(config); + + final Producer producer = engine.getProducer(); + assertNotNull(producer); + // 这里不实际发送消息,因为需要真实的MQTT broker + } + + @Test + void testMicaMqttConsumer() { + final MicaMqttEngine engine = new MicaMqttEngine(); + final MQConfig config = MQConfig.of("tcp://localhost:1883"); + engine.init(config); + + final Consumer consumer = engine.getConsumer(); + assertNotNull(consumer); + // 这里不实际订阅消息,因为需要真实的MQTT broker + } + + @Test + void testMessageInterfaceImplementation() { + // 测试消息接口的实现 + final Message testMessage = new Message() { + @Override + public String topic() { + return "test/topic"; + } + + @Override + public byte[] content() { + return "Hello MQTT".getBytes(); + } + }; + + assertNotNull(testMessage); + assertNotNull(testMessage.topic()); + assertNotNull(testMessage.content()); + } +}