@CrossOrigin(origins = "*", maxAge = 3600)
@Controller
public class SSEController {
@GetMapping(value = "/stream1")
public void stream1(HttpServletResponse response) throws Exception {
response.setContentType("text/event-stream");
response.setCharacterEncoding("utf-8");
/*
数据格式
event:me
data:Data chunk 0
event:me
data:Data chunk 1
event:me
data:Data chunk 2
*/
for (int i = 0; i < 5; i++) {
TimeUnit.SECONDS.sleep(1);
// 指定事件标识 event: 这个为固定格式
response.getWriter().write("event:me\n");
// 格式:data: + 数据 + 2个回车
String data = "Data chunk " + i;
response.getWriter().write("data:" + data + "\n\n");
response.getWriter().flush();
}
}
/**
* 使用 Server-Sent Events (SSE) 可以实现在服务器端逐步向客户端发送数据
*/
@GetMapping(value = "/stream2", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter stream2() {
SseEmitter emitter = new SseEmitter();
// 异步处理,避免阻塞主线程
new Thread(() -> {
try {
/*
数据格式:
data:Data chunk 0
data:Data chunk 1
data:Data chunk 2
*/
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(1);
String data = "Data chunk " + i;
emitter.send(SseEmitter.event().data(data));
}
emitter.complete();
} catch (IOException | InterruptedException e) {
emitter.completeWithError(e);
}
}).start();
return emitter;
}
@GetMapping(value = "/stream3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter stream3() {
SseEmitter emitter = new SseEmitter();
// 注册回调函数,处理服务器向客户端推送的消息
emitter.onCompletion(() -> {
System.out.println("Connection completed");
// 在连接完成时执行一些清理工作
});
emitter.onTimeout(() -> {
System.out.println("Connection timeout");
// 在连接超时时执行一些处理
emitter.complete();
});
// 异步处理,避免阻塞主线程
new Thread(() -> {
try {
/*
数据格式
event:message
data:Data chunk 0
event:message
data:Data chunk 1
event:message
data:Data chunk 2
*/
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(1);
String data = "Data chunk " + i;
// 默认是 message 事件
// emitter.send() 方法向客户端发送消息
// SseEmitter.event() 创建一个事件对象,设置事件名称和数据
emitter.send(SseEmitter.event().name("message").data(data));
}
// emitter.complete() 表示数据发送完成后关闭连接
emitter.complete();
} catch (IOException | InterruptedException e) {
// emitter.completeWithError(e) 在发生错误时关闭连接并报错
emitter.completeWithError(e);
}
}).start();
return emitter;
}
}
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<div>
<div id="message"></div>
</div>
<script type="text/javascript">
// 初始化,参数为url
// 依赖H5
var sse = new EventSource("http://localhost:18080/push")
// 监听消息并打印
sse.onmessage = function (evt) {
console.log("message", evt.data, evt)
}
// 如果指定了事件标识需要用这种方式来进行监听事件流
sse.addEventListener("me", function (evt) {
console.log("me event", evt.data)
// 事件流如果不关闭会自动刷新请求,所以我们需要根据条件手动关闭
if (evt.data == 3) {
sse.close(); // 注释掉这个可以看到自动重连的效果
}
setMessageInnerHTML(evt.data);
})
// 将消息显示在网页上
function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
</script>
</body>
</html>