🐿 声明式 SSE 接口
# SSE
SSE(Server-Sent Events),也被称作“事件流”(Event Stream),是一种旨在通过HTTP协议来实现服务器主动向客户端推送数据的技术手段。 它可以在客户端与服务器之间建立一条持久化的HTTP长连接,并通过这条连接实现服务器向客户端的实时数据推送,但客户端不能通过 SSE 向服务端发送数据。
自v1.6.0
版本起,Forest 提供了对 SSE 的支持,并且支持声名式
和编程式
两种 SSE 请求方式。
# 声明式 SSE 接口
# 接口定义
SSE 的声名式接口定义方式和普通 HTTP 接口定义方式类型,只需要将ForestSSE
类作为接口方法的返回值类型即可,调用该方法后即可返回ForestSSE
类的对象实例。
ForestSSE
是 Forest 的 SSE 控制器类,通过此类对象可以进行监听、添加消息回调函数等操作。
public interface SSEClient {
// ForestSSE 为 Forest 的 SSE 控制器类
// 只要以此类作为方法的返回值类型,该方法就自动成为 SSE 请求方法
@Get(url = "/sse", contentType = "text/event-stream")
ForestSSE testSSE();
}
2
3
4
5
6
7
# 监听事件
调用接口方法后会返回ForestSSE
类对象,此时变化把普通的 HTTP 请求转换为 SSE 控制器,但不会发送任何实际的网络请求。
sseClient.testSSE(); // 把普通的 HTTP 请求转换为 SSE 控制器,但不会发送任何实际的网络请求
得到 SSE 控制器之后,便可调用listen()
方法开始监听 SSE 的事件流,并阻塞当前调用的线程。
sseClient.testSSE().listen(); // 开始监听 SSE 事件流,并阻塞当前线程
# 异步监听
ForestSSE 控制器中的listen()
方法是一种同步的监听方法,另外 Forest 也提供了异步的监听方法:asyncListen()
sseClient.testSSE().asyncListen(); // 开始异步监听 SSE 事件流,并且不会阻塞当前线程
ForestSSE 类还提供了await()
方法,用于阻塞等待当前线程,直到异步监听结束为止。
ForestSSE sse = sseClient.testSSE().asyncListen(); // 开始异步监听 SSE 事件流
sse.await(); // 阻塞并等待异步监听结束
2
# 行模式
通常情况下,SSE 的消息流有一套标准的格式,一组消息分多行,每一行都是name:value
形式的键值对字符串,每组消息之间用空白行隔开,具体形式如下:
id:1
event:user
data:{"name":"张三"}
id:2
event:user
data:{"name":"李四"}
id:3
event:user
data:{"name":"王五"}
2
3
4
5
6
7
8
9
10
11
这种格式的消息,可以通过制定行模式为多行模式MULTI_LINES
来读取解析
sseClient.testSSE()
.setOnMessage(event -> { // 处理多行模式的消息
event.id(); // 消息名为 id 的值,这里应得到 1, 2, ...
event.event(); // user
event.data(); // {"name": "张三"}, ...
})
.listen(SSELinesMode.MULTI_LINES);
2
3
4
5
6
7
SSE 的消息除了有标准格式,还有很多非标准的格式,比如每行都是一条JSON字符串,每一行都是单独的消息,它们之间没有空白行分割
{"name":"a"}
{"name":"b"}
{"name":"c"}
2
3
对于这种类型的消息,就要使用单行模式(SINGLE_LINE)
sseClient.testSSE()
.setOnMessage(event -> { // 处理单行模式的消息
String str = event.value(); // 获取字符串类型的消息值
MyUser user = event.value(MyUser.class); // 获取消息值并转换为自定义类型
})
.listen(SSELinesMode.SINGLE_LINE);
2
3
4
5
6
listen
方法不传参数的情况下,默认为AUTO,AUTO模式会自动识别需要采用的行模式
// AUTO 模式会自动识别需要采用的行模式
sseClient.testSSE().listen(); // 默认行模式为 AUTO
2
# 事件处理
在上面的例子中,我们可以通过listen()
和asyncListen()
方法进行事件监听,但若在此时接受到实际事件消息,并不会进行任何处理。
如要针对不同事件内容,进行响应的处理,就需要选择一种事件处理方式来处理接受到的事件。
在 Forest 中事件处理的方式也有三种:SSE 回调函数、自定义 SSE 控制器、以及 SSE 拦截器。
# SSE 回调函数
SSE 回调函数来处理事件的方式最为简便和直接,在调用接口返回 ForestSSE 对象后,即可调用setOnMessage
方法,来设置处 SSE 的事件消息
sseClient.testSSE() // 调用接口方法后返回 ForestSSE 对象
.setOnOpen(event -> {
// SSE 开始监听时调用
// event 为 EventSource 事件源对象
event.request(); // 获取 Forest 请求对象
event.response(); // 获取 Forest 响应对象
})
.setOnClose(event -> {
// SSE 结束监听时调用
})
.setOnMessage(event -> {
event.id(); // 消息名为 id 的值
event.event(); // 消息名为 event 的值
event.data(); // 消息名为 data 的值
event.value("text"); // 获取非标准名称的消息值,如: text
})
.listen(); // 开始监听 SSE 事件流,默认AUTO模式
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
除了基本的消息监听处理功能之外,也可以调用setOnOpen
、setOnClose
、addOnData
、addOnEvent
等方法来设置处理 SSE 不同生命周期、不同类型事件的回调函数
sseClient.testSSE() // 调用接口方法后返回 ForestSSE 对象
.setOnOpen(event -> {
// SSE 开始监听时调用
// event 为 EventSource 事件源对象
event.request(); // 获取 Forest 请求对象
event.response(); // 获取 Forest 响应对象
})
.setOnClose(event -> {
// SSE 结束监听时调用
})
.setOnMessage(event -> { // 处理每一组 SSE 消息
event.id(); // 消息名为 id 的值
event.event(); // 消息名为 event 的值
event.data(); // 消息名为 data 的值
event.value("text"); // 获取非标准名称的消息值,如: text
})
.addOnData((event, name, value) -> {
// 处理事件消息名称为 data 的事件
// name 为事件消息名称 (同 eventSource.name())
// value 为事件消息的值
event.sse(); // 获取 SSE 控制器
event.request(); // 获取 Forest 请求对象
event.response(); // 获取 Forest 响应对象
event.rawData(); // 获取事件消息的原始数据
event.name(); // 获取事件消息的名称,如 data、event、id 等
event.value(); // 获取事件消息的值,返回字符串类型数据
// 获取事件消息的值,并将消息转换为指定的自定义类型
event.value(MyUser.class);
// 获取事件消息的值,并将消息转换为指定的自定义泛型类型
event.value(new TypeReference<List<MyUser>>() {});
})
.addOnEvent((event, name, value) -> {
// 处理事件消息名称为 event 的事件
// 参数内容与上面的 addOnData 回调函数参数相同
})
.addOnId((event, name, value) -> {
// 处理事件消息名称为 id 的事件
// 参数内容与上面的 addOnData 或 addOnEvent 回调函数参数相同
})
.addOnRetry((event, name, value) -> {
// 处理事件消息名称为 retry 的事件
// 参数内容与上面的几个回调函数参数相同
})
.listen(); // 开始监听 SSE 事件流
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
addOnData
、addOnEvent
以及addOnId
等函数都是监听内置固定消息名称的方法,也可以调用addConsumer
方法来处理非固定的,通过字符串参数指定的消息名称的事件
sseClient.testSSE() // 调用接口方法后返回 ForestSSE 对象
.addConsumer("data", (event, name, value) -> {
// 通过字符串参数指定要监听的消息名称
// 第一个参数传入 "data",就会监听消息名称为 data 的事件
// 参数内容与上面的 addOnData、addOnId 等回调函数参数一致
})
.addConsumer("event", (event, name, value) -> {
// 通过字符串参数,来指定要监听消息名为 event 的事件
})
.addConsumer("push", (event, name, value) -> {
// 当然也可以指定非标准的消息名称
})
.listen(); // 开始监听 SSE 事件流
2
3
4
5
6
7
8
9
10
11
12
13
# 自定义 SSE 控制器
如果不想在调用 SSE 接口的地方写一大堆 SSE 事件的处理代码,想把事件监听和接受到事件后的事件处理逻辑代码进行解耦,独立到另一个地方的话,可以使用 自定义 SSE 控制器
// 自定义 SSE 控制器类,需要继承 ForestSSE
public class MySSEHandler extends ForestSSE {
@Override
protected void onOpen(EventSource event) {
// SSE 开始监听时调用
System.out.println("onOpen");
}
@Override
protected void onClose(EventSource event) {
// SSE 结束监听时调用
System.out.println("onClose");
}
// 1.6.4+ 版本可以重写 onMessage 方法来接收 SSE 消息
@Override
public void onMessage(EventSource event) {
event.id(); // 获取消息名称为 id 的消息值
event.event(); // 获取消息名称为 event 的消息值
event.data(); // 获取消息名称为 data 的消息值
event.retry(); // 获取消息名称为 retry 的消息值
event.value("text"); // 获取非标准消息名称为 text 的消息值
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
然后将接口方法中的返回类型改成自定义的 SSE 控制器类即可
public interface SSEClient {
// 返回类型改成自定义的 SSE 控制器类
@Get(url = "/sse", contentType = "text/event-stream")
MySSEHandler testSSE();
}
2
3
4
5
6
完成这一步后,在调用时就无需再写回调函数了
// 开始监听后,收到消息时会自动执行 MySSEHandler 中的事件处理方法
sseClient.testSSE().listen();
2
关于具体如何实现自定义 SSE 控制器,请参见《自定义 SSE 控制器》
# SSE 拦截器
除了回调函数和自定义 SSE 控制器两种方式外,还可以使用 SSE 拦截器来实现 SSE 事件的处理
相比于 SSE 控制器,SSE 拦截器为单例对象,不会在每次发起 SSE 请求时创建一个实例,并且可以注入到 Spring 上下文中,可调用 Spring 上下文中的对象和资源
// 自定义 SSE 拦截器
// 一定要加上 @Component 注解,不加上就不能注入到 Spring 上下文
@Component
public class MySSEInterceptor implements SSEInterceptor {
// 可以注入 Spring 上下文中的对象
@Resource
private MyUserService myUserService;
@Override
public void onSSEOpen(EventSource event) {
// SSE 开始监听时调用
System.out.println("onOpen");
}
@Override
public void onSSEClose(EventSource event) {
// SSE 结束监听时调用
System.out.println("onClose");
}
// 1.6.4+ 版本可以重写 onMessage 方法来接收 SSE 消息
@Override
public void onMessage(EventSource event) {
event.id(); // 获取消息名称为 id 的消息值
event.event(); // 获取消息名称为 event 的消息值
event.data(); // 获取消息名称为 data 的消息值
event.retry(); // 获取消息名称为 retry 的消息值
event.value("text"); // 获取非标准消息名称为 text 的消息值
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
定义完 SSE 拦截器之后,只要像普通拦截器那样绑定到接口方法上接口
public interface SSEClient {
// 将自定义 SSE 拦截器绑定到该接口方法上
// 返回类型仍然为 ForestSSE
@Get(
url = "/sse",
contentType = "text/event-stream",
interceptor = MySSEInterceptor.class
)
ForestSSE testSSE();
}
2
3
4
5
6
7
8
9
10
11
完成这一步后,在调用时就无需再写回调函数了
// 开始监听后,收到消息时会自动执行 MySSEInterceptor 拦截器中的事件处理方法
sseClient.testSSE().listen();
2
关于 SSE 拦截器的详情,请参见《SSE 拦截器》
# 关闭 SSE 事件流
可以调用 SSE 控制器及其子类(也就是自定义 SSE 控制器类)的close()
方法
使用此方法需要先获得 SSE 控制器对象
// 通过异步监听获得 SSE 控制器对象
ForestSSE sse = sseClient.testSSE().asyncListen();
2
或是先获得 SSE 控制器,再进行同步监听
// 先获得 SSE 控制器对象
ForestSSE sse = sseClient.testSSE();
// 再进行同步监听
sse.listen();
2
3
4
也可以通过EventSource
对象获得 SSE 控制器
ForestSSE sse;
sseClient.testSSE()
.setOnOpen(event -> {
// SSE 开始监听时调用
sse = event.sse();
});
2
3
4
5
6
7
在获得到 SSE 控制器之后即可调用close()
方法关闭 SSE 事件流
sse.close(); // 手动关闭事件流,停止 SSE 监听
第二种方法是,调用EventSource
对象的close()
方法,效果相同
sseClient.testSSE()
.addOnData((event, name, value) -> {
// 处理事件消息名称为 data 的事件
if ("close".equals(value)) {
event.close(); // 手动关闭事件流,停止 SSE 监听
}
})
.listen();
2
3
4
5
6
7
8