diff --git a/hutool-socket/src/main/java/cn/hutool/v7/socket/protocol/MsgDecoder.java b/hutool-socket/src/main/java/cn/hutool/v7/socket/aio/protocol/MsgDecoder.java similarity index 82% rename from hutool-socket/src/main/java/cn/hutool/v7/socket/protocol/MsgDecoder.java rename to hutool-socket/src/main/java/cn/hutool/v7/socket/aio/protocol/MsgDecoder.java index b77f760c76..cab9b7df15 100644 --- a/hutool-socket/src/main/java/cn/hutool/v7/socket/protocol/MsgDecoder.java +++ b/hutool-socket/src/main/java/cn/hutool/v7/socket/aio/protocol/MsgDecoder.java @@ -14,12 +14,13 @@ * limitations under the License. */ -package cn.hutool.v7.socket.protocol; +package cn.hutool.v7.socket.aio.protocol; + +import cn.hutool.v7.socket.SocketRuntimeException; +import cn.hutool.v7.socket.aio.AioSession; import java.nio.ByteBuffer; -import cn.hutool.v7.socket.aio.AioSession; - /** * 消息解码器 * @@ -34,6 +35,7 @@ public interface MsgDecoder { * @param session 本次需要解码的session * @param readBuffer 待处理的读buffer * @return 本次解码成功后封装的业务消息对象, 返回null则表示解码未完成 + * @throws SocketRuntimeException 解码失败时抛出此异常 */ - T decode(AioSession session, ByteBuffer readBuffer); + T decode(AioSession session, ByteBuffer readBuffer) throws SocketRuntimeException; } diff --git a/hutool-socket/src/main/java/cn/hutool/v7/socket/protocol/MsgEncoder.java b/hutool-socket/src/main/java/cn/hutool/v7/socket/aio/protocol/MsgEncoder.java similarity index 77% rename from hutool-socket/src/main/java/cn/hutool/v7/socket/protocol/MsgEncoder.java rename to hutool-socket/src/main/java/cn/hutool/v7/socket/aio/protocol/MsgEncoder.java index ca960be04d..475cf89073 100644 --- a/hutool-socket/src/main/java/cn/hutool/v7/socket/protocol/MsgEncoder.java +++ b/hutool-socket/src/main/java/cn/hutool/v7/socket/aio/protocol/MsgEncoder.java @@ -14,26 +14,27 @@ * limitations under the License. */ -package cn.hutool.v7.socket.protocol; +package cn.hutool.v7.socket.aio.protocol; + +import cn.hutool.v7.socket.SocketRuntimeException; +import cn.hutool.v7.socket.aio.AioSession; import java.nio.ByteBuffer; -import cn.hutool.v7.socket.aio.AioSession; - /** * 消息编码器 * - * @author Looly - * * @param 编码前后的数据类型 + * @author Looly */ public interface MsgEncoder { /** * 编码数据用于写出 * - * @param session 本次需要解码的session + * @param session 本次需要解码的session * @param writeBuffer 待处理的读buffer - * @param data 写出的数据 + * @param data 写出的数据 + * @throws SocketRuntimeException 编码失败时抛出此异常 */ - void encode(AioSession session, ByteBuffer writeBuffer, T data); + void encode(AioSession session, ByteBuffer writeBuffer, T data) throws SocketRuntimeException; } diff --git a/hutool-socket/src/main/java/cn/hutool/v7/socket/protocol/Protocol.java b/hutool-socket/src/main/java/cn/hutool/v7/socket/aio/protocol/Protocol.java similarity index 95% rename from hutool-socket/src/main/java/cn/hutool/v7/socket/protocol/Protocol.java rename to hutool-socket/src/main/java/cn/hutool/v7/socket/aio/protocol/Protocol.java index 924f3b6a3a..eac154af53 100644 --- a/hutool-socket/src/main/java/cn/hutool/v7/socket/protocol/Protocol.java +++ b/hutool-socket/src/main/java/cn/hutool/v7/socket/aio/protocol/Protocol.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package cn.hutool.v7.socket.protocol; +package cn.hutool.v7.socket.aio.protocol; /** * 协议接口
diff --git a/hutool-socket/src/main/java/cn/hutool/v7/socket/protocol/package-info.java b/hutool-socket/src/main/java/cn/hutool/v7/socket/aio/protocol/package-info.java similarity index 94% rename from hutool-socket/src/main/java/cn/hutool/v7/socket/protocol/package-info.java rename to hutool-socket/src/main/java/cn/hutool/v7/socket/aio/protocol/package-info.java index 06870340af..52c53dd781 100644 --- a/hutool-socket/src/main/java/cn/hutool/v7/socket/protocol/package-info.java +++ b/hutool-socket/src/main/java/cn/hutool/v7/socket/aio/protocol/package-info.java @@ -20,4 +20,4 @@ * @author Looly * */ -package cn.hutool.v7.socket.protocol; +package cn.hutool.v7.socket.aio.protocol; diff --git a/hutool-socket/src/main/java/cn/hutool/v7/socket/udp/PacketBuilder.java b/hutool-socket/src/main/java/cn/hutool/v7/socket/udp/PacketBuilder.java new file mode 100644 index 0000000000..889d040752 --- /dev/null +++ b/hutool-socket/src/main/java/cn/hutool/v7/socket/udp/PacketBuilder.java @@ -0,0 +1,228 @@ +/* + * Copyright (c) 2026 Hutool Team. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hutool.v7.socket.udp; + +import cn.hutool.v7.core.array.ArrayUtil; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +/** + * 安全的 UDP 包动态构建器(自动扩容) + * + *

该类提供了链式 API 用于构建二进制数据包,支持多种数据类型的写入操作。 + * 内部采用动态扩容机制,初始容量为 64 字节,当空间不足时会自动扩容。

+ * + *

使用示例:

+ *
{@code
+ * byte[] packet = new PacketBuilder()
+ *     .writeByte((byte) 0x01)
+ *     .writeInt(12345)
+ *     .writeString("Hello World")
+ *     .build();
+ * }
+ * + *

特性:

+ *
    + *
  • 支持设置字节序(大端/小端)
  • + *
  • 链式调用设计
  • + *
  • 自动内存管理(扩容和缩容)
  • + *
  • 空值安全处理
  • + *
  • 支持基本数据类型和字符串写入
  • + *
+ * + * @author Looly + * @since 7.0.0 + */ +public class PacketBuilder { + + /** + * 数据缓冲区,初始容量为 64 字节 + */ + private byte[] buffer = new byte[64]; // initial capacity + + /** + * 当前写入位置指针 + */ + private int position = 0; + + /** + * 字节序,默认为大端字节序 + */ + private ByteOrder order = ByteOrder.BIG_ENDIAN; + + /** + * 设置字节序 + * + * @param order 字节序,支持 {@link ByteOrder#BIG_ENDIAN} 或 {@link ByteOrder#LITTLE_ENDIAN} + * @return this,支持链式调用 + */ + public PacketBuilder order(final ByteOrder order) { + this.order = order; + return this; + } + + /** + * 写入一个字节 + * + * @param b 要写入的字节值 + * @return this,支持链式调用 + */ + public PacketBuilder writeByte(final byte b) { + ensureCapacity(1); + buffer[position++] = b; + return this; + } + + /** + * 写入一个 short 值(2字节) + * + * @param value 要写入的 short 值 + * @return this,支持链式调用 + */ + public PacketBuilder writeShort(final short value) { + ensureCapacity(2); + ByteBuffer.wrap(buffer, position, 2).order(order).putShort(value); + position += 2; + return this; + } + + /** + * 写入一个 int 值(4字节) + * + * @param value 要写入的 int 值 + * @return this,支持链式调用 + */ + public PacketBuilder writeInt(final int value) { + ensureCapacity(4); + ByteBuffer.wrap(buffer, position, 4).order(order).putInt(value); + position += 4; + return this; + } + + /** + * 写入一个 long 值(8字节) + * + * @param value 要写入的 long 值 + * @return this,支持链式调用 + */ + public PacketBuilder writeLong(final long value) { + ensureCapacity(8); + ByteBuffer.wrap(buffer, position, 8).order(order).putLong(value); + position += 8; + return this; + } + + /** + * 写入一个 float 值(4字节) + * + * @param value 要写入的 float 值 + * @return this,支持链式调用 + */ + public PacketBuilder writeFloat(final float value) { + ensureCapacity(4); + ByteBuffer.wrap(buffer, position, 4).order(order).putFloat(value); + position += 4; + return this; + } + + /** + * 写入一个 double 值(8字节) + * + * @param value 要写入的 double 值 + * @return this,支持链式调用 + */ + public PacketBuilder writeDouble(final double value) { + ensureCapacity(8); + ByteBuffer.wrap(buffer, position, 8).order(order).putDouble(value); + position += 8; + return this; + } + + /** + * 写入字节数组 + * + * @param bytes 要写入的字节数组,如果为 null 或空数组则不进行任何操作 + * @return this,支持链式调用 + */ + public PacketBuilder writeBytes(final byte[] bytes) { + if (bytes == null || bytes.length == 0) return this; + ensureCapacity(bytes.length); + System.arraycopy(bytes, 0, buffer, position, bytes.length); + position += bytes.length; + return this; + } + + /** + * 写入字符串,使用指定字符集编码 + * + * @param str 要写入的字符串,如果为 null 则写入空字符串 + * @param charset 字符集 + * @return this,支持链式调用 + */ + public PacketBuilder writeString(String str, final Charset charset) { + if (str == null) str = ""; + return writeBytes(str.getBytes(charset)); + } + + /** + * 写入字符串,使用 UTF-8 编码 + * + * @param str 要写入的字符串,如果为 null 则写入空字符串 + * @return this,支持链式调用 + */ + public PacketBuilder writeString(final String str) { + return writeString(str, StandardCharsets.UTF_8); + } + + /** + * 构建最终的字节数组 + * + *

如果缓冲区已满,直接返回原数组;否则返回一个大小为当前写入位置的新数组

+ * + * @return 构建完成的字节数组 + */ + public byte[] build() { + return position == buffer.length ? buffer : ArrayUtil.resize(buffer, position); + } + + /** + * 获取当前已写入的数据长度 + * + * @return 当前写入的字节数 + */ + public int length() { + return position; + } + + /** + * 确保缓冲区有足够容量 + * + *

当所需空间超过当前容量时,按照两倍扩容或所需大小的较大值进行扩容

+ * + * @param needed 需要的额外字节数 + */ + private void ensureCapacity(final int needed) { + final int required = position + needed; + if (required > buffer.length) { + final int newCap = Math.max(buffer.length << 1, required); + buffer = ArrayUtil.resize(buffer, newCap); + } + } +} diff --git a/hutool-socket/src/main/java/cn/hutool/v7/socket/udp/UdpSession.java b/hutool-socket/src/main/java/cn/hutool/v7/socket/udp/UdpSession.java new file mode 100644 index 0000000000..47bfb2d418 --- /dev/null +++ b/hutool-socket/src/main/java/cn/hutool/v7/socket/udp/UdpSession.java @@ -0,0 +1,214 @@ +package cn.hutool.v7.socket.udp; + +import cn.hutool.v7.core.codec.binary.HexUtil; +import cn.hutool.v7.core.io.IORuntimeException; +import cn.hutool.v7.core.io.IoUtil; +import cn.hutool.v7.core.lang.Assert; +import cn.hutool.v7.core.lang.Opt; +import cn.hutool.v7.log.Log; +import cn.hutool.v7.socket.udp.protocol.UdpDecoder; +import cn.hutool.v7.socket.udp.protocol.UdpEncoder; + +import java.io.Closeable; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketException; +import java.util.concurrent.*; +import java.util.function.Consumer; + +/** + * UDP会话 + * + * @param 消息类型 + */ +public class UdpSession implements Closeable { + private static final Log log = Log.get(); + + private final DatagramSocket socket; + private final UdpEncoder encoder; + private final UdpDecoder decoder; + + // ====== 线程 ====== + private ExecutorService executor; // 接收循环 + + // ====== 回调 ====== + private Consumer msgHandler; + private Consumer errorHandler; + + /** + * 缓存大小 + */ + private int bufferSize; + + /** + * 构造 + * + * @param socket UDP socket + * @param encoder 编码器,作为UDP服务时可以为{@code null} + * @param decoder 解码器,作为UDP客户端时可以为{@code null} + */ + public UdpSession(final DatagramSocket socket, final UdpEncoder encoder, final UdpDecoder decoder) { + this.socket = Assert.notNull(socket); + this.encoder = encoder; + this.decoder = decoder; + bufferSize = IoUtil.DEFAULT_LARGE_BUFFER_SIZE; + } + + /** + * 设置接收到的UDP消息的处理逻辑 + * + * @param msgHandler 接收到的UDP消息的处理逻辑 + * @return this + */ + public UdpSession setMsgHandler(final Consumer msgHandler) { + this.msgHandler = msgHandler; + return this; + } + + /** + * 设置异常处理逻辑 + * + * @param errorHandler 异常处理逻辑,{@code null}表示不处理 + * @return this + */ + public UdpSession setErrorHandler(final Consumer errorHandler) { + this.errorHandler = errorHandler; + return this; + } + + /** + * 设置缓存大小 + * + * @param bufferSize 缓存大小 + * @return this + */ + public UdpSession setBufferSize(final int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + /** + * 发送数据 + * + * @param data 发送的数据包 + * @throws IORuntimeException IO异常 + */ + public void send(final T data) throws IORuntimeException { + Assert.notNull(encoder, "Encoder can not be null when send data"); + final byte[] payload = encoder.encode(data); + final DatagramPacket packet = new DatagramPacket(payload, payload.length); + try { + socket.send(packet); + } catch (final IOException e) { + throw new IORuntimeException(e); + } + } + + /** + * 发送心跳包(用户实现 heartbeat 消息类型) + * + * @param heartbeatMsg 心跳消息 + * @throws IORuntimeException IO异常 + */ + public void sendHeartbeat(final T heartbeatMsg) throws IORuntimeException { + send(heartbeatMsg); + } + + /** + * 启动定时心跳(需配合 idleTimeout 使用) + * + * @param heartbeatMsg 心跳消息 + * @param interval 间隔时间 + * @param scheduler 定时线程 + * @return 定时任务 + */ + public ScheduledFuture scheduleHeartbeat(final T heartbeatMsg, final long interval, final ScheduledExecutorService scheduler) { + return scheduler.scheduleAtFixedRate(() -> { + try { + sendHeartbeat(heartbeatMsg); + } catch (final Exception e) { + log.warn("Send heartbeat failed", e); + safeInvoke(() -> errorHandler.accept(e)); + } + }, interval, interval, TimeUnit.MILLISECONDS); + } + + /** + * Session是否保持连接 + * + * @return 是否保持连接 + */ + public boolean isOpen() { + return null != socket && !socket.isClosed(); + } + + @Override + public void close() { + IoUtil.closeQuietly(socket); + if (null != this.executor) { + this.executor.shutdown(); + this.executor = null; + } + } + + /** + * 启动接收循环(非阻塞) + * + * @return this + */ + public UdpSession start() { + executor.submit(this::receiveLoop); + return this; + } + + private void receiveLoop() { + final byte[] buffer = new byte[bufferSize]; + final DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + + while (!Thread.currentThread().isInterrupted()) { + try { + socket.receive(packet); + final int len = packet.getLength(); + // 校验 & 解码 + if (decoder.getMinLength() > 0 && len < decoder.getMinLength()) { + log.warn("Packet too short: {} < {}", len, decoder.getMinLength()); + continue; + } + + final byte[] data = new byte[len]; + System.arraycopy(packet.getData(), packet.getOffset(), data, 0, len); + if (!decoder.isValid(data)) { + log.warn("Packet validation failed. Hex: {}", HexUtil.encodeStr(data)); + continue; + } + + onMessage(decoder.decode(data)); + } catch (final SocketException e) { + onError(e); + break; // socket closed + } catch (final Exception e) { + onError(e); + } finally { + packet.setLength(buffer.length); // reset for next receive + } + } + close(); + } + + private void onMessage(final T msg) { + safeInvoke(() -> Opt.of(this.msgHandler).ifPresent(c -> c.accept(msg))); + } + + private void onError(final Throwable e) { + safeInvoke(() -> Opt.of(this.errorHandler).ifPresent(c -> c.accept(e))); + } + + private void safeInvoke(final Runnable task) { + try { + executor.execute(task); + } catch (final RejectedExecutionException e) { + log.warn("Callback executor rejected task", e); + } + } +} diff --git a/hutool-socket/src/main/java/cn/hutool/v7/socket/udp/package-info.java b/hutool-socket/src/main/java/cn/hutool/v7/socket/udp/package-info.java new file mode 100644 index 0000000000..0a76835631 --- /dev/null +++ b/hutool-socket/src/main/java/cn/hutool/v7/socket/udp/package-info.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2013-2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * UDP相关封装 + *
{@code
+ *         ┌─────────────┐          ┌─────────────┐
+ *         │ UdpSession  │          │             │
+ *         └──────┬──────┘          │ Application │
+ *                │                 │             │
+ *         encode │                 └──────┬──────┘
+ *      ┌─────────▼─────────┐              │ send(T)
+ *      │  UdpEncoder    │ ◄────────────┘
+ *      └─────────┬─────────┘
+ *                │ byte[]
+ *      ┌─────────▼─────────┐
+ *      │   DatagramSocket  │
+ *      └─────────┬─────────┘
+ *                │ byte[]
+ *      ┌─────────▼─────────┐     onMessage(T)
+ *      │  UdpDecoder    ├──────────────►
+ *      └───────────────────┘
+ * }
+ * + * @author Looly + * + */ +package cn.hutool.v7.socket.udp; diff --git a/hutool-socket/src/main/java/cn/hutool/v7/socket/udp/protocol/UdpDecoder.java b/hutool-socket/src/main/java/cn/hutool/v7/socket/udp/protocol/UdpDecoder.java new file mode 100644 index 0000000000..23836be1d8 --- /dev/null +++ b/hutool-socket/src/main/java/cn/hutool/v7/socket/udp/protocol/UdpDecoder.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2026 Hutool Team. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hutool.v7.socket.udp.protocol; + +import cn.hutool.v7.socket.SocketRuntimeException; + +/** + * UDP 数据解码器:字节数组 → 业务对象 + * + * @param 业务对象类型 + * @author Looly + * @since 7.0.0 + */ +@FunctionalInterface +public interface UdpDecoder { + /** + * 将原始字节解码为业务对象 + * + * @param bytes 原始数据(非 null,长度 ≥ 0) + * @return 非 null 业务对象 + * @throws SocketRuntimeException 解码失败(校验失败、格式错误等) + */ + T decode(byte[] bytes) throws SocketRuntimeException; + + /** + * 可选:预校验(快速失败) + * + * @param bytes 原始数据(非 null,长度 ≥ 0) + * @return true 表示可能合法,false 表示一定非法 + */ + default boolean isValid(final byte[] bytes) { + return bytes != null && bytes.length > 0; + } + + /** + * 可选:最小合法长度 + * + * @return >0 表示需至少这么多字节;≤0 表示无限制 + */ + default int getMinLength() { + return 0; + } +} diff --git a/hutool-socket/src/main/java/cn/hutool/v7/socket/udp/protocol/UdpEncoder.java b/hutool-socket/src/main/java/cn/hutool/v7/socket/udp/protocol/UdpEncoder.java new file mode 100644 index 0000000000..68ddfd2a9a --- /dev/null +++ b/hutool-socket/src/main/java/cn/hutool/v7/socket/udp/protocol/UdpEncoder.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2013-2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hutool.v7.socket.udp.protocol; + +import cn.hutool.v7.socket.SocketRuntimeException; + +/** + * UDP 数据编码器:业务对象 → 字节数组 + * + * @param 业务对象类型 + * @author Looly + * @since 7.0.0 + */ +@FunctionalInterface +public interface UdpEncoder { + /** + * 将业务对象编码为字节数组 + * + * @param data 业务数据(非 null) + * @return 非 null 字节数组,长度 ≥ 0 + * @throws SocketRuntimeException 编码失败(如字段缺失、溢出) + */ + byte[] encode(T data) throws SocketRuntimeException; +} diff --git a/hutool-socket/src/main/java/cn/hutool/v7/socket/udp/protocol/package-info.java b/hutool-socket/src/main/java/cn/hutool/v7/socket/udp/protocol/package-info.java new file mode 100644 index 0000000000..e4e81e4c09 --- /dev/null +++ b/hutool-socket/src/main/java/cn/hutool/v7/socket/udp/protocol/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2013-2025 Hutool Team and hutool.cn + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * UDP消息相关封装 + * + * @author Looly + * + */ +package cn.hutool.v7.socket.udp.protocol;