SSE 客户端详解
Server-Sent Events (SSE) 是一种服务器向客户端推送实时更新的技术。与 WebSocket 不同,SSE 是单向通信,只能服务器向客户端推送数据,但使用更简单,因为它是基于 HTTP 协议的。
以下是一个简单的示例,展示了如何使用 Feat SSEClient 连接 SSE 服务器并处理消息:
import tech.smartboot.feat.Feat;import tech.smartboot.feat.core.client.HttpClient;import tech.smartboot.feat.core.client.SseEvent;
public class SSEClientDemo { public static void main(String[] args) { // 创建 HttpClient 实例 HttpClient httpClient = Feat.httpClient("http://localhost:8080");
// 注册 SSE 处理器并提交请求 httpClient.get("/events") .onSSE(sseClient -> sseClient // 注册默认事件处理器 .onData(event -> { System.out.println("收到数据: " + event.getData()); }) // 注册连接成功回调 .onOpen(client -> { System.out.println("SSE 连接已建立"); })) // 提交请求 .submit(); }}API 接口说明
Section titled “API 接口说明”| 方法 | 参数 | 返回值 | 说明 |
|---|---|---|---|
HttpClient.get(String path) | path: 请求路径 | HttpRest | 创建 GET 请求 |
HttpRest.onSSE(Consumer<SseClient>) | callback: SSE 客户端回调 | HttpRest | 注册 SSE 处理器 |
SseClient.onData(Consumer<SseEvent>) | callback: 数据事件回调 | SseClient | 处理默认数据事件 |
SseClient.onEvent(String, Consumer<SseEvent>) | event: 事件类型 callback: 事件回调 | SseClient | 处理特定类型事件 |
SseClient.onOpen(Consumer<SseClient>) | callback: 连接回调 | SseClient | 连接成功回调 |
HttpRest.onFailure(Consumer<Throwable>) | callback: 失败回调 | HttpRest | 请求失败回调 |
HttpRest.onClose(Runnable) | callback: 关闭回调 | HttpRest | 连接关闭回调 |
HttpRest.submit() | - | void | 提交请求 |
HttpRest.close() | - | void | 关闭连接 |
SseEvent 类
Section titled “SseEvent 类”SseEvent 封装了从服务器接收到的事件数据:
| 方法 | 返回值 | 说明 |
|---|---|---|
getId() | String | 获取事件 ID |
getEvent() | String | 获取事件类型 |
getData() | String | 获取事件数据 |
getRetry() | Integer | 获取重试时间 |
处理特定事件类型
Section titled “处理特定事件类型”SSE 支持不同的事件类型,可以通过 onEvent 方法处理特定类型的事件:
import tech.smartboot.feat.Feat;import tech.smartboot.feat.core.client.HttpClient;import tech.smartboot.feat.core.client.SseEvent;
public class SSEEventTypesDemo { public static void main(String[] args) { HttpClient httpClient = Feat.httpClient("http://localhost:8080");
httpClient.get("/events") .onSSE(sseClient -> sseClient // 处理默认消息事件 .onData(event -> { System.out.println("默认消息: " + event.getData()); }) // 处理特定类型事件 .onEvent("notification", event -> { System.out.println("通知事件: " + event.getData()); }) .onEvent("update", event -> { System.out.println("更新事件: " + event.getData()); }) .onOpen(client -> { System.out.println("SSE 连接已建立"); })) .submit(); }}如果需要自定义连接参数,可以通过 HttpClient 进行配置:
import tech.smartboot.feat.Feat;import tech.smartboot.feat.core.client.HttpClient;import tech.smartboot.feat.core.client.SseEvent;
public class SSEAdvancedConfigDemo { public static void main(String[] args) { HttpClient httpClient = Feat.httpClient("http://localhost:8080", opt -> { opt.connectTimeout(5000) // 设置连接超时时间为 5 秒 .readBufferSize(8192) // 设置读缓冲区大小为 8 KB .debug(true); // 开启调试模式 });
httpClient.get("/events") // 添加自定义请求头 .header(header -> { header.add("Authorization", "Bearer your-token"); header.add("Last-Event-ID", "event-123"); }) .onSSE(sseClient -> sseClient .onData(event -> { System.out.println("收到数据: " + event.getData()); System.out.println("事件ID: " + event.getId()); System.out.println("事件类型: " + event.getEvent()); }) .onOpen(client -> { System.out.println("SSE 连接已建立"); })) .submit(); }}手动关闭连接
Section titled “手动关闭连接”当不再需要 HTTP 连接时,可以手动关闭它:
import tech.smartboot.feat.Feat;import tech.smartboot.feat.core.client.HttpClient;import tech.smartboot.feat.core.client.HttpRest;import tech.smartboot.feat.core.client.SseEvent;
public class SSECloseDemo { public static void main(String[] args) throws InterruptedException { HttpClient httpClient = Feat.httpClient("http://localhost:8080");
// 保存 HttpRest 引用以便后续关闭连接 HttpRest httpRest = httpClient.get("/events") .onSSE(sseClient -> sseClient .onData(event -> { System.out.println("收到数据: " + event.getData()); }) .onOpen(client -> { System.out.println("SSE 连接已建立"); }));
// 提交请求 httpRest.submit();
// 5秒后手动关闭连接 Thread.sleep(5000); httpRest.close(); System.out.println("连接已关闭"); }}场景 1:实时通知
Section titled “场景 1:实时通知”适用条件:需要向客户端推送实时通知,如系统通知、消息提醒等
示例:
import tech.smartboot.feat.Feat;import tech.smartboot.feat.core.client.HttpClient;import tech.smartboot.feat.core.client.SseEvent;
public class NotificationDemo { public static void main(String[] args) { HttpClient httpClient = Feat.httpClient("http://localhost:8080");
httpClient.get("/notifications") .header("Authorization", "Bearer user-token") .onSSE(sseClient -> sseClient .onEvent("notification", event -> { System.out.println("新通知: " + event.getData()); // 显示通知到 UI }) .onEvent("alert", event -> { System.out.println("警报: " + event.getData()); // 显示警报到 UI }) .onOpen(client -> { System.out.println("通知连接已建立"); })) .onFailure(e -> { System.err.println("通知连接失败: " + e.getMessage()); }) .submit(); }}场景 2:实时数据更新
Section titled “场景 2:实时数据更新”适用条件:需要向客户端推送实时数据更新,如股票行情、传感器数据等
示例:
import tech.smartboot.feat.Feat;import tech.smartboot.feat.core.client.HttpClient;import tech.smartboot.feat.core.client.SseEvent;
public class RealTimeDataDemo { public static void main(String[] args) { HttpClient httpClient = Feat.httpClient("http://localhost:8080");
httpClient.get("/stock-prices") .onSSE(sseClient -> sseClient .onData(event -> { String data = event.getData(); System.out.println("股票数据: " + data); // 解析数据并更新 UI }) .onOpen(client -> { System.out.println("数据连接已建立"); })) .onFailure(e -> { System.err.println("数据连接失败: " + e.getMessage()); }) .submit(); }}场景 3:服务器状态监控
Section titled “场景 3:服务器状态监控”适用条件:需要监控服务器的实时状态,如 CPU 使用率、内存使用情况等
示例:
import tech.smartboot.feat.Feat;import tech.smartboot.feat.core.client.HttpClient;import tech.smartboot.feat.core.client.SseEvent;
public class ServerMonitoringDemo { public static void main(String[] args) { HttpClient httpClient = Feat.httpClient("http://localhost:8080");
httpClient.get("/server-status") .onSSE(sseClient -> sseClient .onEvent("cpu", event -> { System.out.println("CPU 使用率: " + event.getData() + "%"); }) .onEvent("memory", event -> { System.out.println("内存使用: " + event.getData() + "MB"); }) .onEvent("disk", event -> { System.out.println("磁盘使用: " + event.getData() + "%"); }) .onOpen(client -> { System.out.println("监控连接已建立"); })) .onFailure(e -> { System.err.println("监控连接失败: " + e.getMessage()); }) .submit(); }}在实际项目中使用 SSEClient 时,有一些最佳实践可以帮助你写出更好的代码。
1. 错误处理和重连
Section titled “1. 错误处理和重连”在生产环境中,网络问题可能导致连接断开,建议实现重连机制:
import tech.smartboot.feat.Feat;import tech.smartboot.feat.core.client.HttpClient;import tech.smartboot.feat.core.client.SseEvent;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;
public class SSEReconnectDemo { private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private static int reconnectAttempts = 0; private static final int MAX_RECONNECT_ATTEMPTS = 5;
public static void main(String[] args) { connectSSE(); }
private static void connectSSE() { HttpClient httpClient = Feat.httpClient("http://localhost:8080");
httpClient.get("/events") .onSSE(sseClient -> sseClient .onData(event -> { System.out.println("收到数据: " + event.getData()); // 重置重连计数 reconnectAttempts = 0; }) .onOpen(client -> { System.out.println("SSE 连接已建立"); })) .onFailure(throwable -> { System.err.println("发生错误: " + throwable.getMessage()); // 安排重连 scheduleReconnect(); }) .onClose(() -> { System.out.println("连接已关闭"); // 安排重连 scheduleReconnect(); }) .submit(); }
private static void scheduleReconnect() { if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) { int delay = 5 + (reconnectAttempts * 2); // 指数退避 System.out.println("将在 " + delay + " 秒后尝试重连..."); scheduler.schedule(() -> { reconnectAttempts++; connectSSE(); }, delay, TimeUnit.SECONDS); } else { System.err.println("重连失败,已达到最大尝试次数"); } }}2. 资源管理
Section titled “2. 资源管理”确保在应用程序关闭时正确释放资源:
import tech.smartboot.feat.Feat;import tech.smartboot.feat.core.client.HttpClient;import tech.smartboot.feat.core.client.HttpRest;import tech.smartboot.feat.core.client.SseEvent;
public class SSEResourceManagementDemo { private static HttpRest httpRest;
public static void main(String[] args) { connectSSE();
// 添加关闭钩子以确保资源被释放 Runtime.getRuntime().addShutdownHook(new Thread(() -> { if (httpRest != null) { httpRest.close(); System.out.println("HTTP 连接已关闭"); } })); }
private static void connectSSE() { HttpClient httpClient = Feat.httpClient("http://localhost:8080");
httpRest = httpClient.get("/events") .onSSE(sseClient -> sseClient .onData(event -> { System.out.println("收到数据: " + event.getData()); }) .onOpen(client -> { System.out.println("SSE 连接已建立"); })) .onFailure(throwable -> { System.err.println("发生错误: " + throwable.getMessage()); });
httpRest.submit(); }}3. 数据解析和处理
Section titled “3. 数据解析和处理”对于复杂的 SSE 数据,建议使用适当的解析方式:
import tech.smartboot.feat.Feat;import tech.smartboot.feat.core.client.HttpClient;import tech.smartboot.feat.core.client.SseEvent;import com.fasterxml.jackson.databind.ObjectMapper;
public class SSEDataParsingDemo { private static ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) { HttpClient httpClient = Feat.httpClient("http://localhost:8080");
httpClient.get("/api/events") .onSSE(sseClient -> sseClient .onData(event -> { try { // 解析 JSON 数据 EventData eventData = objectMapper.readValue(event.getData(), EventData.class); System.out.println("事件类型: " + eventData.getType()); System.out.println("事件数据: " + eventData.getData()); System.out.println("事件时间: " + eventData.getTimestamp()); } catch (Exception e) { System.err.println("数据解析失败: " + e.getMessage()); } }) .onOpen(client -> { System.out.println("SSE 连接已建立"); })) .submit(); }
static class EventData { private String type; private String data; private long timestamp;
// getters and setters public String getType() { return type; } public void setType(String type) { this.type = type; } public String getData() { return data; } public void setData(String data) { this.data = data; } public long getTimestamp() { return timestamp; } public void setTimestamp(long timestamp) { this.timestamp = timestamp; } }}| 问题 | 可能原因 | 解决方案 |
|---|---|---|
| 连接失败 | 服务器未启动或地址错误 | 检查服务器是否运行,确认 URL 正确 |
| 连接断开 | 网络不稳定或服务器重启 | 实现重连机制,使用指数退避策略 |
| 数据解析失败 | 数据格式错误 | 增加错误处理,检查服务器返回的数据格式 |
| 没有收到事件 | 服务器未发送事件或路径错误 | 检查服务器端代码,确认请求路径正确 |
| 连接超时 | 网络延迟或服务器响应慢 | 增加连接超时时间,检查网络状况 |
-
开启调试模式:
HttpClient httpClient = Feat.httpClient("http://localhost:8080", opt -> {opt.debug(true);}); -
检查服务器端 SSE 实现:
- 确保响应头包含
Content-Type: text/event-stream - 确保响应头包含
Cache-Control: no-cache - 确保数据格式符合 SSE 规范(以
data:开头,以两个换行符结束)
- 确保响应头包含
-
使用 curl 测试 SSE 端点:
Terminal window curl -N http://localhost:8080/events -
检查网络连接:
- 使用
ping命令检查服务器是否可达 - 使用
telnet命令检查端口是否开放
- 使用
性能与扩展性
Section titled “性能与扩展性”Feat SSEClient 基于 Feat 框架的高性能网络通信能力,能够支持大规模的并发连接和高频率的消息传输。
Feat SSEClient 的设计充分考虑了线程安全问题,保证在高并发场景下的稳定性。
- 缓冲区大小:根据实际数据大小调整读缓冲区大小
- 数据处理:及时处理接收到的数据,避免内存堆积
- 连接管理:不需要的连接及时关闭,释放资源
import tech.smartboot.feat.Feat;import tech.smartboot.feat.core.client.HttpClient;import tech.smartboot.feat.core.client.HttpRest;import tech.smartboot.feat.core.client.SseEvent;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;
public class SSECompleteDemo { private static HttpRest httpRest; private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private static int reconnectAttempts = 0; private static final int MAX_RECONNECT_ATTEMPTS = 5;
public static void main(String[] args) { connectSSE();
// 添加关闭钩子 Runtime.getRuntime().addShutdownHook(new Thread(() -> { if (httpRest != null) { httpRest.close(); System.out.println("连接已关闭"); } scheduler.shutdown(); })); }
private static void connectSSE() { HttpClient httpClient = Feat.httpClient("http://localhost:8080", opt -> { opt.connectTimeout(10000) .readBufferSize(8192) .debug(true); });
httpRest = httpClient.get("/events") .header("Authorization", "Bearer your-token") .header("Last-Event-ID", "latest") .onSSE(sseClient -> sseClient .onData(event -> { System.out.println("默认事件: " + event.getData()); reconnectAttempts = 0; }) .onEvent("message", event -> { System.out.println("消息事件: " + event.getData()); }) .onEvent("update", event -> { System.out.println("更新事件: " + event.getData()); }) .onOpen(client -> { System.out.println("SSE 连接已建立"); })) .onFailure(e -> { System.err.println("连接失败: " + e.getMessage()); scheduleReconnect(); }) .onClose(() -> { System.out.println("连接已关闭"); scheduleReconnect(); });
httpRest.submit(); }
private static void scheduleReconnect() { if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) { int delay = 5 + (reconnectAttempts * 2); System.out.println("将在 " + delay + " 秒后尝试重连..."); scheduler.schedule(() -> { reconnectAttempts++; connectSSE(); }, delay, TimeUnit.SECONDS); } else { System.err.println("重连失败,已达到最大尝试次数"); } }}- HttpClient 详解:发送 HTTP 请求
- WebSocket Client:实现双向实时通信
- Feat 服务器:构建 HTTP 服务器
- Feat Cloud:构建 Web 应用