跳转到内容

SSE 客户端教程

SSE 适合“服务器持续推送,客户端被动接收”的场景,比如日志流、通知流、构建状态、监控指标。它比 WebSocket 简单,因为底层仍然是标准 HTTP。

  • 启动一个最小 SSE 服务端用于本地验证
  • 使用 Feat 客户端连接 text/event-stream 接口
  • 处理默认事件和命名事件
  • 学会在需要时手动关闭连接
  • JDK 1.8 或更高版本
  • Maven 3.0 或更高版本
  • 已引入 feat-core

如果你手头还没有 SSE 接口,先用下面这个最小服务自测:

SimpleSseServer.java
import tech.smartboot.feat.Feat;
import tech.smartboot.feat.core.server.upgrade.sse.SSEUpgrade;
import tech.smartboot.feat.core.server.upgrade.sse.SseEmitter;
import java.io.IOException;
public class SimpleSseServer {
public static void main(String[] args) {
Feat.httpServer().httpHandler(req -> {
if (!"/events".equals(req.getRequestURI())) {
req.getResponse().write("use /events");
return;
}
req.upgrade(new SSEUpgrade() {
@Override
public void onOpen(SseEmitter emitter) throws IOException {
emitter.send(SseEmitter.event()
.name("message")
.id("event-1")
.data("hello sse"));
emitter.send(SseEmitter.event()
.name("notification")
.id("event-2")
.data("build finished"));
emitter.complete();
}
});
}).listen(8081);
}
}

这个服务会在 http://localhost:8081/events 连续推送两条事件,然后主动关闭连接。

客户端最小写法如下:

SseClientDemo.java
import tech.smartboot.feat.Feat;
public class SseClientDemo {
public static void main(String[] args) {
Feat.httpClient("http://localhost:8081", opt -> opt.debug(true))
.get("/events")
.onSSE(sse -> sse
.onOpen(response -> {
System.out.println("connected: " + response.statusCode());
System.out.println("content-type: " + response.getContentType());
})
.onData(event -> {
System.out.println("id: " + event.getId());
System.out.println("type: " + event.getType());
System.out.println("data: " + event.getData());
}))
.onFailure(Throwable::printStackTrace)
.submit();
}
}

这里有三个关键点:

  • onSSE(...) 会把当前 HTTP 请求切换成 SSE 处理模式
  • onOpen(...) 拿到的是 HttpResponse,不是 SseClient
  • onData(...) 用于处理默认事件流

如果一切正常,你会看到类似输出:

connected: 200
content-type: text/event-stream
id: event-1
type: message
data: hello sse

当服务端显式发送 event: notification 这类命名事件时,可以注册专门处理器:

NamedEventDemo.java
import tech.smartboot.feat.Feat;
public class NamedEventDemo {
public static void main(String[] args) {
Feat.httpClient("http://localhost:8081")
.get("/events")
.onSSE(sse -> sse
.onData(event -> {
System.out.println("default event: " + event.getData());
})
.onEvent("notification", event -> {
System.out.println("notification: " + event.getData());
}))
.onFailure(Throwable::printStackTrace)
.submit();
}
}

行为上可以这样理解:

  • onEvent("notification", ...) 专门处理 event: notification
  • onData(...) 处理默认事件,也能在没有专用处理器时兜底

如果你的 SSE 接口需要鉴权,或者需要带 Last-Event-ID 续传头,可以继续使用普通 HTTP 请求头配置:

SseHeaderDemo.java
import tech.smartboot.feat.Feat;
import tech.smartboot.feat.core.common.HeaderName;
public class SseHeaderDemo {
public static void main(String[] args) {
Feat.httpClient("http://localhost:8081")
.get("/events")
.header(header -> header
.set(HeaderName.AUTHORIZATION, "Bearer token")
.add("Last-Event-ID", "event-1"))
.onSSE(sse -> sse.onData(event -> {
System.out.println(event.getData());
}))
.onFailure(Throwable::printStackTrace)
.submit();
}
}

SSE 往往是长连接。如果你在命令行工具、桌面客户端或后台任务里使用它,通常需要保留 HttpRest 引用以便手动关闭。

SseCloseDemo.java
import tech.smartboot.feat.Feat;
import tech.smartboot.feat.core.client.HttpRest;
public class SseCloseDemo {
public static void main(String[] args) throws Exception {
HttpRest rest = Feat.httpClient("http://localhost:8081")
.get("/events")
.onSSE(sse -> sse.onData(event -> {
System.out.println(event.getData());
}))
.onFailure(Throwable::printStackTrace);
rest.submit();
Thread.sleep(5000);
rest.close();
}
}

onSSE(...)onResponseHeader(...) 都会接管响应头处理逻辑。
同一个请求上不要同时把它们都当成主入口来写,否则后注册的处理器会覆盖前面的行为。

明明接口返回 200,但 onData(...) 不触发

Section titled “明明接口返回 200,但 onData(...) 不触发”

优先检查响应头里的 Content-Type 是否以 text/event-stream 开头。
默认 onSSE(...) 只有在状态码为 200 且内容类型符合 SSE 规范时才会升级。

服务端发的是命名事件,但我只看到了默认处理逻辑

Section titled “服务端发的是命名事件,但我只看到了默认处理逻辑”

检查你是否注册了对应的 onEvent("eventName", ...)。如果没有,事件会落回默认处理逻辑。

连接一直不结束,程序也不退出

Section titled “连接一直不结束,程序也不退出”

这是 SSE 的正常表现。它本来就是长连接。
如果你需要在 CLI 或任务程序里退出,记得保留 HttpRest 引用并调用 close()

如何快速验证客户端本身有没有问题

Section titled “如何快速验证客户端本身有没有问题”

先用本页的本地 SimpleSseServer。本地服务可控,问题排查最短,不要一开始就依赖外部网络环境。