跳转到内容

WebSocket 客户端

WebSocket 协议支持客户端与服务器之间的全双工通信,适用于实时聊天、在线游戏、股票行情推送、物联网设备控制等场景。Feat 提供了简洁的 WebSocketClient API 来实现 WebSocket 客户端功能。

以下示例展示 WebSocket 客户端的最基本用法:

import tech.smartboot.feat.core.client.WebSocketClient;
import tech.smartboot.feat.core.client.WebSocketListener;
import tech.smartboot.feat.core.client.WebSocketResponse;
public class WebSocketDemo {
public static void main(String[] args) {
// 1. 创建客户端实例
WebSocketClient client = new WebSocketClient("ws://localhost:8080/ws");
// 2. 建立连接
client.connect(new WebSocketListener() {
@Override
public void onOpen(WebSocketClient client, WebSocketResponse response) {
System.out.println("连接成功");
// 连接建立后发送消息
client.sendMessage("Hello, Feat!");
}
@Override
public void onMessage(WebSocketClient client, String message) {
System.out.println("收到消息: " + message);
}
@Override
public void onClose(WebSocketClient client, WebSocketResponse response, CloseReason reason) {
System.out.println("连接关闭: " + reason);
}
@Override
public void onError(WebSocketClient client, WebSocketResponse response, Throwable throwable) {
System.out.println("发生错误: " + throwable.getMessage());
}
});
}
}

核心步骤:

  1. 创建客户端 - 通过 URL 构造 WebSocketClient,支持 ws://wss:// 协议
  2. 实现监听器 - 实现 WebSocketListener 接口处理各类事件
  3. 建立连接 - 调用 connect() 方法发起连接

方法参数返回值说明
WebSocketClient(String url)url: WebSocket 服务器 URLWebSocketClient创建 WebSocket 客户端实例
options()-WebSocketClientOptions获取配置选项
connect(WebSocketListener)listener: 事件监听器void建立 WebSocket 连接
sendMessage(String)message: 文本消息void发送文本消息
sendBinary(byte[])data: 二进制数据void发送二进制消息
close()-void关闭 WebSocket 连接
close(int code, String reason)code: 关闭码
reason: 关闭原因
void关闭 WebSocket 连接并指定原因
方法参数说明
onOpen(WebSocketClient, WebSocketResponse)client: 客户端实例
response: 连接响应
连接建立成功时调用
onMessage(WebSocketClient, String)client: 客户端实例
message: 文本消息
收到文本消息时调用
onMessage(WebSocketClient, byte[])client: 客户端实例
data: 二进制数据
收到二进制消息时调用
onClose(WebSocketClient, WebSocketResponse, CloseReason)client: 客户端实例
response: 关闭响应
reason: 关闭原因
连接关闭时调用
onError(WebSocketClient, WebSocketResponse, Throwable)client: 客户端实例
response: 错误响应
throwable: 异常信息
发生错误时调用
方法参数返回值说明
connectTimeout(int)timeout: 超时时间(毫秒)WebSocketClientOptions设置连接超时时间
readBufferSize(int)size: 缓冲区大小(字节)WebSocketClientOptions设置读缓冲区大小
writeBufferSize(int)size: 缓冲区大小(字节)WebSocketClientOptions设置写缓冲区大小
debug(boolean)enabled: 是否启用WebSocketClientOptions开启调试日志
proxy(String, int, String, String)host: 代理主机
port: 代理端口
username: 用户名
password: 密码
WebSocketClientOptions配置代理服务器

WebSocket 支持文本消息和二进制消息:

// 发送简单文本消息
client.sendMessage("Hello, World!");
// 发送 JSON 消息
String jsonMessage = "{\"type\": \"chat\", \"content\": \"Hello Feat!\"}";
client.sendMessage(jsonMessage);
import java.nio.charset.StandardCharsets;
// 发送二进制数据
byte[] binaryData = "Binary Data".getBytes(StandardCharsets.UTF_8);
client.sendBinary(binaryData);
// 发送序列化对象
import com.fasterxml.jackson.databind.ObjectMapper;
ObjectMapper objectMapper = new ObjectMapper();
User user = new User("张三", 25);
byte[] userData = objectMapper.writeValueAsBytes(user);
client.sendBinary(userData);

通过 options() 方法获取配置对象,自定义连接参数:

WebSocketClient client = new WebSocketClient("wss://example.com/ws");
client.options()
.connectTimeout(5000) // 连接超时时间(毫秒)
.readBufferSize(8192) // 读缓冲区大小
.writeBufferSize(8192) // 写缓冲区大小
.debug(true) // 开启调试日志
.proxy("proxy.example.com", 8080, "user", "password"); // 代理配置
配置项类型默认值说明
connectTimeoutint30000连接超时时间(毫秒)
readBufferSizeint4096读缓冲区大小(字节)
writeBufferSizeint4096写缓冲区大小(字节)
debugbooleanfalse是否输出调试日志
proxyString, int, String, String-代理服务器配置(主机、端口、用户名、密码)

适用条件:需要实现用户之间的实时消息传递

示例

import tech.smartboot.feat.core.client.WebSocketClient;
import tech.smartboot.feat.core.client.WebSocketListener;
import tech.smartboot.feat.core.client.WebSocketResponse;
public class ChatClient {
public static void main(String[] args) {
WebSocketClient client = new WebSocketClient("ws://localhost:8080/chat");
client.connect(new WebSocketListener() {
@Override
public void onOpen(WebSocketClient client, WebSocketResponse response) {
System.out.println("连接到聊天服务器成功");
// 发送登录消息
client.sendMessage("{\"type\": \"login\", \"username\": \"user1\"}");
}
@Override
public void onMessage(WebSocketClient client, String message) {
System.out.println("收到消息: " + message);
// 处理收到的聊天消息
}
@Override
public void onClose(WebSocketClient client, WebSocketResponse response, CloseReason reason) {
System.out.println("聊天连接关闭: " + reason);
}
@Override
public void onError(WebSocketClient client, WebSocketResponse response, Throwable throwable) {
System.out.println("聊天连接错误: " + throwable.getMessage());
}
});
// 模拟发送聊天消息
try {
Thread.sleep(2000);
client.sendMessage("{\"type\": \"chat\", \"content\": \"大家好!\"}");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

适用条件:需要实时监控服务器或设备状态

示例

import tech.smartboot.feat.core.client.WebSocketClient;
import tech.smartboot.feat.core.client.WebSocketListener;
import tech.smartboot.feat.core.client.WebSocketResponse;
public class MonitoringClient {
public static void main(String[] args) {
WebSocketClient client = new WebSocketClient("ws://localhost:8080/monitoring");
client.connect(new WebSocketListener() {
@Override
public void onOpen(WebSocketClient client, WebSocketResponse response) {
System.out.println("连接到监控服务器成功");
// 订阅监控数据
client.sendMessage("{\"action\": \"subscribe\", \"metrics\": [\"cpu\", \"memory\", \"disk\"]}");
}
@Override
public void onMessage(WebSocketClient client, String message) {
System.out.println("收到监控数据: " + message);
// 解析和处理监控数据
}
@Override
public void onClose(WebSocketClient client, WebSocketResponse response, CloseReason reason) {
System.out.println("监控连接关闭: " + reason);
}
@Override
public void onError(WebSocketClient client, WebSocketResponse response, Throwable throwable) {
System.out.println("监控连接错误: " + throwable.getMessage());
}
});
}
}

适用条件:需要实现游戏中的实时通信,如玩家位置更新、聊天等

示例

import tech.smartboot.feat.core.client.WebSocketClient;
import tech.smartboot.feat.core.client.WebSocketListener;
import tech.smartboot.feat.core.client.WebSocketResponse;
public class GameClient {
public static void main(String[] args) {
WebSocketClient client = new WebSocketClient("ws://localhost:8080/game");
client.connect(new WebSocketListener() {
@Override
public void onOpen(WebSocketClient client, WebSocketResponse response) {
System.out.println("连接到游戏服务器成功");
// 发送加入游戏请求
client.sendMessage("{\"action\": \"join\", \"playerId\": \"player1\", \"gameId\": \"game1\"}");
}
@Override
public void onMessage(WebSocketClient client, String message) {
System.out.println("收到游戏消息: " + message);
// 处理游戏状态更新、玩家动作等
}
@Override
public void onClose(WebSocketClient client, WebSocketResponse response, CloseReason reason) {
System.out.println("游戏连接关闭: " + reason);
}
@Override
public void onError(WebSocketClient client, WebSocketResponse response, Throwable throwable) {
System.out.println("游戏连接错误: " + throwable.getMessage());
}
});
// 模拟发送玩家移动
try {
Thread.sleep(3000);
client.sendMessage("{\"action\": \"move\", \"x\": 100, \"y\": 200}");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

生产环境中网络可能不稳定,建议实现自动重连:

import tech.smartboot.feat.core.client.WebSocketClient;
import tech.smartboot.feat.core.client.WebSocketListener;
import tech.smartboot.feat.core.client.WebSocketResponse;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ReconnectDemo {
private static final String WS_URL = "ws://localhost:8080/ws";
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private static WebSocketClient client;
private static int reconnectAttempts = 0;
private static final int MAX_RECONNECT_ATTEMPTS = 5;
public static void main(String[] args) {
connect();
// 程序退出时关闭资源
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
scheduler.shutdown();
if (client != null) {
client.close();
}
}));
}
private static void connect() {
client = new WebSocketClient(WS_URL);
client.options().debug(true);
client.connect(new WebSocketListener() {
@Override
public void onOpen(WebSocketClient client, WebSocketResponse response) {
System.out.println("连接成功");
// 重置重连计数
reconnectAttempts = 0;
}
@Override
public void onMessage(WebSocketClient client, String message) {
System.out.println("收到消息: " + message);
}
@Override
public void onClose(WebSocketClient client, WebSocketResponse response, CloseReason reason) {
System.out.println("连接关闭: " + reason);
scheduleReconnect();
}
@Override
public void onError(WebSocketClient client, WebSocketResponse response, Throwable throwable) {
System.out.println("连接错误: " + throwable.getMessage());
scheduleReconnect();
}
});
}
private static void scheduleReconnect() {
if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) {
int delay = 5 + (reconnectAttempts * 2); // 指数退避
System.out.println("将在 " + delay + " 秒后尝试重连...");
scheduler.schedule(() -> {
reconnectAttempts++;
connect();
}, delay, TimeUnit.SECONDS);
} else {
System.err.println("重连失败,已达到最大尝试次数");
}
}
}

重连要点:

  • onCloseonError 回调中触发重连
  • 使用指数退避策略,避免频繁重试
  • 设置最大重连次数,防止无限重试
  • 程序退出时正确释放资源

  • 单例模式:每个 WebSocket 连接使用一个客户端实例
  • 资源释放:不需要连接时及时关闭
  • 重连机制:实现自动重连,处理网络波动
  • 消息格式:使用 JSON 等结构化格式
  • 消息类型:定义清晰的消息类型和处理逻辑
  • 错误处理:妥善处理消息解析错误
  • 缓冲区大小:根据实际数据大小调整缓冲区
  • 消息大小:避免发送过大的消息
  • 心跳机制:实现心跳检测,保持连接活跃
  • WSS 协议:生产环境使用 wss:// 加密连接
  • 认证机制:实现连接认证,防止未授权访问
  • 消息验证:验证消息来源和内容,防止恶意消息

问题可能原因解决方案
连接失败服务器未启动或地址错误检查服务器是否运行,确认 URL 正确
连接超时网络延迟或服务器响应慢增加连接超时时间,检查网络状况
消息发送失败连接已关闭或网络问题检查连接状态,实现重连机制
消息接收不到服务器未发送消息或路径错误检查服务器端代码,确认请求路径正确
WebSocket 握手失败服务器不支持 WebSocket确认服务器端已正确实现 WebSocket 支持
连接频繁断开网络不稳定或服务器配置问题实现重连机制,检查服务器心跳配置
  1. 开启调试模式

    client.options().debug(true);
  2. 检查服务器端 WebSocket 实现

    • 确保服务器端支持 WebSocket 协议
    • 检查 WebSocket 路径是否正确配置
  3. 使用浏览器测试

    • 在浏览器控制台中使用 JavaScript 测试 WebSocket 连接
    • 检查浏览器开发者工具中的网络请求
  4. 检查网络连接

    • 使用 ping 命令检查服务器是否可达
    • 使用 telnet 命令检查端口是否开放

import tech.smartboot.feat.core.client.WebSocketClient;
import tech.smartboot.feat.core.client.WebSocketListener;
import tech.smartboot.feat.core.client.WebSocketResponse;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class WebSocketCompleteDemo {
private static final String WS_URL = "ws://localhost:8080/ws";
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private static WebSocketClient client;
private static int reconnectAttempts = 0;
private static final int MAX_RECONNECT_ATTEMPTS = 5;
public static void main(String[] args) {
connect();
// 程序退出时关闭资源
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
scheduler.shutdown();
if (client != null) {
client.close(1000, "Client shutdown");
System.out.println("WebSocket 连接已关闭");
}
}));
// 模拟发送消息
scheduler.schedule(() -> {
if (client != null) {
client.sendMessage("{\"type\": \"chat\", \"content\": \"Hello from Feat WebSocket client!\"}");
}
}, 2, TimeUnit.SECONDS);
}
private static void connect() {
client = new WebSocketClient(WS_URL);
client.options()
.connectTimeout(10000)
.readBufferSize(8192)
.writeBufferSize(8192)
.debug(true);
client.connect(new WebSocketListener() {
@Override
public void onOpen(WebSocketClient client, WebSocketResponse response) {
System.out.println("WebSocket 连接已建立");
// 发送认证消息
client.sendMessage("{\"type\": \"auth\", \"token\": \"user-token\"}");
// 重置重连计数
reconnectAttempts = 0;
}
@Override
public void onMessage(WebSocketClient client, String message) {
System.out.println("收到文本消息: " + message);
// 处理文本消息
}
@Override
public void onMessage(WebSocketClient client, byte[] data) {
System.out.println("收到二进制消息,长度: " + data.length);
// 处理二进制消息
}
@Override
public void onClose(WebSocketClient client, WebSocketResponse response, CloseReason reason) {
System.out.println("WebSocket 连接关闭: " + reason);
scheduleReconnect();
}
@Override
public void onError(WebSocketClient client, WebSocketResponse response, Throwable throwable) {
System.err.println("WebSocket 错误: " + throwable.getMessage());
throwable.printStackTrace();
scheduleReconnect();
}
});
}
private static void scheduleReconnect() {
if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) {
int delay = 5 + (reconnectAttempts * 2);
System.out.println("将在 " + delay + " 秒后尝试重连...");
scheduler.schedule(() -> {
reconnectAttempts++;
connect();
}, delay, TimeUnit.SECONDS);
} else {
System.err.println("重连失败,已达到最大尝试次数");
}
}
}