Skip to content

SSE 客户端

This content is not available in your language yet.

SSE 客户端基于 HttpClient 构建,用于接收服务端推送的流式事件,适合实时通知、日志流、监控指标等场景。

使用 Feat 工厂方法创建 SSE 连接:

import tech.smartboot.feat.Feat;
Feat.httpClient("http://localhost:8080")
.get("/events")
.onSSE(sse -> sse
.onOpen(response -> {
System.out.println("连接成功,状态码: " + response.statusCode());
})
.onData(event -> {
System.out.println("收到事件: " + event.getData());
}))
.onFailure(error -> {
System.err.println("连接失败: " + error.getMessage());
})
.submit();
Feat.httpClient("http://localhost:8080", options -> {
options.debug(true)
.connectTimeout(5000)
.idleTimeout(30000);
})
.get("/events")
.onSSE(sse -> sse.onData(event -> System.out.println(event.getData())))
.submit();

SSE 事件对象包含三个核心字段:

public interface SseEvent {
String getId(); // 事件 ID(用于断点续传)
String getType(); // 事件类型(event 字段)
String getData(); // 事件数据(data 字段)
}

SSE 支持通过 event: 字段定义事件类型,客户端可以为不同类型注册不同处理器:

Feat.httpClient("http://localhost:8080")
.get("/events")
.onSSE(sse -> sse
.onOpen(response -> System.out.println("已连接"))
// 默认事件处理器
.onData(event -> {
System.out.println("默认事件: " + event.getData());
})
// 命名事件处理器
.onEvent("message", event -> {
System.out.println("消息事件: " + event.getData());
})
.onEvent("notification", event -> {
System.out.println("通知事件: " + event.getData());
}))
.submit();

以下泳道图展示了 SSE 连接过程中各回调的触发时机:

sequenceDiagram
    autonumber
    participant App as 应用程序
    participant Client as SSE Client
    participant Server as SSE 服务器

    App->>Client: submit() 发起连接
    Client->>Server: HTTP Request (Accept: text/event-stream)
    
    alt 连接成功
        Server-->>Client: HTTP Response Headers
        Client-->>App: onOpen()
        
        loop 持续推送
            Server-->>Client: SSE Event (data: ...)
            Client-->>App: onData() / onEvent()
        end
        
        Server-->>Client: 连接关闭
        Client-->>App: onClose()
    else 连接失败
        Client-->>App: onFailure()
        Client-->>App: onClose()
    end

回调触发顺序说明:

  1. 发起连接 - 调用 submit() 后,客户端发送 HTTP 请求
  2. 连接成功 - 收到响应头时触发 onOpen(),此时可以获取状态码和响应头
  3. 接收事件 - 服务端推送事件时触发 onData() 或对应的 onEvent()
  4. 连接失败 - 网络异常或超时等情况下触发 onFailure()
  5. 连接关闭 - 无论成功或失败,连接关闭时都会触发 onClose()
import tech.smartboot.feat.core.common.HeaderName;
Feat.httpClient("http://localhost:8080")
.get("/events")
.header(header -> header
.set(HeaderName.AUTHORIZATION, "Bearer token123")
.add("Last-Event-ID", "event-100")) // 断点续传
.onSSE(sse -> sse.onData(event -> {
System.out.println(event.getData());
}))
.submit();
请求头说明
Accept: text/event-stream必需,声明接受 SSE 格式
Last-Event-ID可选,断点续传时指定上次接收的事件 ID
Authorization可选,认证令牌
Cache-Control: no-cache可选,禁用缓存

SSE 是长连接,程序不会自动退出,需要显式关闭。

import tech.smartboot.feat.core.client.HttpRest;
HttpRest rest = Feat.httpClient("http://localhost:8080")
.get("/events")
.onSSE(sse -> sse.onData(event -> {
System.out.println("收到: " + event.getData());
// 收到特定消息后关闭
if ("complete".equals(event.getData())) {
rest.close();
}
}))
.onFailure(Throwable::printStackTrace);
rest.submit();
// 或者定时关闭
// Thread.sleep(30000);
// rest.close();
SimpleSseServer.java
import tech.smartboot.feat.Feat;
import tech.smartboot.feat.core.server.upgrade.sse.SSEUpgrade;
import tech.smartboot.feat.core.server.upgrade.sse.SseEmitter;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class SimpleSseServer {
public static void main(String[] args) {
Feat.httpServer().httpHandler(req -> {
if (!"/events".equals(req.getRequestURI())) {
req.getResponse().write("use /events");
return;
}
req.upgrade(new SSEUpgrade() {
@Override
public void onOpen(SseEmitter emitter) throws IOException {
emitter.send(SseEmitter.event()
.name("connected")
.data("Welcome to SSE"));
// 定时发送消息
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
try {
emitter.send(SseEmitter.event()
.name("message")
.id(String.valueOf(System.currentTimeMillis()))
.data("Server time: " + System.currentTimeMillis()));
} catch (IOException e) {
e.printStackTrace();
}
}, 1, 1, TimeUnit.SECONDS);
}
});
}).listen(8080);
}
}
SseClientDemo.java
import tech.smartboot.feat.Feat;
import tech.smartboot.feat.core.client.HttpRest;
import tech.smartboot.feat.core.common.HeaderName;
public class SseClientDemo {
public static void main(String[] args) throws Exception {
HttpRest rest = Feat.httpClient("http://localhost:8080")
.get("/events")
.header(header -> header
.set(HeaderName.ACCEPT, "text/event-stream")
.set(HeaderName.CACHE_CONTROL, "no-cache"))
.onSSE(sse -> sse
.onOpen(response -> {
System.out.println("连接成功!");
})
.onEvent("connected", event -> {
System.out.println("[连接事件] " + event.getData());
})
.onEvent("message", event -> {
System.out.println("[消息事件] ID: " + event.getId() + ", Data: " + event.getData());
})
.onData(event -> {
System.out.println("[默认处理] Type: " + event.getType());
}))
.onFailure(error -> {
System.err.println("连接失败: " + error.getMessage());
});
rest.submit();
// 保持程序运行
Thread.sleep(60000);
rest.close();
}
}