Skip to content

SSE 客户端详解

This content is not available in your language yet.

Server-Sent Events (SSE) 是一种服务器向客户端推送实时更新的技术。与 WebSocket 不同,SSE 是单向通信,只能服务器向客户端推送数据,但使用更简单,因为它是基于 HTTP 协议的。

以下是一个简单的示例,展示了如何使用 Feat SSEClient 连接 SSE 服务器并处理消息:

import tech.smartboot.feat.Feat;
import tech.smartboot.feat.core.client.HttpClient;
import tech.smartboot.feat.core.client.SseEvent;
public class SSEClientDemo {
public static void main(String[] args) {
// 创建 HttpClient 实例
HttpClient httpClient = Feat.httpClient("http://localhost:8080");
// 注册 SSE 处理器并提交请求
httpClient.get("/events")
.onSSE(sseClient -> sseClient
// 注册默认事件处理器
.onData(event -> {
System.out.println("收到数据: " + event.getData());
})
// 注册连接成功回调
.onOpen(client -> {
System.out.println("SSE 连接已建立");
}))
// 提交请求
.submit();
}
}

方法参数返回值说明
HttpClient.get(String path)path: 请求路径HttpRest创建 GET 请求
HttpRest.onSSE(Consumer<SseClient>)callback: SSE 客户端回调HttpRest注册 SSE 处理器
SseClient.onData(Consumer<SseEvent>)callback: 数据事件回调SseClient处理默认数据事件
SseClient.onEvent(String, Consumer<SseEvent>)event: 事件类型
callback: 事件回调
SseClient处理特定类型事件
SseClient.onOpen(Consumer<SseClient>)callback: 连接回调SseClient连接成功回调
HttpRest.onFailure(Consumer<Throwable>)callback: 失败回调HttpRest请求失败回调
HttpRest.onClose(Runnable)callback: 关闭回调HttpRest连接关闭回调
HttpRest.submit()-void提交请求
HttpRest.close()-void关闭连接

SseEvent 封装了从服务器接收到的事件数据:

方法返回值说明
getId()String获取事件 ID
getEvent()String获取事件类型
getData()String获取事件数据
getRetry()Integer获取重试时间

SSE 支持不同的事件类型,可以通过 onEvent 方法处理特定类型的事件:

import tech.smartboot.feat.Feat;
import tech.smartboot.feat.core.client.HttpClient;
import tech.smartboot.feat.core.client.SseEvent;
public class SSEEventTypesDemo {
public static void main(String[] args) {
HttpClient httpClient = Feat.httpClient("http://localhost:8080");
httpClient.get("/events")
.onSSE(sseClient -> sseClient
// 处理默认消息事件
.onData(event -> {
System.out.println("默认消息: " + event.getData());
})
// 处理特定类型事件
.onEvent("notification", event -> {
System.out.println("通知事件: " + event.getData());
})
.onEvent("update", event -> {
System.out.println("更新事件: " + event.getData());
})
.onOpen(client -> {
System.out.println("SSE 连接已建立");
}))
.submit();
}
}

如果需要自定义连接参数,可以通过 HttpClient 进行配置:

import tech.smartboot.feat.Feat;
import tech.smartboot.feat.core.client.HttpClient;
import tech.smartboot.feat.core.client.SseEvent;
public class SSEAdvancedConfigDemo {
public static void main(String[] args) {
HttpClient httpClient = Feat.httpClient("http://localhost:8080", opt -> {
opt.connectTimeout(5000) // 设置连接超时时间为 5 秒
.readBufferSize(8192) // 设置读缓冲区大小为 8 KB
.debug(true); // 开启调试模式
});
httpClient.get("/events")
// 添加自定义请求头
.header(header -> {
header.add("Authorization", "Bearer your-token");
header.add("Last-Event-ID", "event-123");
})
.onSSE(sseClient -> sseClient
.onData(event -> {
System.out.println("收到数据: " + event.getData());
System.out.println("事件ID: " + event.getId());
System.out.println("事件类型: " + event.getEvent());
})
.onOpen(client -> {
System.out.println("SSE 连接已建立");
}))
.submit();
}
}

当不再需要 HTTP 连接时,可以手动关闭它:

import tech.smartboot.feat.Feat;
import tech.smartboot.feat.core.client.HttpClient;
import tech.smartboot.feat.core.client.HttpRest;
import tech.smartboot.feat.core.client.SseEvent;
public class SSECloseDemo {
public static void main(String[] args) throws InterruptedException {
HttpClient httpClient = Feat.httpClient("http://localhost:8080");
// 保存 HttpRest 引用以便后续关闭连接
HttpRest httpRest = httpClient.get("/events")
.onSSE(sseClient -> sseClient
.onData(event -> {
System.out.println("收到数据: " + event.getData());
})
.onOpen(client -> {
System.out.println("SSE 连接已建立");
}));
// 提交请求
httpRest.submit();
// 5秒后手动关闭连接
Thread.sleep(5000);
httpRest.close();
System.out.println("连接已关闭");
}
}

适用条件:需要向客户端推送实时通知,如系统通知、消息提醒等

示例

import tech.smartboot.feat.Feat;
import tech.smartboot.feat.core.client.HttpClient;
import tech.smartboot.feat.core.client.SseEvent;
public class NotificationDemo {
public static void main(String[] args) {
HttpClient httpClient = Feat.httpClient("http://localhost:8080");
httpClient.get("/notifications")
.header("Authorization", "Bearer user-token")
.onSSE(sseClient -> sseClient
.onEvent("notification", event -> {
System.out.println("新通知: " + event.getData());
// 显示通知到 UI
})
.onEvent("alert", event -> {
System.out.println("警报: " + event.getData());
// 显示警报到 UI
})
.onOpen(client -> {
System.out.println("通知连接已建立");
}))
.onFailure(e -> {
System.err.println("通知连接失败: " + e.getMessage());
})
.submit();
}
}

适用条件:需要向客户端推送实时数据更新,如股票行情、传感器数据等

示例

import tech.smartboot.feat.Feat;
import tech.smartboot.feat.core.client.HttpClient;
import tech.smartboot.feat.core.client.SseEvent;
public class RealTimeDataDemo {
public static void main(String[] args) {
HttpClient httpClient = Feat.httpClient("http://localhost:8080");
httpClient.get("/stock-prices")
.onSSE(sseClient -> sseClient
.onData(event -> {
String data = event.getData();
System.out.println("股票数据: " + data);
// 解析数据并更新 UI
})
.onOpen(client -> {
System.out.println("数据连接已建立");
}))
.onFailure(e -> {
System.err.println("数据连接失败: " + e.getMessage());
})
.submit();
}
}

适用条件:需要监控服务器的实时状态,如 CPU 使用率、内存使用情况等

示例

import tech.smartboot.feat.Feat;
import tech.smartboot.feat.core.client.HttpClient;
import tech.smartboot.feat.core.client.SseEvent;
public class ServerMonitoringDemo {
public static void main(String[] args) {
HttpClient httpClient = Feat.httpClient("http://localhost:8080");
httpClient.get("/server-status")
.onSSE(sseClient -> sseClient
.onEvent("cpu", event -> {
System.out.println("CPU 使用率: " + event.getData() + "%");
})
.onEvent("memory", event -> {
System.out.println("内存使用: " + event.getData() + "MB");
})
.onEvent("disk", event -> {
System.out.println("磁盘使用: " + event.getData() + "%");
})
.onOpen(client -> {
System.out.println("监控连接已建立");
}))
.onFailure(e -> {
System.err.println("监控连接失败: " + e.getMessage());
})
.submit();
}
}

在实际项目中使用 SSEClient 时,有一些最佳实践可以帮助你写出更好的代码。

在生产环境中,网络问题可能导致连接断开,建议实现重连机制:

import tech.smartboot.feat.Feat;
import tech.smartboot.feat.core.client.HttpClient;
import tech.smartboot.feat.core.client.SseEvent;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class SSEReconnectDemo {
private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private static int reconnectAttempts = 0;
private static final int MAX_RECONNECT_ATTEMPTS = 5;
public static void main(String[] args) {
connectSSE();
}
private static void connectSSE() {
HttpClient httpClient = Feat.httpClient("http://localhost:8080");
httpClient.get("/events")
.onSSE(sseClient -> sseClient
.onData(event -> {
System.out.println("收到数据: " + event.getData());
// 重置重连计数
reconnectAttempts = 0;
})
.onOpen(client -> {
System.out.println("SSE 连接已建立");
}))
.onFailure(throwable -> {
System.err.println("发生错误: " + throwable.getMessage());
// 安排重连
scheduleReconnect();
})
.onClose(() -> {
System.out.println("连接已关闭");
// 安排重连
scheduleReconnect();
})
.submit();
}
private static void scheduleReconnect() {
if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) {
int delay = 5 + (reconnectAttempts * 2); // 指数退避
System.out.println("将在 " + delay + " 秒后尝试重连...");
scheduler.schedule(() -> {
reconnectAttempts++;
connectSSE();
}, delay, TimeUnit.SECONDS);
} else {
System.err.println("重连失败,已达到最大尝试次数");
}
}
}

确保在应用程序关闭时正确释放资源:

import tech.smartboot.feat.Feat;
import tech.smartboot.feat.core.client.HttpClient;
import tech.smartboot.feat.core.client.HttpRest;
import tech.smartboot.feat.core.client.SseEvent;
public class SSEResourceManagementDemo {
private static HttpRest httpRest;
public static void main(String[] args) {
connectSSE();
// 添加关闭钩子以确保资源被释放
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (httpRest != null) {
httpRest.close();
System.out.println("HTTP 连接已关闭");
}
}));
}
private static void connectSSE() {
HttpClient httpClient = Feat.httpClient("http://localhost:8080");
httpRest = httpClient.get("/events")
.onSSE(sseClient -> sseClient
.onData(event -> {
System.out.println("收到数据: " + event.getData());
})
.onOpen(client -> {
System.out.println("SSE 连接已建立");
}))
.onFailure(throwable -> {
System.err.println("发生错误: " + throwable.getMessage());
});
httpRest.submit();
}
}

对于复杂的 SSE 数据,建议使用适当的解析方式:

import tech.smartboot.feat.Feat;
import tech.smartboot.feat.core.client.HttpClient;
import tech.smartboot.feat.core.client.SseEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
public class SSEDataParsingDemo {
private static ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) {
HttpClient httpClient = Feat.httpClient("http://localhost:8080");
httpClient.get("/api/events")
.onSSE(sseClient -> sseClient
.onData(event -> {
try {
// 解析 JSON 数据
EventData eventData = objectMapper.readValue(event.getData(), EventData.class);
System.out.println("事件类型: " + eventData.getType());
System.out.println("事件数据: " + eventData.getData());
System.out.println("事件时间: " + eventData.getTimestamp());
} catch (Exception e) {
System.err.println("数据解析失败: " + e.getMessage());
}
})
.onOpen(client -> {
System.out.println("SSE 连接已建立");
}))
.submit();
}
static class EventData {
private String type;
private String data;
private long timestamp;
// getters and setters
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public String getData() { return data; }
public void setData(String data) { this.data = data; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}
}

问题可能原因解决方案
连接失败服务器未启动或地址错误检查服务器是否运行,确认 URL 正确
连接断开网络不稳定或服务器重启实现重连机制,使用指数退避策略
数据解析失败数据格式错误增加错误处理,检查服务器返回的数据格式
没有收到事件服务器未发送事件或路径错误检查服务器端代码,确认请求路径正确
连接超时网络延迟或服务器响应慢增加连接超时时间,检查网络状况
  1. 开启调试模式

    HttpClient httpClient = Feat.httpClient("http://localhost:8080", opt -> {
    opt.debug(true);
    });
  2. 检查服务器端 SSE 实现

    • 确保响应头包含 Content-Type: text/event-stream
    • 确保响应头包含 Cache-Control: no-cache
    • 确保数据格式符合 SSE 规范(以 data: 开头,以两个换行符结束)
  3. 使用 curl 测试 SSE 端点

    Terminal window
    curl -N http://localhost:8080/events
  4. 检查网络连接

    • 使用 ping 命令检查服务器是否可达
    • 使用 telnet 命令检查端口是否开放

Feat SSEClient 基于 Feat 框架的高性能网络通信能力,能够支持大规模的并发连接和高频率的消息传输。

Feat SSEClient 的设计充分考虑了线程安全问题,保证在高并发场景下的稳定性。

  • 缓冲区大小:根据实际数据大小调整读缓冲区大小
  • 数据处理:及时处理接收到的数据,避免内存堆积
  • 连接管理:不需要的连接及时关闭,释放资源

import tech.smartboot.feat.Feat;
import tech.smartboot.feat.core.client.HttpClient;
import tech.smartboot.feat.core.client.HttpRest;
import tech.smartboot.feat.core.client.SseEvent;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class SSECompleteDemo {
private static HttpRest httpRest;
private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private static int reconnectAttempts = 0;
private static final int MAX_RECONNECT_ATTEMPTS = 5;
public static void main(String[] args) {
connectSSE();
// 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (httpRest != null) {
httpRest.close();
System.out.println("连接已关闭");
}
scheduler.shutdown();
}));
}
private static void connectSSE() {
HttpClient httpClient = Feat.httpClient("http://localhost:8080", opt -> {
opt.connectTimeout(10000)
.readBufferSize(8192)
.debug(true);
});
httpRest = httpClient.get("/events")
.header("Authorization", "Bearer your-token")
.header("Last-Event-ID", "latest")
.onSSE(sseClient -> sseClient
.onData(event -> {
System.out.println("默认事件: " + event.getData());
reconnectAttempts = 0;
})
.onEvent("message", event -> {
System.out.println("消息事件: " + event.getData());
})
.onEvent("update", event -> {
System.out.println("更新事件: " + event.getData());
})
.onOpen(client -> {
System.out.println("SSE 连接已建立");
}))
.onFailure(e -> {
System.err.println("连接失败: " + e.getMessage());
scheduleReconnect();
})
.onClose(() -> {
System.out.println("连接已关闭");
scheduleReconnect();
});
httpRest.submit();
}
private static void scheduleReconnect() {
if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) {
int delay = 5 + (reconnectAttempts * 2);
System.out.println("将在 " + delay + " 秒后尝试重连...");
scheduler.schedule(() -> {
reconnectAttempts++;
connectSSE();
}, delay, TimeUnit.SECONDS);
} else {
System.err.println("重连失败,已达到最大尝试次数");
}
}
}