1.服务端
maven 依赖
<!-- Server-side build: inherits from the Spring Boot starter parent, version 2.7.2. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.2</version>
<!-- NOTE(review): empty relativePath presumably makes Maven resolve the parent from the repository instead of a local pom; confirm. -->
<relativePath/>
</parent>
<dependencies>
<!-- Spring MVC web starter; no version is given here (presumably managed by the parent above; confirm). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
java 代码
@CrossOrigin(origins = "*", maxAge = 3600)
@Controller
public class SSEController {
@GetMapping(value = "/stream1")
public void stream1(HttpServletResponse response) throws Exception {
response.setContentType("text/event-stream");
response.setCharacterEncoding("utf-8");
/*
数据格式
event:me
data:Data chunk 0
event:me
data:Data chunk 1
event:me
data:Data chunk 2
*/
for (int i = 0; i < 5; i++) {
TimeUnit.SECONDS.sleep(1);
// 指定事件标识 event: 这个为固定格式
response.getWriter().write("event:me\n");
// 格式:data: + 数据 + 2个回车
String data = "Data chunk " + i;
response.getWriter().write("data:" + data + "\n\n");
response.getWriter().flush();
}
}
/**
* 使用 Server-Sent Events (SSE) 可以实现在服务器端逐步向客户端发送数据
*/
@GetMapping(value = "/stream2", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter stream2() {
SseEmitter emitter = new SseEmitter();
// 异步处理,避免阻塞主线程
new Thread(() -> {
try {
/*
数据格式:
data:Data chunk 0
data:Data chunk 1
data:Data chunk 2
*/
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(1);
String data = "Data chunk " + i;
emitter.send(SseEmitter.event().data(data));
}
emitter.complete();
} catch (IOException | InterruptedException e) {
emitter.completeWithError(e);
}
}).start();
return emitter;
}
@GetMapping(value = "/stream3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter stream3() {
SseEmitter emitter = new SseEmitter();
// 注册回调函数,处理服务器向客户端推送的消息
emitter.onCompletion(() -> {
System.out.println("Connection completed");
// 在连接完成时执行一些清理工作
});
emitter.onTimeout(() -> {
System.out.println("Connection timeout");
// 在连接超时时执行一些处理
emitter.complete();
});
// 异步处理,避免阻塞主线程
new Thread(() -> {
try {
/*
数据格式
event:message
data:Data chunk 0
event:message
data:Data chunk 1
event:message
data:Data chunk 2
*/
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(1);
String data = "Data chunk " + i;
// 默认是 message 事件
// emitter.send() 方法向客户端发送消息
// SseEmitter.event() 创建一个事件对象,设置事件名称和数据
emitter.send(SseEmitter.event().name("message").data(data));
}
// emitter.complete() 表示数据发送完成后关闭连接
emitter.complete();
} catch (IOException | InterruptedException e) {
// emitter.completeWithError(e) 在发生错误时关闭连接并报错
emitter.completeWithError(e);
}
}).start();
return emitter;
}
}
2.客户端(浏览器 EventSource)
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<div>
    <div id="message"></div>
</div>
<script type="text/javascript">
    // Open the event stream; the argument is the endpoint URL.
    // Requires HTML5 EventSource support. /stream1 is the endpoint that emits "me" events.
    var sse = new EventSource("http://localhost:8080/stream1")

    // Handles events that carry no explicit event name (type "message").
    sse.onmessage = function (evt) {
        console.log("message", evt.data, evt)
    }

    // Events sent with an explicit event name must be handled through addEventListener.
    sse.addEventListener("me", function (evt) {
        console.log("me event", evt.data)
        // An event stream that is not closed reconnects automatically, so close it
        // explicitly once the chosen chunk arrives. evt.data is the full payload string
        // ("Data chunk N"), so compare against that string.
        if (evt.data === "Data chunk 3") {
            sse.close(); // comment this out to observe the automatic reconnect
        }
        setMessageInnerHTML(evt.data);
    })

    // Appends the message to the page followed by a line break.
    // The payload is inserted as a text node so server data is never parsed as HTML.
    function setMessageInnerHTML(innerHTML) {
        var container = document.getElementById('message');
        container.appendChild(document.createTextNode(innerHTML));
        container.appendChild(document.createElement('br'));
    }
</script>
</body>
</html>
3.客户端(Java OkHttp)
maven 依赖
<!-- OkHttp HTTP client, version 4.10.0. -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.10.0</version>
</dependency>
<!-- OkHttp SSE module, pinned to the same 4.10.0 version as the okhttp artifact above. -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp-sse</artifactId>
<version>4.10.0</version>
</dependency>
<!-- JUnit 4.13.2, test scope only. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
java 代码
/**
 * Connects to the {@code /stream1} SSE endpoint with OkHttp and prints every event received.
 *
 * <p>The test waits until the stream is closed or fails, for at most 100 seconds, and then
 * releases the event source and the client's dispatcher threads.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the stream
 */
@Test
public void testSseClient() throws InterruptedException {
    // For an endpoint that expects POST, replace .get() with .post(RequestBody.create(...)).
    Request request = new Request.Builder()
            .url("http://localhost:8080/stream1")
            .get()
            .build();

    // Request headers (e.g. an Authorization bearer token) can be added to every call by
    // registering an Interceptor on this builder via addInterceptor(...).
    OkHttpClient okHttpClient = new OkHttpClient.Builder()
            .connectTimeout(10, TimeUnit.SECONDS)
            .writeTimeout(50, TimeUnit.SECONDS)
            .readTimeout(50, TimeUnit.SECONDS)
            .build();

    // Released when the stream ends (normally or with a failure) so the test does not
    // have to sleep for a fixed period.
    final java.util.concurrent.CountDownLatch finished = new java.util.concurrent.CountDownLatch(1);

    EventSource.Factory factory = EventSources.createFactory(okHttpClient);
    EventSource eventSource = factory.newEventSource(request, new EventSourceListener() {
        @Override
        public void onOpen(EventSource eventSource, Response response) {
            System.out.println("sse onOpen ...");
        }

        @Override
        public void onEvent(EventSource eventSource, String id, String type, String data) {
            System.out.println(String.format("id: %s type: %s data: %s", id, type, data));
        }

        @Override
        public void onClosed(EventSource eventSource) {
            System.out.println("sse onClosed ...");
            finished.countDown();
        }

        @Override
        public void onFailure(EventSource eventSource, Throwable t, Response response) {
            System.out.println("sse onFailure ...");
            // Both arguments may be null; string concatenation prints "null" in that case.
            System.out.println("sse failure cause: " + t + " response: " + response);
            finished.countDown();
        }
    });

    try {
        // Same upper bound as the original fixed sleep, but returns as soon as the stream ends.
        if (!finished.await(100, TimeUnit.SECONDS)) {
            System.out.println("sse wait timed out ...");
        }
    } finally {
        eventSource.cancel();
        okHttpClient.dispatcher().executorService().shutdown();
    }
}