SSE 客户端
This content is not available in your language yet.
SSE 客户端基于 HttpClient 构建,用于接收服务端推送的流式事件,适合实时通知、日志流、监控指标等场景。
使用 Feat 工厂方法创建 SSE 连接:
import tech.smartboot.feat.Feat;
Feat.httpClient("http://localhost:8080") .get("/events") .onSSE(sse -> sse .onOpen(response -> { System.out.println("连接成功,状态码: " + response.statusCode()); }) .onData(event -> { System.out.println("收到事件: " + event.getData()); })) .onFailure(error -> { System.err.println("连接失败: " + error.getMessage()); }) .submit();带配置的连接
Section titled “带配置的连接”Feat.httpClient("http://localhost:8080", options -> { options.debug(true) .connectTimeout(5000) .idleTimeout(30000);}) .get("/events") .onSSE(sse -> sse.onData(event -> System.out.println(event.getData()))) .submit();基本事件监听
Section titled “基本事件监听”SSE 事件对象包含三个核心字段:
public interface SseEvent { String getId(); // 事件 ID(用于断点续传) String getType(); // 事件类型(event 字段) String getData(); // 事件数据(data 字段)}处理命名事件
Section titled “处理命名事件”SSE 支持通过 event: 字段定义事件类型,客户端可以为不同类型注册不同处理器:
Feat.httpClient("http://localhost:8080") .get("/events") .onSSE(sse -> sse .onOpen(response -> System.out.println("已连接")) // 默认事件处理器 .onData(event -> { System.out.println("默认事件: " + event.getData()); }) // 命名事件处理器 .onEvent("message", event -> { System.out.println("消息事件: " + event.getData()); }) .onEvent("notification", event -> { System.out.println("通知事件: " + event.getData()); })) .submit();回调触发顺序
Section titled “回调触发顺序”以下泳道图展示了 SSE 连接过程中各回调的触发时机:
sequenceDiagram
autonumber
participant App as 应用程序
participant Client as SSE Client
participant Server as SSE 服务器
App->>Client: submit() 发起连接
Client->>Server: HTTP Request (Accept: text/event-stream)
alt 连接成功
Server-->>Client: HTTP Response Headers
Client-->>App: onOpen()
loop 持续推送
Server-->>Client: SSE Event (data: ...)
Client-->>App: onData() / onEvent()
end
Server-->>Client: 连接关闭
Client-->>App: onClose()
else 连接失败
Client-->>App: onFailure()
Client-->>App: onClose()
end
回调触发顺序说明:
- 发起连接 - 调用
submit()后,客户端发送 HTTP 请求 - 连接成功 - 收到响应头时触发
onOpen(),此时可以获取状态码和响应头 - 接收事件 - 服务端推送事件时触发
onData()或对应的onEvent() - 连接失败 - 网络异常或超时等情况下触发
onFailure() - 连接关闭 - 无论成功或失败,连接关闭时都会触发
onClose()
import tech.smartboot.feat.core.common.HeaderName;
Feat.httpClient("http://localhost:8080") .get("/events") .header(header -> header .set(HeaderName.AUTHORIZATION, "Bearer token123") .add("Last-Event-ID", "event-100")) // 断点续传 .onSSE(sse -> sse.onData(event -> { System.out.println(event.getData()); })) .submit();常用 SSE 请求头
Section titled “常用 SSE 请求头”| 请求头 | 说明 |
|---|---|
Accept: text/event-stream | 必需,声明接受 SSE 格式 |
Last-Event-ID | 可选,断点续传时指定上次接收的事件 ID |
Authorization | 可选,认证令牌 |
Cache-Control: no-cache | 可选,禁用缓存 |
手动关闭连接
Section titled “手动关闭连接”SSE 是长连接,程序不会自动退出,需要显式关闭。
import tech.smartboot.feat.core.client.HttpRest;
HttpRest rest = Feat.httpClient("http://localhost:8080") .get("/events") .onSSE(sse -> sse.onData(event -> { System.out.println("收到: " + event.getData()); // 收到特定消息后关闭 if ("complete".equals(event.getData())) { rest.close(); } })) .onFailure(Throwable::printStackTrace);
rest.submit();
// 或者定时关闭// Thread.sleep(30000);// rest.close();本地测试服务端
Section titled “本地测试服务端”import tech.smartboot.feat.Feat;import tech.smartboot.feat.core.server.upgrade.sse.SSEUpgrade;import tech.smartboot.feat.core.server.upgrade.sse.SseEmitter;
import java.io.IOException;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;
public class SimpleSseServer { public static void main(String[] args) { Feat.httpServer().httpHandler(req -> { if (!"/events".equals(req.getRequestURI())) { req.getResponse().write("use /events"); return; }
req.upgrade(new SSEUpgrade() { @Override public void onOpen(SseEmitter emitter) throws IOException { emitter.send(SseEmitter.event() .name("connected") .data("Welcome to SSE"));
// 定时发送消息 Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { try { emitter.send(SseEmitter.event() .name("message") .id(String.valueOf(System.currentTimeMillis())) .data("Server time: " + System.currentTimeMillis())); } catch (IOException e) { e.printStackTrace(); } }, 1, 1, TimeUnit.SECONDS); } }); }).listen(8080); }}客户端完整代码
Section titled “客户端完整代码”import tech.smartboot.feat.Feat;import tech.smartboot.feat.core.client.HttpRest;import tech.smartboot.feat.core.common.HeaderName;
public class SseClientDemo { public static void main(String[] args) throws Exception { HttpRest rest = Feat.httpClient("http://localhost:8080") .get("/events") .header(header -> header .set(HeaderName.ACCEPT, "text/event-stream") .set(HeaderName.CACHE_CONTROL, "no-cache")) .onSSE(sse -> sse .onOpen(response -> { System.out.println("连接成功!"); }) .onEvent("connected", event -> { System.out.println("[连接事件] " + event.getData()); }) .onEvent("message", event -> { System.out.println("[消息事件] ID: " + event.getId() + ", Data: " + event.getData()); }) .onData(event -> { System.out.println("[默认处理] Type: " + event.getType()); })) .onFailure(error -> { System.err.println("连接失败: " + error.getMessage()); });
rest.submit();
// 保持程序运行 Thread.sleep(60000); rest.close(); }}