🐠 编程式 SSE 接口
# SSE
SSE(Server-Sent Events),也被称作“事件流”(Event Stream),是一种旨在通过HTTP协议来实现服务器主动向客户端推送数据的技术手段。 它可以在客户端与服务器之间建立一条持久化的HTTP长连接,并通过这条连接实现服务器向客户端的实时数据推送,但客户端不能通过 SSE 向服务端发送数据。
自v1.6.0
版本起,Forest 提供了对 SSE 的支持,并且支持声名式
和编程式
两种 SSE 请求方式。
# 编程式 SSE 接口
# 转换为 SSE 请求
ForestSSE sse = Forest.get("/sse") // 普通的 GET 请求
.contentType("text/event-stream") // 设置 Content-Type
.sse(); // 转换为 SSE 控制器,并返回 ForestSSE 实例
2
3
除了 GET 请求,其他任何类型请求都可以转化成 SSE 控制器
sse = Forest.post("/sse").sse();
sse = Forest.put("/sse").sse();
2
# 监听事件
调用sse()
方法后会返回ForestSSE类对象,此时变化把普通的 HTTP 请求转换为 SSE 控制器,但不会发送任何实际的网络请求
Forest.get("/sse")
.contentType("text/event-stream")
.sse()
.listen(); // 开始监听 SSE 事件流,并阻塞当前线程
2
3
4
# 异步监听
ForestSSE 控制器中的listen()
方法是一种同步的监听方法,另外 Forset 也提供了异步的监听方法:asyncListen()
Forest.get("/sse")
.contentType("text/event-stream")
.sse()
.asyncListen(); // 开始异步监听 SSE 事件流,并且不会阻塞当前线程
2
3
4
ForestSSE 类还提供了await()
方法,用于阻塞等待当前线程,直到异步监听结束为止。
ForestSSE sse = Forest.get("/sse")
.contentType("text/event-stream")
.sse()
.asyncListen(); // 开始异步监听 SSE 事件流,并且不会阻塞当前线程
sse.await(); // 阻塞并等待异步监听结束,可以在需要阻塞的地方调用
2
3
4
5
6
# 行模式
通常情况下,SSE 的消息流有一套标准的格式,一组消息分多行,每一行都是name:value
形式的键值对字符串,每组消息之间用空白行隔开,具体形式如下:
id:1
event:user
data:{"name":"张三"}
id:2
event:user
data:{"name":"李四"}
id:3
event:user
data:{"name":"王五"}
2
3
4
5
6
7
8
9
10
11
这种格式的消息,可以通过制定行模式为多行模式MULTI_LINES
来读取解析
Forest.get("/sse")
.sse()
.setOnMessage(event -> { // 处理多行模式的消息
event.id(); // 消息名为 id 的值,这里应得到 1, 2, ...
event.event(); // user
event.data(); // {"name": "张三"}, ...
})
.listen(SSELinesMode.MULTI_LINES);
2
3
4
5
6
7
8
SSE 的消息除了有标准格式,还有很多非标准的格式,比如每行都是一条JSON字符串,每一行都是单独的消息,它们之间没有空白行分割
{"name":"a"}
{"name":"b"}
{"name":"c"}
2
3
对于这种类型的消息,就要使用单行模式(SINGLE_LINE)
Forest.get("/sse")
.sse()
.setOnMessage(event -> { // 处理单行模式的消息
String str = event.value(); // 获取字符串类型的消息值
MyUser user = event.value(MyUser.class); // 获取消息值并转换为自定义类型
})
.listen(SSELinesMode.SINGLE_LINE);
2
3
4
5
6
7
listen
方法不传参数的情况下,默认为AUTO,AUTO模式会自动识别需要采用的行模式
// AUTO 模式会自动识别需要采用的行模式
sse.listen(); // 默认行模式为 AUTO
2
# 事件处理
在上面的例子中,我们可以通过listen()
和asyncListen()
方法进行事件监听,但若在此时接受到实际事件消息,并不会进行任何处理。
如要针对不同事件内容,进行响应的处理,就需要选择一种事件处理方式来处理接受到的事件。
在 Forest 中事件处理的方式也有三种:SSE 回调函数、自定义 SSE 控制器、以及 SSE 拦截器。
# SSE 回调函数
SSE 回调函数来处理事件的方式最为简便和直接,在调用sse()
方法返回 ForestSSE 对象后,即可调用setOnMessage
方法,来设置处 SSE 的事件消息
Forest.get("/sse")
.contentType("text/event-stream")
.sse()
.setOnOpen(event -> {
// SSE 开始监听时调用
// event 为 EventSource 事件源对象
event.request(); // 获取 Forest 请求对象
event.response(); // 获取 Forest 响应对象
})
.setOnClose(event -> {
// SSE 结束监听时调用
})
.setOnMessage(event -> {
event.id(); // 消息名为 id 的值
event.event(); // 消息名为 event 的值
event.data(); // 消息名为 data 的值
event.value("text"); // 获取非标准名称的消息值,如: text
})
.listen(); // 开始监听 SSE 事件流,默认AUTO模式
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
除了基本的消息监听处理功能之外,也可以调用setOnOpen
、setOnClose
、addOnData
、addOnEvent
等方法来设置处理 SSE 不同生命周期、不同类型事件的回调函数
Forest.get("/sse")
.contentType("text/event-stream")
.sse()
.setOnOpen(eventSource -> {
// SSE 开始监听时调用
// eventSource 为 EventSource 事件源对象
eventSource.request(); // 获取 Forest 请求对象
eventSource.response(); // 获取 Forest 响应对象
})
.setOnClose(eventSource -> {
// SSE 结束监听时调用
})
.setOnMessage(event -> { // 处理每一组 SSE 消息
event.id(); // 消息名为 id 的值
event.event(); // 消息名为 event 的值
event.data(); // 消息名为 data 的值
event.value("text"); // 获取非标准名称的消息值,如: text
})
.addOnData((eventSource, name, value) -> {
// 处理事件消息名称为 data 的事件
// name 为事件消息名称 (同 eventSource.name())
// value 为事件消息的值
eventSource.sse(); // 获取 SSE 控制器
eventSource.request(); // 获取 Forest 请求对象
eventSource.response(); // 获取 Forest 响应对象
eventSource.rawData(); // 获取事件消息的原始数据
eventSource.name(); // 获取事件消息的名称,如 data、event、id 等
eventSource.value(); // 获取事件消息的值,返回字符串类型数据
// 获取事件消息的值,并将消息转换为指定的自定义类型
eventSource.value(MyUser.class);
// 获取事件消息的值,并将消息转换为指定的自定义泛型类型
eventSource.value(new TypeReference<List<MyUser>>() {});
})
.addOnEvent((eventSource, name, value) -> {
// 处理事件消息名称为 event 的事件
// 参数内容与上面的 addOnData 回调函数参数相同
})
.addOnId((eventSource, name, value) -> {
// 处理事件消息名称为 id 的事件
// 参数内容与上面的 addOnData 或 addOnEvent 回调函数参数相同
})
.addOnRetry((eventSource, name, value) -> {
// 处理事件消息名称为 retry 的事件
// 参数内容与上面的几个回调函数参数相同
})
.listen(); // 开始同步监听 SSE 事件流,默认AUTO模式
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
addOnData
、addOnEvent
以及addOnId
等函数都是监听内置固定消息名称的方法,也可以调用addConsumer
方法来处理非固定的,通过字符串参数指定的消息名称的事件
Forest.get("/sse")
.contentType("text/event-stream")
.sse()
.addConsumer("data", (eventSource, name, value) -> {
// 通过字符串参数指定要监听的消息名称
// 第一个参数传入 "data",就会监听消息名称为 data 的事件
// 参数内容与上面的 addOnData、addOnId 等回调函数参数一致
})
.addConsumer("event", (eventSource, name, value) -> {
// 通过字符串参数,来指定要监听消息名为 event 的事件
})
.addConsumer("push", (eventSource, name, value) -> {
// 当然也可以指定非标准的消息名称
})
.listen(); // 开始监听 SSE 事件流
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 自定义 SSE 控制器
如果不想在调用 SSE 接口的地方写一大堆 SSE 事件的处理代码,想把事件监听和接受到事件后的事件处理逻辑代码进行解耦,独立到另一个地方的话,可以使用 自定义 SSE 控制器
// 自定义 SSE 控制器类,需要继承 ForestSSE
public class MySSEHandler extends ForestSSE {
@Override
protected void onOpen(EventSource eventSource) {
// SSE 开始监听时调用
System.out.println("onOpen");
}
@Override
protected void onClose(EventSource eventSource) {
// SSE 结束监听时调用
System.out.println("onClose");
}
// 1.6.4+ 版本可以重写 onMessage 方法来接收 SSE 消息
@Override
public void onMessage(EventSource event) {
event.id(); // 获取消息名称为 id 的消息值
event.event(); // 获取消息名称为 event 的消息值
event.data(); // 获取消息名称为 data 的消息值
event.retry(); // 获取消息名称为 retry 的消息值
event.value("text"); // 获取非标准消息名称为 text 的消息值
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
完成这一步后,在调用时就无需再写回调函数了,只需要在调用sse()
方法的时候,将自定义 SSE 控制器的类作为参数传入即可
Forest.get("/sse")
.contentType("text/event-stream")
.sse(MySSEHandler.class)
.listen();
2
3
4
在调用sse(Class<T extends ForestSSE>)
方法后,返回的也是自定义 SSE 控制器类的对象
MySSEHandler mySSEHandler = Forest.get("/sse")
.contentType("text/event-stream")
.sse(MySSEHandler.class);
2
3
关于具体如何实现自定义 SSE 控制器,请参见《自定义 SSE 控制器》
# SSE 拦截器
除了回调函数和自定义 SSE 控制器两种方式外,还可以使用 SSE 拦截器来实现 SSE 事件的处理
相比于 SSE 控制器,SSE 拦截器为单例对象,不会在每次发起 SSE 请求时创建一个实例,并且可以注入到 Spring 上下文中,可调用 Spring 上下文中的对象和资源
// 自定义 SSE 拦截器
// 要注入 Spring 上下文的,一定要加上 @Component 注解,不加上就不能注入到 Spring 上下文
@Component
public class MySSEInterceptor implements SSEInterceptor {
// 可以注入 Spring 上下文中的对象
@Resource
private MyUserService myUserService;
@Override
protected void onOpen(EventSource eventSource) {
// SSE 开始监听时调用
System.out.println("onOpen");
}
@Override
protected void onClose(EventSource eventSource) {
// SSE 结束监听时调用
System.out.println("onClose");
}
// 1.6.4+ 版本可以重写 onMessage 方法来接收 SSE 消息
@Override
public void onMessage(EventSource event) {
event.id(); // 获取消息名称为 id 的消息值
event.event(); // 获取消息名称为 event 的消息值
event.data(); // 获取消息名称为 data 的消息值
event.retry(); // 获取消息名称为 retry 的消息值
event.value("text"); // 获取非标准消息名称为 text 的消息值
}
@SSEDataMessage
public void onSSEData(EventSource eventSource, @SSEName String name, @SSEValue String value) {
// 处理事件消息名称为 data 的事件
System.out.println("onSSEData: " + name + ": " + value);
myUserService.send(value);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
完成这一步后,在调用时就无需再写回调函数了,只需要在调用sse()
方法之前,调用addInterceptor
方法将自定义的 SSE 拦截器类加入到请求中即可
Forest.get("/sse")
.contentType("text/event-stream")
.addInterceptor(SSEInterceptor.class)
.sse()
.listen();
2
3
4
5
关于 SSE 拦截器的详情,请参见《SSE 拦截器》
# 关闭 SSE 事件流
可以调用 SSE 控制器及其子类(也就是自定义 SSE 控制器类)的close()
方法
使用此方法需要先获得 SSE 控制器对象
// 通过异步监听获得 SSE 控制器对象
ForestSSE sse = Forest.get("/sse")
.contentType("text/event-stream")
.sse()
.asyncListen();
2
3
4
5
或是先获得 SSE 控制器,再进行同步监听
// 先获得 SSE 控制器对象
ForestSSE sse = Forest.get("/sse")
.contentType("text/event-stream")
.sse();
// 再进行同步监听
sse.listen();
2
3
4
5
6
也可以通过EventSource
对象获得 SSE 控制器
ForestSSE sse;
Forest.get("/sse")
.contentType("text/event-stream")
.sse()
.setOnOpen(eventSource -> {
// SSE 开始监听时调用
sse = eventSource.sse();
})
.listen();
2
3
4
5
6
7
8
9
10
在获得到 SSE 控制器之后即可调用close()
方法关闭 SSE 事件流
sse.close(); // 手动关闭事件流,停止 SSE 监听
第二种方法是,调用EventSource
对象的close()
方法,效果相同
Forest.get("/sse")
.contentType("text/event-stream")
.sse()
.addOnData((eventSource, name, value) -> {
// 处理事件消息名称为 data 的事件
if ("close".equals(value)) {
eventSource.close(); // 手动关闭事件流,停止 SSE 监听
}
})
.listen();
2
3
4
5
6
7
8
9
10