This commit is contained in:
Looly
2026-01-03 20:32:03 +08:00
parent 91155f86f0
commit 805d3c9daa
7 changed files with 431 additions and 4 deletions

View File

@@ -60,7 +60,7 @@
<net.version>3.11.1</net.version>
<emoji-java.version>5.1.1</emoji-java.version>
<spring-boot.version>3.5.3</spring-boot.version>
<spring-boot.version>4.0.1</spring-boot.version>
<oshi.version>6.9.0</oshi.version>
<byte-buddy.version>1.17.8</byte-buddy.version>
<commons-compress.version>1.28.0</commons-compress.version>
@@ -68,10 +68,11 @@
<jaxb-runtime.version>4.0.6</jaxb-runtime.version>
<!-- mq client versions -->
<kafka.version>4.1.0</kafka.version>
<kafka.version>4.1.1</kafka.version>
<rabbitmq.version>5.26.0</rabbitmq.version>
<rocketmq.version>5.3.3</rocketmq.version>
<activemq.version>6.1.7</activemq.version>
<rocketmq.version>5.4.0</rocketmq.version>
<activemq.version>6.2.0</activemq.version>
<mica.version>2.5.11</mica.version>
</properties>
<dependencies>
@@ -558,5 +559,11 @@
<version>${activemq.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.dromara.mica-mqtt</groupId>
<artifactId>mica-mqtt-client</artifactId>
<version>${mica.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,107 @@
/*
* Copyright (c) 2026 Hutool Team.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hutool.v7.extra.mq.engine.mica;
import cn.hutool.v7.extra.mq.Consumer;
import cn.hutool.v7.extra.mq.MQException;
import cn.hutool.v7.extra.mq.Message;
import cn.hutool.v7.extra.mq.MessageHandler;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.core.client.MqttClient;
import java.io.IOException;
/**
* mica-mqtt协议消息消费者
*
* @author Looly
* @since 7.0.0
*/
public class MicaMqttConsumer implements Consumer {
private final MqttClient mqttClient;
private String topic;
private MqttQoS mqttQoS;
/**
* 构造
*
* @param mqttClient MQTT客户端
*/
public MicaMqttConsumer(final MqttClient mqttClient) {
this.mqttClient = mqttClient;
// 默认使用QOS0
this.mqttQoS = MqttQoS.QOS0;
}
/**
* 设置消费的Topic
*
* @param topic Topic
* @return this
*/
public MicaMqttConsumer setTopic(final String topic) {
this.topic = topic;
return this;
}
/**
* 设置MQTT消息质量
*
* @param mqttQoS MQTT消息质量
* @return this
*/
public MicaMqttConsumer setMqttQoS(final MqttQoS mqttQoS) {
this.mqttQoS = mqttQoS;
return this;
}
@Override
public void subscribe(final MessageHandler messageHandler) {
if (null == this.topic) {
throw new MQException("Topic must be set before subscription");
}
// 订阅topic并设置消息处理器
this.mqttClient.subscribe(this.topic, mqttQoS, (context, topic, message, payload) -> {
messageHandler.handle(new MicaMqttMessage(topic, payload));
});
}
@Override
public void close() throws IOException {
if (null != this.mqttClient) {
try {
if (this.topic != null) {
this.mqttClient.unSubscribe(this.topic);
}
this.mqttClient.disconnect();
} catch (final Exception e) {
throw new IOException("Failed to close MQTT consumer", e);
}
}
}
/**
* MQTT消息包装类
*
* @author Looly
* @since 7.0.0
*/
private record MicaMqttMessage(String topic, byte[] content) implements Message {
}
}

View File

@@ -0,0 +1,135 @@
/*
* Copyright (c) 2026 Hutool Team.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hutool.v7.extra.mq.engine.mica;
import cn.hutool.v7.core.lang.Assert;
import cn.hutool.v7.core.map.MapUtil;
import cn.hutool.v7.extra.mq.Consumer;
import cn.hutool.v7.extra.mq.MQConfig;
import cn.hutool.v7.extra.mq.Producer;
import cn.hutool.v7.extra.mq.engine.MQEngine;
import cn.hutool.v7.setting.props.Props;
import org.dromara.mica.mqtt.codec.MqttVersion;
import org.dromara.mica.mqtt.core.client.MqttClient;
import org.dromara.mica.mqtt.core.client.MqttClientCreator;
import java.io.Closeable;
import java.io.IOException;
import java.util.Properties;
/**
* MQTT协议消息队列引擎实现
*
* @author Looly
* @since 7.0.0
*/
public class MicaMqttEngine implements MQEngine, Closeable {
private MqttClient mqttClient;
/**
* 默认构造
*/
public MicaMqttEngine() {
// SPI方式加载时检查库是否引入
Assert.notNull(MqttClient.class);
}
@Override
public MicaMqttEngine init(final MQConfig config) {
final MqttClientCreator creator = MqttClient.create()
.version(MqttVersion.MQTT_5)
.ip(config.getBrokerUrl());
// 其他参数配置
final Properties properties = config.getProperties();
if(MapUtil.isNotEmpty(properties)) {
final Props props = new Props(properties);
final Integer port = props.getInt("port");
if (null != port) {
creator.port(port);
}
final String username = props.getStr("username");
if (null != username) {
creator.username(username);
}
final String password = props.getStr("password");
if (null != password) {
creator.password(password);
}
final String clientId = props.getStr("clientId");
if (null != clientId) {
creator.clientId(clientId);
}
final Integer readBufferSize = props.getInt("readBufferSize");
if (null != readBufferSize) {
creator.readBufferSize(readBufferSize);
}
final Integer maxBytesInMessage = props.getInt("maxBytesInMessage");
if (null != maxBytesInMessage) {
creator.maxBytesInMessage(maxBytesInMessage);
}
final Integer keepAliveSecs = props.getInt("keepAliveSecs");
if (null != keepAliveSecs) {
creator.keepAliveSecs(keepAliveSecs);
}
final Integer timeout = props.getInt("timeout");
if (null != timeout) {
creator.timeout(timeout);
}
final Boolean reconnect = props.getBool("reconnect");
if (null != reconnect) {
creator.reconnect(reconnect);
}
final Integer reInterval = props.getInt("reInterval");
if (null != reInterval) {
creator.reInterval(reInterval);
}
}
this.mqttClient = creator.connect();
return this;
}
@Override
public Producer getProducer() {
return new MicaMqttProducer(mqttClient);
}
@Override
public Consumer getConsumer() {
return new MicaMqttConsumer(mqttClient);
}
@Override
public void close() throws IOException {
if (null != mqttClient && mqttClient.isConnected()) {
mqttClient.disconnect();
mqttClient.stop();
}
}
}

View File

@@ -0,0 +1,65 @@
/*
* Copyright (c) 2026 Hutool Team.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hutool.v7.extra.mq.engine.mica;
import cn.hutool.v7.extra.mq.MQException;
import cn.hutool.v7.extra.mq.Message;
import cn.hutool.v7.extra.mq.Producer;
import org.dromara.mica.mqtt.core.client.MqttClient;
import java.io.IOException;
/**
* MQTT协议消息生产者
*
* @author Looly
* @since 6.0.0
*/
public class MicaMqttProducer implements Producer {
private final MqttClient mqttClient;
/**
* 构造
*
* @param mqttClient MQTT客户端
*/
public MicaMqttProducer(final MqttClient mqttClient) {
this.mqttClient = mqttClient;
}
@Override
public void send(final Message message) {
try {
// 使用MQTT协议发送消息QoS级别设置为1至少一次retained设置为false
this.mqttClient.publish(message.topic(), message.content(), false);
} catch (final Exception e) {
throw new MQException(e);
}
}
@Override
public void close() throws IOException {
if (null != this.mqttClient) {
try {
this.mqttClient.disconnect();
} catch (final Exception e) {
throw new IOException("Failed to close MQTT client", e);
}
}
}
}

View File

@@ -0,0 +1,20 @@
/*
* Copyright (c) 2026 Hutool Team.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* mica-mqtt引擎https://github.com/dromara/mica-mqtt
*/
package cn.hutool.v7.extra.mq.engine.mica;

View File

@@ -17,3 +17,4 @@
cn.hutool.v7.extra.mq.engine.kafka.KafkaEngine
cn.hutool.v7.extra.mq.engine.rabbitmq.RabbitMQEngine
cn.hutool.v7.extra.mq.engine.rocketmq.RocketMQEngine
cn.hutool.v7.extra.mq.engine.mica.MicaMqttEngine

View File

@@ -0,0 +1,92 @@
/*
* Copyright (c) 2026 Hutool Team.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hutool.v7.extra.mq.engine;
import cn.hutool.v7.extra.mq.Consumer;
import cn.hutool.v7.extra.mq.MQConfig;
import cn.hutool.v7.extra.mq.Message;
import cn.hutool.v7.extra.mq.Producer;
import cn.hutool.v7.extra.mq.engine.mica.MicaMqttEngine;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertNotNull;
/**
* MicaMqttEngine 测试
*
* @author Looly
* @since 6.0.0
*/
class MicaMqttEngineTest {
@Test
void testMicaMqttEngineCreation() {
// 创建MQTT引擎
final MicaMqttEngine engine = new MicaMqttEngine();
// 初始化配置
final MQConfig config = MQConfig.of("tcp://localhost:1883");
final MQEngine initializedEngine = engine.init(config);
// 验证引擎初始化成功
assertNotNull(initializedEngine);
assertNotNull(initializedEngine.getProducer());
assertNotNull(initializedEngine.getConsumer());
}
@Test
void testMicaMqttProducer() {
final MicaMqttEngine engine = new MicaMqttEngine();
final MQConfig config = MQConfig.of("tcp://localhost:1883");
engine.init(config);
final Producer producer = engine.getProducer();
assertNotNull(producer);
// 这里不实际发送消息因为需要真实的MQTT broker
}
@Test
void testMicaMqttConsumer() {
final MicaMqttEngine engine = new MicaMqttEngine();
final MQConfig config = MQConfig.of("tcp://localhost:1883");
engine.init(config);
final Consumer consumer = engine.getConsumer();
assertNotNull(consumer);
// 这里不实际订阅消息因为需要真实的MQTT broker
}
@Test
void testMessageInterfaceImplementation() {
// 测试消息接口的实现
final Message testMessage = new Message() {
@Override
public String topic() {
return "test/topic";
}
@Override
public byte[] content() {
return "Hello MQTT".getBytes();
}
};
assertNotNull(testMessage);
assertNotNull(testMessage.topic());
assertNotNull(testMessage.content());
}
}