This commit is contained in:
Looly
2026-01-02 22:50:35 +08:00
parent b629e8abdb
commit 270f5944de
16 changed files with 619 additions and 263 deletions

View File

@@ -19,11 +19,14 @@ package cn.hutool.v7.socket;
import cn.hutool.v7.core.io.IORuntimeException;
import cn.hutool.v7.core.io.IoUtil;
import cn.hutool.v7.core.thread.ThreadFactoryBuilder;
import cn.hutool.v7.socket.nio.Operation;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.Selector;
import java.util.concurrent.ExecutionException;
/**
@@ -34,6 +37,28 @@ import java.util.concurrent.ExecutionException;
*/
public class ChannelUtil {
/**
* 注册通道的指定操作到指定Selector上
*
* @param selector Selector
* @param channel 通道
* @param ops 注册的通道监听(操作)类型
*/
public static void registerChannel(final Selector selector, final SelectableChannel channel, final Operation ops) {
if (channel == null) {
return;
}
try {
channel.configureBlocking(false);
// 注册通道
//noinspection MagicConstant
channel.register(selector, ops.getValue());
} catch (final IOException e) {
throw new IORuntimeException(e);
}
}
/**
* 创建{@link AsynchronousChannelGroup}
*

View File

@@ -18,6 +18,7 @@ package cn.hutool.v7.socket.nio;
import cn.hutool.v7.core.io.IORuntimeException;
import cn.hutool.v7.log.LogUtil;
import cn.hutool.v7.socket.ChannelUtil;
import java.io.IOException;
import java.nio.channels.CompletionHandler;

View File

@@ -1,53 +0,0 @@
/*
* Copyright (c) 2013-2025 Hutool Team and hutool.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hutool.v7.socket.nio;
import cn.hutool.v7.core.io.IORuntimeException;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.Selector;
/**
* NIO工具类
*
* @since 5.4.0
*/
public class ChannelUtil {
/**
* 注册通道的指定操作到指定Selector上
*
* @param selector Selector
* @param channel 通道
* @param ops 注册的通道监听(操作)类型
*/
public static void registerChannel(final Selector selector, final SelectableChannel channel, final Operation ops) {
if (channel == null) {
return;
}
try {
channel.configureBlocking(false);
// 注册通道
//noinspection MagicConstant
channel.register(selector, ops.getValue());
} catch (final IOException e) {
throw new IORuntimeException(e);
}
}
}

View File

@@ -0,0 +1,29 @@
/*
* Copyright (c) 2026 Hutool Team.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hutool.v7.socket.udp;
import java.net.SocketAddress;
/**
* UDP 上下文用于在UDP接收处理消息时传递远程地址信息。
*
* @param remoteAddress 远程地址
* @author looly
* @since 7.0.0
*/
public record UdpContext(SocketAddress remoteAddress) {
}

View File

@@ -1,20 +1,39 @@
/*
* Copyright (c) 2026 Hutool Team.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hutool.v7.socket.udp;
import cn.hutool.v7.core.codec.binary.HexUtil;
import cn.hutool.v7.core.io.IORuntimeException;
import cn.hutool.v7.core.io.IoUtil;
import cn.hutool.v7.core.lang.Assert;
import cn.hutool.v7.core.lang.Opt;
import cn.hutool.v7.core.thread.ThreadUtil;
import cn.hutool.v7.log.Log;
import cn.hutool.v7.socket.SocketRuntimeException;
import cn.hutool.v7.socket.udp.protocol.UdpDecoder;
import cn.hutool.v7.socket.udp.protocol.UdpEncoder;
import java.io.Closeable;
import java.io.IOException;
import java.net.*;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
/**
@@ -25,63 +44,15 @@ import java.util.function.Consumer;
public class UdpSession<T> implements Closeable {
private static final Log log = Log.get();
/**
* 创建UDP客户端会话
*
* @param host 远程主机地址
* @param port 端口号
* @param encoder 编码器
* @param <T> 消息类型
* @return UDP会话
*/
public static <T> UdpSession<T> ofClient(final String host, final int port, final UdpEncoder<T> encoder) {
final UdpSession<T> udpSession;
try {
udpSession = new UdpSession<>(new DatagramSocket(), encoder, null);
} catch (final SocketException e) {
throw new SocketRuntimeException(e);
}
return udpSession.setRemoteAddress(new InetSocketAddress(host, port));
}
/**
* 创建UDP服务端会话
*
* @param bindAddress 绑定地址和端口
* @param decoder 解码器
* @param <T> 消息类型
* @return UDP会话
*/
public static <T> UdpSession<T> ofServer(final SocketAddress bindAddress, final UdpDecoder<T> decoder) {
return ofServer(bindAddress, decoder, ThreadUtil.newExecutor(10));
}
/**
* 创建UDP服务端会话
*
* @param bindAddress 绑定地址和端口
* @param decoder 解码器
* @param executor 执行器
* @param <T> 消息类型
* @return UDP会话
*/
public static <T> UdpSession<T> ofServer(final SocketAddress bindAddress, final UdpDecoder<T> decoder, final ExecutorService executor) {
try {
return new UdpSession<>(new DatagramSocket(bindAddress), null, decoder)
.setExecutor(executor);
} catch (final SocketException e) {
throw new SocketRuntimeException(e);
}
}
private final DatagramSocket socket;
private final UdpEncoder<T> encoder;
private final UdpDecoder<T> decoder;
private volatile ExecutorService executor;
private volatile ScheduledExecutorService scheduler;
private volatile InetSocketAddress remoteAddress;
private volatile Consumer<T> msgHandler;
private volatile SocketAddress remoteAddress;
private volatile BiConsumer<T, UdpContext> msgHandler;
private volatile Consumer<Throwable> errorHandler;
/**
@@ -121,7 +92,7 @@ public class UdpSession<T> implements Closeable {
* @param remoteAddress 远程地址
* @return this
*/
public UdpSession<T> setRemoteAddress(final InetSocketAddress remoteAddress) {
public UdpSession<T> setRemoteAddress(final SocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
return this;
}
@@ -132,7 +103,7 @@ public class UdpSession<T> implements Closeable {
* @param msgHandler 接收到的UDP消息的处理逻辑
* @return this
*/
public UdpSession<T> setMsgHandler(final Consumer<T> msgHandler) {
public UdpSession<T> setMsgHandler(final BiConsumer<T, UdpContext> msgHandler) {
this.msgHandler = msgHandler;
return this;
}
@@ -166,6 +137,17 @@ public class UdpSession<T> implements Closeable {
* @throws IORuntimeException IO异常
*/
public void send(final T data) throws IORuntimeException {
send(data, this.remoteAddress);
}
/**
* 发送数据到指定地址
*
* @param data 发送的数据包
* @param remoteAddress 远程地址
* @throws IORuntimeException IO异常
*/
public void send(final T data, final SocketAddress remoteAddress) throws IORuntimeException {
Assert.notNull(encoder, "Encoder can not be null when send data");
final byte[] payload = encoder.encode(data);
final DatagramPacket packet = new DatagramPacket(payload, payload.length, remoteAddress);
@@ -187,7 +169,21 @@ public class UdpSession<T> implements Closeable {
}
/**
* 启动定时心跳(需配合 idleTimeout 使用
* 启动定时心跳任务,定时发送心跳包(用户实现 heartbeat 消息类型
*
* @param heartbeatMsg 心跳消息
* @param interval 间隔时间
* @return 定时任务
*/
public ScheduledFuture<?> scheduleHeartbeat(final T heartbeatMsg, final long interval) {
if (null == this.scheduler) {
this.scheduler = ThreadUtil.newScheduledExecutor(1);
}
return scheduleHeartbeat(heartbeatMsg, interval, this.scheduler);
}
/**
* 启动定时心跳任务,定时发送心跳包(用户实现 heartbeat 消息类型)
*
* @param heartbeatMsg 心跳消息
* @param interval 间隔时间
@@ -216,11 +212,14 @@ public class UdpSession<T> implements Closeable {
@Override
public void close() {
IoUtil.closeQuietly(socket);
if (null != this.executor) {
this.executor.shutdown();
this.executor = null;
}
if (null != this.scheduler) {
this.scheduler.shutdown();
}
IoUtil.closeQuietly(socket);
}
/**
@@ -229,7 +228,8 @@ public class UdpSession<T> implements Closeable {
* @return this
*/
public UdpSession<T> start() {
if(null == executor) {
Assert.notNull(decoder, "Decoder can not be null when start receive loop");
if (null == executor) {
executor = ThreadUtil.newExecutor();
}
executor.submit(this::receiveLoop);
@@ -257,7 +257,7 @@ public class UdpSession<T> implements Closeable {
continue;
}
onMessage(decoder.decode(data));
onMessage(decoder.decode(data), new UdpContext(packet.getSocketAddress()));
} catch (final SocketException e) {
onError(e);
break; // socket closed
@@ -270,12 +270,12 @@ public class UdpSession<T> implements Closeable {
close();
}
private void onMessage(final T msg) {
safeInvoke(() -> Opt.of(this.msgHandler).ifPresent(c -> c.accept(msg)));
private void onMessage(final T msg, final UdpContext context) {
safeInvoke(() -> Optional.of(this.msgHandler).ifPresent(c -> c.accept(msg, context)));
}
private void onError(final Throwable e) {
safeInvoke(() -> Opt.of(this.errorHandler).ifPresent(c -> c.accept(e)));
safeInvoke(() -> Optional.of(this.errorHandler).ifPresent(c -> c.accept(e)));
}
private void safeInvoke(final Runnable task) {

View File

@@ -0,0 +1,149 @@
/*
* Copyright (c) 2026 Hutool Team.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hutool.v7.socket.udp;
import cn.hutool.v7.core.thread.ThreadUtil;
import cn.hutool.v7.socket.SocketRuntimeException;
import cn.hutool.v7.socket.udp.protocol.UdpDecoder;
import cn.hutool.v7.socket.udp.protocol.UdpEncoder;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.concurrent.ExecutorService;
/**
* UDP工具类用于创建UDP会话
*
* @author Looly
* @since 7.0.0
*/
public class UdpUtil {
// region ----- client
/**
* 创建UDP客户端消息发送器发送器没有状态没有连接不会接收消息只是单纯发送
*
* @param host 远程主机地址
* @param port 端口号
* @param encoder 编码器
* @param <T> 消息类型
* @return UDP会话
*/
public static <T> UdpSession<T> ofSender(final String host, final int port, final UdpEncoder<T> encoder) {
return ofSender(new InetSocketAddress(host, port), encoder);
}
/**
* 创建UDP客户端消息发送器发送器没有状态没有连接不会接收消息只是单纯发送
*
* @param remoteAddress 远程主机地址和端口
* @param encoder 编码器
* @param <T> 消息类型
* @return UDP会话
*/
public static <T> UdpSession<T> ofSender(final SocketAddress remoteAddress, final UdpEncoder<T> encoder) {
final UdpSession<T> udpSession;
try {
udpSession = new UdpSession<>(new DatagramSocket(), encoder, null);
} catch (final SocketException e) {
throw new SocketRuntimeException(e);
}
return udpSession.setRemoteAddress(remoteAddress);
}
/**
* 创建UDP客户端会话支持服务端的消息接收
*
* @param host 远程主机地址
* @param port 端口号
* @param encoder 编码器
* @param decoder 解码器,用于处理服务端返回的消息
* @param <T> 消息类型
* @return UDP会话
*/
public static <T> UdpSession<T> ofClient(final String host, final int port, final UdpEncoder<T> encoder, final UdpDecoder<T> decoder) {
return ofClient(new InetSocketAddress(host, port), encoder, decoder);
}
/**
* 创建UDP客户端会话支持服务端的消息接收
*
* @param remoteAddress 远程主机地址和端口号
* @param encoder 编码器
* @param decoder 解码器,用于处理服务端返回的消息
* @param <T> 消息类型
* @return UDP会话
*/
public static <T> UdpSession<T> ofClient(final SocketAddress remoteAddress, final UdpEncoder<T> encoder, final UdpDecoder<T> decoder) {
final UdpSession<T> udpSession;
try {
udpSession = new UdpSession<>(new DatagramSocket(), encoder, decoder);
} catch (final SocketException e) {
throw new SocketRuntimeException(e);
}
return udpSession.setRemoteAddress(remoteAddress);
}
// endregion
// region ----- server
/**
* 创建UDP服务端会话用于接收客户端消息而无需发送消息
*
* @param bindAddress 绑定地址和端口
* @param decoder 解码器
* @param <T> 消息类型
* @return UDP会话
*/
public static <T> UdpSession<T> ofServer(final SocketAddress bindAddress, final UdpDecoder<T> decoder) {
return ofServer(bindAddress, null, decoder);
}
/**
* 创建UDP服务端会话
*
* @param bindAddress 绑定地址和端口
* @param encoder 编码器
* @param decoder 解码器
* @param <T> 消息类型
* @return UDP会话
*/
public static <T> UdpSession<T> ofServer(final SocketAddress bindAddress, final UdpEncoder<T> encoder, final UdpDecoder<T> decoder) {
return ofServer(bindAddress, encoder, decoder, ThreadUtil.newExecutor(16));
}
/**
* 创建UDP服务端会话
*
* @param bindAddress 绑定地址和端口
* @param encoder 编码器
* @param decoder 解码器
* @param executor 执行器
* @param <T> 消息类型
* @return UDP会话
*/
public static <T> UdpSession<T> ofServer(final SocketAddress bindAddress, final UdpEncoder<T> encoder, final UdpDecoder<T> decoder, final ExecutorService executor) {
try {
return new UdpSession<>(new DatagramSocket(bindAddress), encoder, decoder)
.setExecutor(executor);
} catch (final SocketException e) {
throw new SocketRuntimeException(e);
}
}
// endregion
}

View File

@@ -0,0 +1,46 @@
/*
* Copyright (c) 2026 Hutool Team.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hutool.v7.socket.udp;
import cn.hutool.v7.core.util.CharsetUtil;
import cn.hutool.v7.socket.udp.protocol.UdpDecoder;
import cn.hutool.v7.socket.udp.protocol.UdpEncoder;
import java.nio.charset.StandardCharsets;
public class StringUdpCodec implements UdpEncoder<String>, UdpDecoder<String> {
@Override
public byte[] encode(final String msg) {
return msg.getBytes(CharsetUtil.UTF_8);
}
@Override
public int getMinLength() {
return 1; // 至少 1 字节
}
@Override
public boolean isValid(final byte[] data) {
// 简单校验:非空即可;实际项目可加校验和、魔数等
return data != null && data.length > 0;
}
@Override
public String decode(final byte[] data) {
return new String(data, StandardCharsets.UTF_8);
}
}

View File

@@ -0,0 +1,40 @@
/*
* Copyright (c) 2026 Hutool Team.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hutool.v7.socket.udp;
import cn.hutool.v7.core.lang.Console;
import cn.hutool.v7.core.thread.ThreadUtil;
public class UdpClientDemo {
public static void main(final String[] args) {
final UdpSession<String> client = UdpUtil.ofClient("127.0.0.1", 9999, new StringUdpCodec(), new StringUdpCodec());
client.setMsgHandler(((msg, context) -> Console.log("Received from server: " + msg)));
client.start();
// 模拟发几条消息
client.send("Hello from sender!");
client.send("How are you?");
// 启动心跳(每 3 秒一次)
final String heartbeatMsg = "PING";
client.scheduleHeartbeat(heartbeatMsg, 3000); // 3s
// 模拟接收消息
ThreadUtil.sleep(1000 * 30); // 等待一段时间,以便观察输出
client.close();
}
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright (c) 2026 Hutool Team.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hutool.v7.socket.udp;
import cn.hutool.v7.core.lang.Console;
import cn.hutool.v7.core.thread.ThreadUtil;
public class UdpSenderDemo {
public static void main(final String[] args) {
// 创建客户端会话,连接到 localhost:9999
final UdpSession<String> client = UdpUtil.ofSender("127.0.0.1", 9999, new StringUdpCodec());
Console.log("[Sender] UDP sender started. Sending messages...");
// 模拟发几条消息
client.send("Hello from sender!");
client.send("How are you?");
// 启动心跳(每 3 秒一次)
final String heartbeatMsg = "PING";
client.scheduleHeartbeat(heartbeatMsg, 3000); // 3s
// 保持运行
ThreadUtil.sleep(1000 * 30); // 30秒
// 关闭
client.close();
Console.log("[Sender] Closed.");
}
}

View File

@@ -0,0 +1,46 @@
/*
* Copyright (c) 2026 Hutool Team.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hutool.v7.socket.udp;
import cn.hutool.v7.core.lang.Console;
import cn.hutool.v7.core.thread.ThreadUtil;
import java.net.InetSocketAddress;
public class UdpServerDemo {
@SuppressWarnings("resource")
public static void main(final String[] args) {
// 创建服务端会话,绑定 9999 端口
final UdpSession<String> server = UdpUtil.ofServer(new InetSocketAddress(9999), new StringUdpCodec(), new StringUdpCodec());
// 设置消息处理器:回显客户端消息 + " [server echo]"
server.setMsgHandler((msg, context) -> {
Console.log("[Server] From [{}] received: {}", context.remoteAddress(), msg);
// 回发消息
server.send("Server received: " + msg, context.remoteAddress());
//UdpSession.ofSender(context.remoteAddress(), new StringUdpCodec()).send(msg + " [server echo]");
})
// 异常处理器
.setErrorHandler(Throwable::printStackTrace)
// 启动接收循环
.start();
Console.log("[Server] UDP server started on port 9999. Waiting for messages...");
// 保持主线程运行
ThreadUtil.waitForDie();
}
}

View File

@@ -33,6 +33,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.*;
@@ -91,13 +92,13 @@ class UdpSessionTest {
}
/**
* 测试 ofClient 工厂方法
* 测试 ofSender 工厂方法
*/
@SuppressWarnings("resource")
@Test
void testOfClient() {
void testOfSender() {
// 正常情况
final UdpSession<String> session = UdpSession.ofClient(TEST_HOST, TEST_PORT, new StringUdpEncoder());
final UdpSession<String> session = UdpUtil.ofSender(TEST_HOST, TEST_PORT, new StringUdpEncoder());
assertNotNull(session);
assertTrue(session.isOpen());
@@ -106,7 +107,7 @@ class UdpSessionTest {
// 测试异常情况 - 无效的端口号
assertThrows(IllegalArgumentException.class, () -> {
UdpSession.ofClient(TEST_HOST, -1, new StringUdpEncoder());
UdpUtil.ofSender(TEST_HOST, -1, new StringUdpEncoder());
});
}
@@ -118,14 +119,14 @@ class UdpSessionTest {
void testOfServer() {
// 正常情况
final InetSocketAddress bindAddress = new InetSocketAddress(TEST_HOST, TEST_PORT_SERVER);
final UdpSession<String> session = UdpSession.ofServer(bindAddress, new StringUdpDecoder());
final UdpSession<String> session = UdpUtil.ofServer(bindAddress, new StringUdpDecoder());
assertNotNull(session);
assertTrue(session.isOpen());
session.close();
// 测试异常情况 - 无效的绑定地址
assertThrows(SocketRuntimeException.class, () -> {
UdpSession.ofServer(new InetSocketAddress("invalid.host", 9999), new StringUdpDecoder());
UdpUtil.ofServer(new InetSocketAddress("invalid.host", 9999), new StringUdpDecoder());
});
}
@@ -167,8 +168,7 @@ class UdpSessionTest {
void testSetMsgHandler() {
final UdpSession<String> session = new UdpSession<>(testSocket, new StringUdpEncoder(), new StringUdpDecoder());
final Consumer<String> msgHandler = msg -> {
};
final BiConsumer<String, UdpContext> msgHandler = (msg, context) -> {};
final UdpSession<String> result = session.setMsgHandler(msgHandler);
assertEquals(session, result);
@@ -217,8 +217,8 @@ class UdpSessionTest {
* 测试 send 方法 - 正常情况
*/
@Test
void testSend() throws IOException {
final UdpSession<String> session = UdpSession.ofClient(TEST_HOST, TEST_PORT, new StringUdpEncoder());
void testSend() {
final UdpSession<String> session = UdpUtil.ofSender(TEST_HOST, TEST_PORT, new StringUdpEncoder());
// 正常发送
assertDoesNotThrow(() -> session.send("test message"));
@@ -250,8 +250,8 @@ class UdpSessionTest {
* 测试 sendHeartbeat 方法
*/
@Test
void testSendHeartbeat() throws IOException {
final UdpSession<String> session = UdpSession.ofClient(TEST_HOST, TEST_PORT, new StringUdpEncoder());
void testSendHeartbeat() {
final UdpSession<String> session = UdpUtil.ofSender(TEST_HOST, TEST_PORT, new StringUdpEncoder());
// 正常发送心跳
assertDoesNotThrow(() -> session.sendHeartbeat("heartbeat"));
@@ -265,7 +265,7 @@ class UdpSessionTest {
@Test
@Timeout(5)
void testScheduleHeartbeat() throws InterruptedException {
final UdpSession<String> session = UdpSession.ofClient(TEST_HOST, TEST_PORT, new StringUdpEncoder());
final UdpSession<String> session = UdpUtil.ofSender(TEST_HOST, TEST_PORT, new StringUdpEncoder());
final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
try {
@@ -319,15 +319,15 @@ class UdpSessionTest {
@SuppressWarnings("WriteOnlyObject")
@Test
@Timeout(5)
void testStart() throws InterruptedException, SocketException {
void testStart() throws InterruptedException {
// 创建服务端会话用于接收消息
final InetSocketAddress serverAddress = new InetSocketAddress(TEST_HOST, TEST_PORT_SERVER);
final UdpSession<String> serverSession = UdpSession.ofServer(serverAddress, new StringUdpDecoder());
final UdpSession<String> serverSession = UdpUtil.ofServer(serverAddress, new StringUdpDecoder());
final AtomicInteger messageCount = new AtomicInteger(0);
final AtomicReference<String> receivedMessage = new AtomicReference<>();
serverSession.setMsgHandler(msg -> {
serverSession.setMsgHandler((msg, context) -> {
messageCount.incrementAndGet();
receivedMessage.set(msg);
});
@@ -336,7 +336,7 @@ class UdpSessionTest {
serverSession.start();
// 创建客户端并发送消息
final UdpSession<String> clientSession = UdpSession.ofClient(TEST_HOST, TEST_PORT_SERVER, new StringUdpEncoder());
final UdpSession<String> clientSession = UdpUtil.ofSender(TEST_HOST, TEST_PORT_SERVER, new StringUdpEncoder());
clientSession.send("test message");
// 等待消息处理
@@ -400,18 +400,18 @@ class UdpSessionTest {
};
final InetSocketAddress serverAddress = new InetSocketAddress(TEST_HOST, TEST_PORT_SERVER);
final UdpSession<String> serverSession = UdpSession.ofServer(serverAddress, strictDecoder);
final UdpSession<String> serverSession = UdpUtil.ofServer(serverAddress, strictDecoder);
final AtomicInteger validMessageCount = new AtomicInteger(0);
final AtomicInteger invalidMessageCount = new AtomicInteger(0);
serverSession.setMsgHandler(msg -> validMessageCount.incrementAndGet());
serverSession.setMsgHandler((msg, context) -> validMessageCount.incrementAndGet());
// 启动接收循环
serverSession.start();
// 创建客户端
final UdpSession<String> clientSession = UdpSession.ofClient(TEST_HOST, TEST_PORT_SERVER, new StringUdpEncoder());
// 创建无状态客户端
final UdpSession<String> clientSession = UdpUtil.ofSender(TEST_HOST, TEST_PORT_SERVER, new StringUdpEncoder());
// 发送过短的消息(应该被过滤掉)
clientSession.send("123"); // 只有3个字节
@@ -458,7 +458,7 @@ class UdpSessionTest {
@Test
@Timeout(10)
void testConcurrentOperations() throws InterruptedException {
final UdpSession<String> session = UdpSession.ofClient(TEST_HOST, TEST_PORT, new StringUdpEncoder());
final UdpSession<String> session = UdpUtil.ofSender(TEST_HOST, TEST_PORT, new StringUdpEncoder());
final int threadCount = 5;
final CountDownLatch startLatch = new CountDownLatch(1);