spring boot · 2024-10-31 0

SpringBoot SSE 使用

1.服务端

maven 依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.2</version>
    <relativePath/>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

java 代码

@CrossOrigin(origins = "*", maxAge = 3600)
@Controller
public class SSEController {

    @GetMapping(value = "/stream1")
    public void stream1(HttpServletResponse response) throws Exception {
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("utf-8");

        /*
            数据格式
            event:me
            data:Data chunk 0

            event:me
            data:Data chunk 1

            event:me
            data:Data chunk 2
         */
        for (int i = 0; i < 5; i++) {
            TimeUnit.SECONDS.sleep(1);
            // 指定事件标识  event: 这个为固定格式
            response.getWriter().write("event:me\n");
            // 格式:data: + 数据 + 2个回车
            String data = "Data chunk " + i;
            response.getWriter().write("data:" + data + "\n\n");
            response.getWriter().flush();
        }
    }

    /**
     * 使用 Server-Sent Events (SSE) 可以实现在服务器端逐步向客户端发送数据
     */
    @GetMapping(value = "/stream2", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter stream2() {
        SseEmitter emitter = new SseEmitter();

        // 异步处理,避免阻塞主线程
        new Thread(() -> {
            try {
                /*
                    数据格式:
                    data:Data chunk 0

                    data:Data chunk 1

                    data:Data chunk 2
                 */
                for (int i = 0; i < 10; i++) {
                    TimeUnit.SECONDS.sleep(1);
                    String data = "Data chunk " + i;
                    emitter.send(SseEmitter.event().data(data));
                }
                emitter.complete();
            } catch (IOException | InterruptedException e) {
                emitter.completeWithError(e);
            }
        }).start();

        return emitter;
    }

    @GetMapping(value = "/stream3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter stream3() {
        SseEmitter emitter = new SseEmitter();

        // 注册回调函数,处理服务器向客户端推送的消息
        emitter.onCompletion(() -> {
            System.out.println("Connection completed");
            // 在连接完成时执行一些清理工作
        });

        emitter.onTimeout(() -> {
            System.out.println("Connection timeout");
            // 在连接超时时执行一些处理
            emitter.complete();
        });

        // 异步处理,避免阻塞主线程
        new Thread(() -> {
            try {
                /*
                    数据格式
                    event:message
                    data:Data chunk 0

                    event:message
                    data:Data chunk 1

                    event:message
                    data:Data chunk 2
                 */
                for (int i = 0; i < 10; i++) {
                    TimeUnit.SECONDS.sleep(1);
                    String data = "Data chunk " + i;
                    // 默认是 message 事件
                    // emitter.send() 方法向客户端发送消息
                    // SseEmitter.event() 创建一个事件对象,设置事件名称和数据
                    emitter.send(SseEmitter.event().name("message").data(data));
                }
                // emitter.complete() 表示数据发送完成后关闭连接
                emitter.complete();
            } catch (IOException | InterruptedException e) {
                // emitter.completeWithError(e) 在发生错误时关闭连接并报错
                emitter.completeWithError(e);
            }
        }).start();

        return emitter;
    }
}

2.客户端

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<div>
    <div id="message"></div>
</div>
<script type="text/javascript">
    // 初始化,参数为url
    // 依赖H5
    var sse = new EventSource("http://localhost:18080/push")

    // 监听消息并打印
    sse.onmessage = function (evt) {
        console.log("message", evt.data, evt)
    }

    // 如果指定了事件标识需要用这种方式来进行监听事件流
    sse.addEventListener("me", function (evt) {
        console.log("me event", evt.data)
        // 事件流如果不关闭会自动刷新请求,所以我们需要根据条件手动关闭
        if (evt.data == 3) {
            sse.close();    // 注释掉这个可以看到自动重连的效果
        }

        setMessageInnerHTML(evt.data);
    })

    // 将消息显示在网页上
    function setMessageInnerHTML(innerHTML) {
        document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }
</script>
</body>
</html>

3.客户端

maven 依赖

<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
    <version>4.10.0</version>
</dependency>

<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp-sse</artifactId>
    <version>4.10.0</version>
</dependency>

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13.2</version>
    <scope>test</scope>
</dependency>

java 代码

@Test
public void testSseClient() throws InterruptedException {
    Request request = new Request.Builder()
            .url("http://localhost:8080/stream1")
            .get()
            // .post(RequestBody.create(MediaType.parse(ContentType.JSON.getValue()), requestBody))
            .build();

    OkHttpClient okHttpClient = new OkHttpClient.Builder()
            .connectTimeout(10, TimeUnit.SECONDS)
            .writeTimeout(50, TimeUnit.SECONDS)
            .readTimeout(50, TimeUnit.SECONDS)
//                .addInterceptor(new Interceptor() {
//                    @Override
//                    public Response intercept(Chain chain) throws IOException {
//                        Request original = chain.request();
//                        Request request = original.newBuilder()
//                                .header(Header.AUTHORIZATION.getValue(), "Bearer 123456")
//                                .method(original.method(), original.body())
//                                .build();
//                        return chain.proceed(request);
//                    }
//                })
            .build();

    EventSource.Factory factory = EventSources.createFactory(okHttpClient);
    EventSource eventSource = factory.newEventSource(request, new EventSourceListener() {
        @Override
        public void onOpen(EventSource eventSource, Response response) {
            int code = response.code();
            System.out.println("sse onOpen ...");
        }

        @Override
        public void onEvent(EventSource eventSource, String id, String type, String data) {
            System.out.println(String.format("id: %s type: %s data: %s", id, type, data));
        }

        @Override
        public void onClosed(EventSource eventSource) {
            System.out.println("sse onClosed ...");
        }

        @Override
        public void onFailure(EventSource eventSource, Throwable t, Response response) {
            System.out.println("sse onFailure ...");
        }
    });

    TimeUnit.SECONDS.sleep(100);
}