diff --git a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/SimpleMessage.java b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/SimpleMessage.java new file mode 100644 index 0000000000..b4e109cfaf --- /dev/null +++ b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/SimpleMessage.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2026 Hutool Team. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hutool.v7.extra.mq; + +/** + * 简单的消息封装 + * + * @param topic 主题 + * @param content 内容 + */ +public record SimpleMessage(String topic, byte[] content) implements Message {} diff --git a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/JmsConsumer.java b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/JmsConsumer.java index 32c61c9930..6fe692d136 100644 --- a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/JmsConsumer.java +++ b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/jms/JmsConsumer.java @@ -22,10 +22,7 @@ import cn.hutool.v7.extra.mq.Consumer; import cn.hutool.v7.extra.mq.MQException; import cn.hutool.v7.extra.mq.Message; import cn.hutool.v7.extra.mq.MessageHandler; -import jakarta.jms.BytesMessage; -import jakarta.jms.JMSException; -import jakarta.jms.MessageConsumer; -import jakarta.jms.TextMessage; +import jakarta.jms.*; import java.io.IOException; @@ -65,28 +62,7 @@ public class JmsConsumer implements Consumer { @Override public void subscribe(final MessageHandler messageHandler) { try { - this.consumer.setMessageListener(message -> messageHandler.handle(new Message() { - @Override - public String topic() { - return consumerGroup; - } - - @Override - public byte[] content() { - try { - if (message instanceof TextMessage) { - // TODO 考虑编码 - return ByteUtil.toUtf8Bytes(((TextMessage) message).getText()); - } else if (message instanceof BytesMessage) { - return new byte[(int) ((BytesMessage) message).getBodyLength()]; - } else { - throw new IllegalArgumentException("Unsupported message type: " + message.getClass().getName()); - } - } catch (final JMSException e) { - throw new MQException(e); - } - } - })); + this.consumer.setMessageListener(message -> messageHandler.handle(new JmsMessage(consumerGroup, message))); } catch (final JMSException e) { throw new MQException(e); } @@ -96,4 +72,28 @@ public class JmsConsumer implements Consumer { public void close() throws IOException { IoUtil.closeQuietly(this.consumer); } + + /** + * JMS消息封装 + * + * @param topic JMS主题,即consumerGroup + * @param jmsMessage + */ + private record JmsMessage(String topic, jakarta.jms.Message jmsMessage) implements Message { + @Override + public byte[] content() { + try { + if (jmsMessage instanceof TextMessage) { + // TODO 考虑编码 + return ByteUtil.toUtf8Bytes(((TextMessage) jmsMessage).getText()); + } else if (jmsMessage instanceof BytesMessage) { + return new byte[(int) ((BytesMessage) jmsMessage).getBodyLength()]; + } else { + throw new IllegalArgumentException("Unsupported message type: " + jmsMessage.getClass().getName()); + } + } catch (final JMSException e) { + throw new MQException(e); + } + } + } } diff --git a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/kafka/KafkaConsumer.java b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/kafka/KafkaConsumer.java index fdf3ad61b5..9b30ec10d7 100644 --- a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/kafka/KafkaConsumer.java +++ b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/kafka/KafkaConsumer.java @@ -16,12 +16,12 @@ package cn.hutool.v7.extra.mq.engine.kafka; -import org.apache.kafka.clients.consumer.ConsumerRecord; import cn.hutool.v7.core.collection.ListUtil; import cn.hutool.v7.core.io.IoUtil; import cn.hutool.v7.extra.mq.Consumer; -import cn.hutool.v7.extra.mq.Message; import cn.hutool.v7.extra.mq.MessageHandler; +import cn.hutool.v7.extra.mq.SimpleMessage; +import org.apache.kafka.clients.consumer.ConsumerRecord; import java.io.IOException; import java.time.Duration; @@ -73,7 +73,7 @@ public class KafkaConsumer implements Consumer { * @param topicPattern topic{@link Pattern} * @return this */ - public KafkaConsumer setTopicPattern(final Pattern topicPattern){ + public KafkaConsumer setTopicPattern(final Pattern topicPattern) { this.consumer.subscribe(topicPattern); return this; } @@ -81,7 +81,7 @@ public class KafkaConsumer implements Consumer { @Override public void subscribe(final MessageHandler messageHandler) { for (final ConsumerRecord record : this.consumer.poll(Duration.ofMillis(3000))) { - messageHandler.handle(new ConsumerRecordMessage(record)); + messageHandler.handle(new SimpleMessage(record.topic(), record.value())); } } @@ -89,33 +89,4 @@ public class KafkaConsumer implements Consumer { public void close() throws IOException { IoUtil.nullSafeClose(this.consumer); } - - /** - * 消费者记录包装为消息 - * - * @author looly - */ - private static class ConsumerRecordMessage implements Message { - - private final ConsumerRecord record; - - /** - * 构造 - * - * @param record {@link ConsumerRecord} - */ - private ConsumerRecordMessage(final ConsumerRecord record) { - this.record = record; - } - - @Override - public String topic() { - return record.topic(); - } - - @Override - public byte[] content() { - return record.value(); - } - } } diff --git a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/MicaMqttConsumer.java b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/MicaMqttConsumer.java index 6a8feb539f..4dfdd6ff6e 100644 --- a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/MicaMqttConsumer.java +++ b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/mica/MicaMqttConsumer.java @@ -16,10 +16,7 @@ package cn.hutool.v7.extra.mq.engine.mica; -import cn.hutool.v7.extra.mq.Consumer; -import cn.hutool.v7.extra.mq.MQException; -import cn.hutool.v7.extra.mq.Message; -import cn.hutool.v7.extra.mq.MessageHandler; +import cn.hutool.v7.extra.mq.*; import org.dromara.mica.mqtt.codec.MqttQoS; import org.dromara.mica.mqtt.core.client.MqttClient; @@ -78,7 +75,7 @@ public class MicaMqttConsumer implements Consumer { // 订阅topic并设置消息处理器 this.mqttClient.subscribe(this.topic, mqttQoS, (context, topic, message, payload) -> { - messageHandler.handle(new MicaMqttMessage(topic, payload)); + messageHandler.handle(new SimpleMessage(topic, payload)); }); } @@ -95,13 +92,4 @@ public class MicaMqttConsumer implements Consumer { } } } - - /** - * MQTT消息包装类 - * - * @author Looly - * @since 7.0.0 - */ - private record MicaMqttMessage(String topic, byte[] content) implements Message { - } } diff --git a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/rabbitmq/RabbitMQConsumer.java b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/rabbitmq/RabbitMQConsumer.java index aac93fe55b..91eb6ee362 100644 --- a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/rabbitmq/RabbitMQConsumer.java +++ b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/rabbitmq/RabbitMQConsumer.java @@ -16,13 +16,12 @@ package cn.hutool.v7.extra.mq.engine.rabbitmq; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.DeliverCallback; import cn.hutool.v7.core.io.IoUtil; import cn.hutool.v7.extra.mq.Consumer; import cn.hutool.v7.extra.mq.MQException; -import cn.hutool.v7.extra.mq.Message; import cn.hutool.v7.extra.mq.MessageHandler; +import cn.hutool.v7.extra.mq.SimpleMessage; +import com.rabbitmq.client.Channel; import java.io.IOException; import java.util.Map; @@ -62,22 +61,10 @@ public class RabbitMQConsumer implements Consumer { public void subscribe(final MessageHandler messageHandler) { queueDeclare(false, false, false, null); - final DeliverCallback deliverCallback = (consumerTag, delivery) -> - messageHandler.handle(new Message() { - @Override - public String topic() { - return consumerTag; - } - - @Override - public byte[] content() { - return delivery.getBody(); - } - }); - try { - this.channel.basicConsume(this.topic, true, deliverCallback, consumerTag -> { - }); + this.channel.basicConsume(this.topic, true, + (consumerTag, delivery) -> messageHandler.handle(new SimpleMessage(consumerTag, delivery.getBody())), + consumerTag -> {}); } catch (final IOException e) { throw new MQException(e); } diff --git a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/rocketmq/RocketMQConsumer.java b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/rocketmq/RocketMQConsumer.java index aec6d6bbcc..75d56c4678 100644 --- a/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/rocketmq/RocketMQConsumer.java +++ b/hutool-extra/src/main/java/cn/hutool/v7/extra/mq/engine/rocketmq/RocketMQConsumer.java @@ -16,15 +16,12 @@ package cn.hutool.v7.extra.mq.engine.rocketmq; +import cn.hutool.v7.extra.mq.*; import org.apache.rocketmq.client.consumer.MQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; -import cn.hutool.v7.extra.mq.Consumer; -import cn.hutool.v7.extra.mq.MQException; -import cn.hutool.v7.extra.mq.Message; -import cn.hutool.v7.extra.mq.MessageHandler; import java.io.IOException; @@ -66,7 +63,7 @@ public class RocketMQConsumer implements Consumer { public void subscribe(final MessageHandler messageHandler) { this.consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { for (final MessageExt msg : msgs) { - messageHandler.handle(new RocketMQMessage(msg)); + messageHandler.handle(new SimpleMessage(msg.getTopic(), msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); @@ -78,29 +75,4 @@ public class RocketMQConsumer implements Consumer { this.consumer.shutdown(); } } - - /** - * RocketMQ消息包装 - * - * @author Looly - * @since 6.0.0 - */ - private static class RocketMQMessage implements Message { - private final MessageExt messageExt; - - private RocketMQMessage(final MessageExt messageExt) { - this.messageExt = messageExt; - } - - - @Override - public String topic() { - return messageExt.getTopic(); - } - - @Override - public byte[] content() { - return messageExt.getBody(); - } - } } diff --git a/hutool-extra/src/test/java/cn/hutool/v7/extra/mq/engine/MicaMqttEngineTest.java b/hutool-extra/src/test/java/cn/hutool/v7/extra/mq/engine/MicaMqttEngineTest.java index 76c6955c38..2f5c5f7ef0 100644 --- a/hutool-extra/src/test/java/cn/hutool/v7/extra/mq/engine/MicaMqttEngineTest.java +++ b/hutool-extra/src/test/java/cn/hutool/v7/extra/mq/engine/MicaMqttEngineTest.java @@ -16,10 +16,7 @@ package cn.hutool.v7.extra.mq.engine; -import cn.hutool.v7.extra.mq.Consumer; -import cn.hutool.v7.extra.mq.MQConfig; -import cn.hutool.v7.extra.mq.Message; -import cn.hutool.v7.extra.mq.Producer; +import cn.hutool.v7.extra.mq.*; import cn.hutool.v7.extra.mq.engine.mica.MicaMqttEngine; import org.junit.jupiter.api.Test; @@ -33,60 +30,40 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; */ class MicaMqttEngineTest { - @Test - void testMicaMqttEngineCreation() { - // 创建MQTT引擎 - final MicaMqttEngine engine = new MicaMqttEngine(); + @Test + void testMicaMqttEngineCreation() { + // 创建MQTT引擎 + final MicaMqttEngine engine = new MicaMqttEngine(); - // 初始化配置 - final MQConfig config = MQConfig.of("tcp://localhost:1883"); - final MQEngine initializedEngine = engine.init(config); + // 初始化配置 + final MQConfig config = MQConfig.of("tcp://localhost:1883"); + final MQEngine initializedEngine = engine.init(config); - // 验证引擎初始化成功 - assertNotNull(initializedEngine); - assertNotNull(initializedEngine.getProducer()); - assertNotNull(initializedEngine.getConsumer()); - } + // 验证引擎初始化成功 + assertNotNull(initializedEngine); + assertNotNull(initializedEngine.getProducer()); + assertNotNull(initializedEngine.getConsumer()); + } - @Test - void testMicaMqttProducer() { - final MicaMqttEngine engine = new MicaMqttEngine(); - final MQConfig config = MQConfig.of("tcp://localhost:1883"); - engine.init(config); + @Test + void testMicaMqttProducer() { + final MicaMqttEngine engine = new MicaMqttEngine(); + final MQConfig config = MQConfig.of("tcp://localhost:1883"); + engine.init(config); - final Producer producer = engine.getProducer(); - assertNotNull(producer); - // 这里不实际发送消息,因为需要真实的MQTT broker - } + final Producer producer = engine.getProducer(); + assertNotNull(producer); + // 这里不实际发送消息,因为需要真实的MQTT broker + } - @Test - void testMicaMqttConsumer() { - final MicaMqttEngine engine = new MicaMqttEngine(); - final MQConfig config = MQConfig.of("tcp://localhost:1883"); - engine.init(config); + @Test + void testMicaMqttConsumer() { + final MicaMqttEngine engine = new MicaMqttEngine(); + final MQConfig config = MQConfig.of("tcp://localhost:1883"); + engine.init(config); - final Consumer consumer = engine.getConsumer(); - assertNotNull(consumer); - // 这里不实际订阅消息,因为需要真实的MQTT broker - } - - @Test - void testMessageInterfaceImplementation() { - // 测试消息接口的实现 - final Message testMessage = new Message() { - @Override - public String topic() { - return "test/topic"; - } - - @Override - public byte[] content() { - return "Hello MQTT".getBytes(); - } - }; - - assertNotNull(testMessage); - assertNotNull(testMessage.topic()); - assertNotNull(testMessage.content()); - } + final Consumer consumer = engine.getConsumer(); + assertNotNull(consumer); + // 这里不实际订阅消息,因为需要真实的MQTT broker + } }