This commit is contained in:
Looly
2026-01-03 21:01:38 +08:00
parent 805d3c9daa
commit 30025b86eb
7 changed files with 95 additions and 175 deletions

View File

@@ -0,0 +1,25 @@
/*
* Copyright (c) 2026 Hutool Team.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hutool.v7.extra.mq;
/**
* 简单的消息封装
*
* @param topic 主题
* @param content 内容
*/
public record SimpleMessage(String topic, byte[] content) implements Message {}

View File

@@ -22,10 +22,7 @@ import cn.hutool.v7.extra.mq.Consumer;
import cn.hutool.v7.extra.mq.MQException;
import cn.hutool.v7.extra.mq.Message;
import cn.hutool.v7.extra.mq.MessageHandler;
import jakarta.jms.BytesMessage;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.TextMessage;
import jakarta.jms.*;
import java.io.IOException;
@@ -65,28 +62,7 @@ public class JmsConsumer implements Consumer {
@Override
public void subscribe(final MessageHandler messageHandler) {
try {
this.consumer.setMessageListener(message -> messageHandler.handle(new Message() {
@Override
public String topic() {
return consumerGroup;
}
@Override
public byte[] content() {
try {
if (message instanceof TextMessage) {
// TODO 考虑编码
return ByteUtil.toUtf8Bytes(((TextMessage) message).getText());
} else if (message instanceof BytesMessage) {
return new byte[(int) ((BytesMessage) message).getBodyLength()];
} else {
throw new IllegalArgumentException("Unsupported message type: " + message.getClass().getName());
}
} catch (final JMSException e) {
throw new MQException(e);
}
}
}));
this.consumer.setMessageListener(message -> messageHandler.handle(new JmsMessage(consumerGroup, message)));
} catch (final JMSException e) {
throw new MQException(e);
}
@@ -96,4 +72,28 @@ public class JmsConsumer implements Consumer {
public void close() throws IOException {
IoUtil.closeQuietly(this.consumer);
}
/**
* JMS消息封装
*
* @param topic JMS主题即consumerGroup
* @param jmsMessage
*/
private record JmsMessage(String topic, jakarta.jms.Message jmsMessage) implements Message {
@Override
public byte[] content() {
try {
if (jmsMessage instanceof TextMessage) {
// TODO 考虑编码
return ByteUtil.toUtf8Bytes(((TextMessage) jmsMessage).getText());
} else if (jmsMessage instanceof BytesMessage) {
return new byte[(int) ((BytesMessage) jmsMessage).getBodyLength()];
} else {
throw new IllegalArgumentException("Unsupported message type: " + jmsMessage.getClass().getName());
}
} catch (final JMSException e) {
throw new MQException(e);
}
}
}
}

View File

@@ -16,12 +16,12 @@
package cn.hutool.v7.extra.mq.engine.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import cn.hutool.v7.core.collection.ListUtil;
import cn.hutool.v7.core.io.IoUtil;
import cn.hutool.v7.extra.mq.Consumer;
import cn.hutool.v7.extra.mq.Message;
import cn.hutool.v7.extra.mq.MessageHandler;
import cn.hutool.v7.extra.mq.SimpleMessage;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.io.IOException;
import java.time.Duration;
@@ -73,7 +73,7 @@ public class KafkaConsumer implements Consumer {
* @param topicPattern topic{@link Pattern}
* @return this
*/
public KafkaConsumer setTopicPattern(final Pattern topicPattern){
public KafkaConsumer setTopicPattern(final Pattern topicPattern) {
this.consumer.subscribe(topicPattern);
return this;
}
@@ -81,7 +81,7 @@ public class KafkaConsumer implements Consumer {
@Override
public void subscribe(final MessageHandler messageHandler) {
for (final ConsumerRecord<String, byte[]> record : this.consumer.poll(Duration.ofMillis(3000))) {
messageHandler.handle(new ConsumerRecordMessage(record));
messageHandler.handle(new SimpleMessage(record.topic(), record.value()));
}
}
@@ -89,33 +89,4 @@ public class KafkaConsumer implements Consumer {
public void close() throws IOException {
IoUtil.nullSafeClose(this.consumer);
}
/**
* 消费者记录包装为消息
*
* @author looly
*/
private static class ConsumerRecordMessage implements Message {
private final ConsumerRecord<String, byte[]> record;
/**
* 构造
*
* @param record {@link ConsumerRecord}
*/
private ConsumerRecordMessage(final ConsumerRecord<String, byte[]> record) {
this.record = record;
}
@Override
public String topic() {
return record.topic();
}
@Override
public byte[] content() {
return record.value();
}
}
}

View File

@@ -16,10 +16,7 @@
package cn.hutool.v7.extra.mq.engine.mica;
import cn.hutool.v7.extra.mq.Consumer;
import cn.hutool.v7.extra.mq.MQException;
import cn.hutool.v7.extra.mq.Message;
import cn.hutool.v7.extra.mq.MessageHandler;
import cn.hutool.v7.extra.mq.*;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.core.client.MqttClient;
@@ -78,7 +75,7 @@ public class MicaMqttConsumer implements Consumer {
// 订阅topic并设置消息处理器
this.mqttClient.subscribe(this.topic, mqttQoS, (context, topic, message, payload) -> {
messageHandler.handle(new MicaMqttMessage(topic, payload));
messageHandler.handle(new SimpleMessage(topic, payload));
});
}
@@ -95,13 +92,4 @@ public class MicaMqttConsumer implements Consumer {
}
}
}
/**
* MQTT消息包装类
*
* @author Looly
* @since 7.0.0
*/
private record MicaMqttMessage(String topic, byte[] content) implements Message {
}
}

View File

@@ -16,13 +16,12 @@
package cn.hutool.v7.extra.mq.engine.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import cn.hutool.v7.core.io.IoUtil;
import cn.hutool.v7.extra.mq.Consumer;
import cn.hutool.v7.extra.mq.MQException;
import cn.hutool.v7.extra.mq.Message;
import cn.hutool.v7.extra.mq.MessageHandler;
import cn.hutool.v7.extra.mq.SimpleMessage;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Map;
@@ -62,22 +61,10 @@ public class RabbitMQConsumer implements Consumer {
public void subscribe(final MessageHandler messageHandler) {
queueDeclare(false, false, false, null);
final DeliverCallback deliverCallback = (consumerTag, delivery) ->
messageHandler.handle(new Message() {
@Override
public String topic() {
return consumerTag;
}
@Override
public byte[] content() {
return delivery.getBody();
}
});
try {
this.channel.basicConsume(this.topic, true, deliverCallback, consumerTag -> {
});
this.channel.basicConsume(this.topic, true,
(consumerTag, delivery) -> messageHandler.handle(new SimpleMessage(consumerTag, delivery.getBody())),
consumerTag -> {});
} catch (final IOException e) {
throw new MQException(e);
}

View File

@@ -16,15 +16,12 @@
package cn.hutool.v7.extra.mq.engine.rocketmq;
import cn.hutool.v7.extra.mq.*;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import cn.hutool.v7.extra.mq.Consumer;
import cn.hutool.v7.extra.mq.MQException;
import cn.hutool.v7.extra.mq.Message;
import cn.hutool.v7.extra.mq.MessageHandler;
import java.io.IOException;
@@ -66,7 +63,7 @@ public class RocketMQConsumer implements Consumer {
public void subscribe(final MessageHandler messageHandler) {
this.consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (final MessageExt msg : msgs) {
messageHandler.handle(new RocketMQMessage(msg));
messageHandler.handle(new SimpleMessage(msg.getTopic(), msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
@@ -78,29 +75,4 @@ public class RocketMQConsumer implements Consumer {
this.consumer.shutdown();
}
}
/**
* RocketMQ消息包装
*
* @author Looly
* @since 6.0.0
*/
private static class RocketMQMessage implements Message {
private final MessageExt messageExt;
private RocketMQMessage(final MessageExt messageExt) {
this.messageExt = messageExt;
}
@Override
public String topic() {
return messageExt.getTopic();
}
@Override
public byte[] content() {
return messageExt.getBody();
}
}
}

View File

@@ -16,10 +16,7 @@
package cn.hutool.v7.extra.mq.engine;
import cn.hutool.v7.extra.mq.Consumer;
import cn.hutool.v7.extra.mq.MQConfig;
import cn.hutool.v7.extra.mq.Message;
import cn.hutool.v7.extra.mq.Producer;
import cn.hutool.v7.extra.mq.*;
import cn.hutool.v7.extra.mq.engine.mica.MicaMqttEngine;
import org.junit.jupiter.api.Test;
@@ -33,60 +30,40 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
*/
class MicaMqttEngineTest {
@Test
void testMicaMqttEngineCreation() {
// 创建MQTT引擎
final MicaMqttEngine engine = new MicaMqttEngine();
@Test
void testMicaMqttEngineCreation() {
// 创建MQTT引擎
final MicaMqttEngine engine = new MicaMqttEngine();
// 初始化配置
final MQConfig config = MQConfig.of("tcp://localhost:1883");
final MQEngine initializedEngine = engine.init(config);
// 初始化配置
final MQConfig config = MQConfig.of("tcp://localhost:1883");
final MQEngine initializedEngine = engine.init(config);
// 验证引擎初始化成功
assertNotNull(initializedEngine);
assertNotNull(initializedEngine.getProducer());
assertNotNull(initializedEngine.getConsumer());
}
// 验证引擎初始化成功
assertNotNull(initializedEngine);
assertNotNull(initializedEngine.getProducer());
assertNotNull(initializedEngine.getConsumer());
}
@Test
void testMicaMqttProducer() {
final MicaMqttEngine engine = new MicaMqttEngine();
final MQConfig config = MQConfig.of("tcp://localhost:1883");
engine.init(config);
@Test
void testMicaMqttProducer() {
final MicaMqttEngine engine = new MicaMqttEngine();
final MQConfig config = MQConfig.of("tcp://localhost:1883");
engine.init(config);
final Producer producer = engine.getProducer();
assertNotNull(producer);
// 这里不实际发送消息因为需要真实的MQTT broker
}
final Producer producer = engine.getProducer();
assertNotNull(producer);
// 这里不实际发送消息因为需要真实的MQTT broker
}
@Test
void testMicaMqttConsumer() {
final MicaMqttEngine engine = new MicaMqttEngine();
final MQConfig config = MQConfig.of("tcp://localhost:1883");
engine.init(config);
@Test
void testMicaMqttConsumer() {
final MicaMqttEngine engine = new MicaMqttEngine();
final MQConfig config = MQConfig.of("tcp://localhost:1883");
engine.init(config);
final Consumer consumer = engine.getConsumer();
assertNotNull(consumer);
// 这里不实际订阅消息因为需要真实的MQTT broker
}
@Test
void testMessageInterfaceImplementation() {
// 测试消息接口的实现
final Message testMessage = new Message() {
@Override
public String topic() {
return "test/topic";
}
@Override
public byte[] content() {
return "Hello MQTT".getBytes();
}
};
assertNotNull(testMessage);
assertNotNull(testMessage.topic());
assertNotNull(testMessage.content());
}
final Consumer consumer = engine.getConsumer();
assertNotNull(consumer);
// 这里不实际订阅消息因为需要真实的MQTT broker
}
}