# SSE技术
# 什么是SSE
==SSE(Server-Sent Events)==是一种用于实现服务器主动向客户端推送数据的技术,也被称为“事件流”(==Event Stream==)。它基于 ==HTTP== 协议,利用了其==长连接特性==,在客户端与服务器之间建立一条持久化连接,并通过这条连接实现服务器向客户端的实时数据推送。
# SSE技术的基本原理
- 客户端向服务器发送一个GET请求,带有指定的header,表示可以接收事件流类型,并禁用任何的事件缓存。
- 服务器返回一个响应,带有指定的header,表示事件的媒体类型和编码,以及使用分块传输编码(chunked)来流式传输动态生成的内容。
- 服务器在有数据更新时,向客户端发送一个或多个名称:值字段组成的事件,由单个换行符分隔。事件之间由两个换行符分隔。服务器可以发送事件数据、事件类型、事件ID和重试时间等字段。
- 客户端使用EventSource接口来创建一个对象,打开连接,并订阅onopen、onmessage和onerror等事件处理程序来处理连接状态和接收消息。
- 客户端可以使用GET查询参数来传递数据给服务器,也可以使用close方法来关闭连接。
# SSE和Socket的区别
SSE(Server-Sent Events)和 WebSocket 都是实现服务器向客户端实时推送数据的技术,但它们在某些方面还是有一定的区别。
# 技术实现
==SSE== 基于 HTTP 协议,利用了其长连接特性,通过浏览器向服务器发送一个 HTTP 请求,建立一条持久化的连接。而 WebSocket 则是通过特殊的升级协议(HTTP/1.1 Upgrade 或者 HTTP/2)建立新的 TCP 连接,与传统 HTTP 连接不同。
# 数据格式
==SSE== 可以传输==文本和二进制==格式的数据,但只支持==单向数据流==,即只能由服务器向客户端推送数据。==WebSocket== 支持==双向数据流==,客户端和服务器可以互相发送消息,并且没有==消息大小限制==。
# 连接状态
==SSE== 的连接状态仅有三种==:已连接、连接中、已断开==。连接状态是由浏览器自动维护的,客户端无法手动关闭或重新打开连接。而 ==WebSocket== 连接的状态更灵活,可以==手动打开、关闭、重连==等。
# 兼容性
==SSE== 是标准的 Web API,可以在大部分现代浏览器和移动设备上使用。但如果需要兼容老版本的浏览器(如 IE6/7/8),则需要使用 polyfill 库进行兼容。而 WebSocket 在一些老版本 Android 手机上可能存在兼容性问题,需要使用一些特殊的 API 进行处理。
# 安全性
SSE 的实现比较简单,都是基于 HTTP 协议的,与普通的 Web 应用没有太大差异,因此风险相对较低。WebSocket 则需要通过额外的安全措施(如 SSL/TLS 加密)来确保数据传输的安全性,避免被窃听和篡改,否则可能会带来安全隐患。
总体来说,SSE 和 WebSocket 都有各自的优缺点,适用于不同的场景和需求。如果只需要服务器向客户端单向推送数据,并且应用在前端的浏览器环境中,则 SSE 是一个更加轻量级、易于实现和维护的选择。而如果需要双向传输数据、支持自定义协议、或者在更加复杂的网络环境中应用,则 WebSocket 可能更加适合。
# SSE适用于场景
==SSE适用场景是指服务器向客户端实时推送数据的场景==,例如:
- ==股票价格更新==:服务器可以根据股市的变化,实时地将股票价格推送给客户端,让客户端能够及时了解股票的走势和行情。
- ==新闻实时推送==:服务器可以根据新闻的更新,实时地将新闻内容或标题推送给客户端,让客户端能够及时了解最新的新闻动态和信息。
- ==在线聊天==:服务器可以根据用户的发送,实时地将聊天消息推送给客户端,让客户端能够及时收到和回复消息。
- ==实时监控==:服务器可以根据设备的状态,实时地将监控数据或报警信息推送给客户端,让客户端能够及时了解设备的运行情况和异常情况。
SSE适用场景的特点是:
- ==数据更新频繁==:服务器需要不断地将最新的数据推送给客户端,保持数据的实时性和准确性。
- ==低延迟==:服务器需要尽快地将数据推送给客户端,避免数据的延迟和过期。
- ==单向通信==:服务器只需要向客户端推送数据,而不需要接收客户端的数据。
chatGPT 返回的数据 就是使用的SSE 技术
实时数据大屏 如果只是需要展示 实时的数据可以使用SSE技术 而不是非要使用webSocket
# API用法
==EventSource==这个api是一个用于接收服务器发送事件(Server-Sent Events,SSE)的Web API接口。服务器发送事件是一种让服务器端能够主动向客户端发送数据的技术,它使用HTTP协议,并且遵循一定的格式
要使用EventSource这个api,首先需要创建一个EventSource对象,并指定一个URL作为数据源。例如:
//创建一个EventSource对象,用于从sse.php页面接收事件
const evtSource = new EventSource("sse.php");
2
然后,需要为EventSource对象添加一些事件监听器,用于处理从服务器端接收到的数据。EventSource对象可以接收以下几种事件
- ==open==:当与服务器端的连接打开时触发。
- ==message==:当从服务器端接收到未命名的事件时触发。
- ==error==:当连接失败或关闭时触发。
- ==具名事件==:当从服务器端接收到指定了event字段的事件时触发,事件名称与event字段的值相同。
例如:
//为open事件添加一个监听器,打印连接状态
evtSource.addEventListener("open", (e) => {
console.log("Connection opened");
});
//为message事件添加一个监听器,打印数据内容
evtSource.addEventListener("message", (e) => {
console.log("Data: " + e.data);
});
//为error事件添加一个监听器,打印错误信息
evtSource.addEventListener("error", (e) => {
console.log("Error: " + e.message);
});
//为notice事件添加一个监听器,打印通知内容
evtSource.addEventListener("notice", (e) => {
console.log("Notice: " + e.data);
});
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
在服务器端,需要使用==text/event-stream作为响应的Content-Type==,并按照以下格式发送数据
event: <event name>
data: <data content>
id: <event id>
retry: <reconnection time>
2
3
4
其中,event字段是可选的,用于指定事件的名称;data字段是必须的,用于指定数据的内容;id字段是可选的,用于指定事件的标识符;retry字段是可选的,用于指定客户端在连接断开后重新连接的时间间隔(以毫秒为单位)。==每个字段都必须以换行符(\n)结尾==,并且==每个消息都必须以两个换行符(\n\n)结尾==。例如:
event: notice
data: Hello, world!
id: 1
2
3
# Chatgpt的类似DEMO
# 前端界面采用Vue
<template>
<div id="app">
<h1>简单的聊天应用程序</h1>
<div class="chat-box">
<div class="messages" ref="messages">
</div>
<div class="input-area">
<input type="text" v-model="input" @keyup.enter="sendMessage" placeholder="输入消息并按回车键发送" />
</div>
</div>
</div>
</template>
<script>
import axios from "axios";
export default {
name: "Chat",
data() {
return {
input: "",
messages:"",
eventSource: null,
};
},
mounted() {
this.initSSE();
},
beforeDestroy() {
this.eventSource.close();
},
methods: {
initSSE() {
// 创建一个SSE对象,连接到后端的/chat接口
this.eventSource = new EventSource("http://localhost:8081/chat");
// 监听message事件,接收后端发送的消息
this.eventSource.addEventListener("message", (event) => {
//将返回data插入元素
this.$refs.messages.innerHTML+= event.data
});
},
},
},
};
</script>
<style>
* {
box-sizing: border-box;
margin: 0;
padding: 0;
}
h1 {
text-align: center;
}
.chat-box {
width: 600px;
height: 400px;
margin: 20px auto;
border: 1px solid #ccc;
}
.messages {
height: 360px;
overflow-y: auto;
text-align: left;
}
.message {
padding: 10px;
}
.message.user {
text-align: right;
}
.message.user .content {
display: inline-block;
background-color: #f0f0f0;
border-radius: 10px;
}
.message.user .time {
display: block;
color: #999;
}
.message.bing {
text-align: left;
}
.message.bing .content {
display: inline-block;
background-color: #e6f7ff;
border-radius: 10px;
}
.message.bing .time {
display: block;
color: #999;
}
.input-area {
height: 40px;
}
.input-area input {
width: calc(100% - 20px);
height: calc(100% - 10px);
margin: 5px auto;
}
</style>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# 后端使用SpringBoot
import org.reactivestreams.Publisher;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.*;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.function.Function;
import java.util.function.Supplier;
@Controller
public class ChatController {
// 定义一个常量字符串,作为要发送的数据
private static final String SAY="千万刀锋之力,万丈烈焰之怒在我心中鼓荡。 我不惧怕圣火,我必须以身为信。 我携来光的怒火。 星辰间的国度在召唤我。 忤逆者。 我诞生于灼烧罪人的火焰中。 归顺光明,我就饶恕你的灵魂。 不留情面。 你是会说话的动物还是个小矮人?为什么你这么软乎。 他们将在我的铁翼面前溃败。 邪恶惧怕火焰,而约德尔人他们到底是什么东西。 忍受。 我携来烈怒之光。 你的剑在我手上你想要回去吗? 我曾望进母亲的眼睛,看到一处充满荣耀与正义的圣地,我正是为此而战这个世界不配有你。 今天,似乎我们都要暂时放下成见。 你的罪赎清了。 正义不死。 莫甘娜,你的能力本可拯救世界,可惜它却毁了你。 我要是摔倒了就会堕落,因此我必须飞翔。 折磨生出苦难,苦难又会加剧折磨,凡间这无穷的循环将由我来终结光芒给予你救赎。你该当此罪。";
// 处理GET请求,返回一个SSE对象,用于向前端发送消息
// 使用@CrossOrigin注解允许跨域请求
// 使用@GetMapping注解指定请求路径和返回类型
@CrossOrigin
@GetMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public @ResponseBody Flux<String> chat() {
// 使用Flux.usingWhen方法创建一个响应式流对象
// 第一个参数是一个Mono对象,用于提供资源(即Sinks.Many对象)
// 第二个参数是一个函数对象,用于根据资源生成数据流(即字符串的每个字符)
// 第三个参数是一个函数对象,用于释放资源(即关闭Sinks.Many对象)
return Flux.usingWhen(
// 使用Mono.fromSupplier方法创建一个Mono对象,用于提供资源
// 使用Supplier接口实现get方法,返回一个Sinks.Many对象
Mono.fromSupplier(new Supplier<Sinks.Many<String>>() {
@Override
public Sinks.Many<String> get() {
// 使用Sinks.many().unicast().onBackpressureBuffer()方法创建一个Sinks.Many对象
// 这个对象可以向多个订阅者发送数据,并缓存未消费的数据
return Sinks.many().unicast().onBackpressureBuffer();
}
}),
// 使用Function接口实现apply方法,根据资源生成数据流
new Function<Sinks.Many<String>, Flux<String>>() {
@Override
public Flux<String> apply(Sinks.Many<String> sink) {
// 使用sink.asFlux()方法获取数据流对象,并与另一个数据流合并
// 另一个数据流使用Flux.interval方法创建,每隔100毫秒生成一个序号
// 使用takeWhile方法限制序号的范围,不能超过字符串的长度
// 使用map方法将序号映射为字符串的对应字符,并加上换行符
return sink.asFlux().mergeWith(Flux.interval(Duration.ofMillis(100))
.takeWhile(seq -> seq.intValue() < SAY.length())
.map(seq -> SAY.charAt(seq.intValue()) + "\n\n"));
}
},
// 使用Function接口实现apply方法,释放资源
new Function<Sinks.Many<String>, Publisher<Void>>() {
@Override
public Publisher<Void> apply(Sinks.Many<String> sink) {
// 使用Mono.fromRunnable方法创建一个Publisher对象,用于执行一个任务
// 使用Runnable接口实现run方法,调用sink.tryEmitComplete()方法关闭Sinks.Many对象
return Mono.fromRunnable(new Runnable() {
@Override
public void run() {
sink.tryEmitComplete();
}
});
}
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
这段代码定义了一个名为 ChatController
的类,它使用 @Controller
注解来标记为一个控制器类。这个类包含一个名为 chat()
的方法,它使用 @GetMapping
注解来处理 GET 请求,并返回一个 Flux String 对象。这个方法的主要目的是向前端发送消息。
在 chat()
方法中,代码使用了 Flux.usingWhen()
方法来创建一个 Flux
对象。这个方法接受三个参数:resourceSupplier
、fluxFunction
和 resourceCleanup
。
resourceSupplier
参数是一个Mono
对象,它用于提供一个资源。在这段代码中,资源是一个Sinks.Many<String>
对象,它用于向前端发送消息。fluxFunction
参数是一个函数,它接受一个资源作为输入,并返回一个Flux
对象。在这段代码中,这个函数使用了两个操作符:asFlux()
和mergeWith()
。asFlux()
操作符用于将sink
转换为一个Flux
对象。mergeWith()
操作符用于将两个Flux
对象合并在一起。在这段代码中,它将sink.asFlux()
和另一个Flux
对象合并在一起。这个另一个
Flux
对象使用了三个操作符:interval()
、takeWhile()
和map()
。interval()
操作符用于创建一个每隔一定时间间隔发出序列号的Flux
对象。takeWhile()
操作符用于只取满足条件的元素。map()
操作符用于将序列号转换为字符串。
resourceCleanup
参数是一个函数,它接受一个资源作为输入,并返回一个Mono Void
对象。在这段代码中,这个函数调用了sink::tryEmitComplete
方法来关闭 `
chat()还有一种实现方式,但是Sink无法关闭且没有信息发送时空载,造成资源浪费
Sinks.Many<String> sink=Sinks.many().unicast().onBackpressureBuffer();
Flux<String> flux= sink.asFlux();
Flux.interval(Duration.ofMillis(100)).takeWhile(seq->seq.intValue()<SAY.length()).subscribe((seq)->{
sink.tryEmitNext(SAY.charAt(seq.intValue()) + "\n\n");
});
return flux;
2
3
4
5
6
7
# 最终实现效果
# 简单介绍Flux和Sink
# Flux
==Flux==类是一个标准的==Publisher==,它表示0到N个发射项的异步序列,可选地以完成信号或错误终止。以下是Flux类中一些常用的方法及其示例代码:
- ==just()==: 可以指定序列中包含的全部元素。创建出来的Flux序列在发布这些元素之后会自动结束。
Flux.just("Hello", "World").subscribe(System.out::println);
- ==fromArray(), fromIterable() 和 fromStream()==: 可以从一个数组、Iterable对象或Stream对象中创建Flux对象。
Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
==empty()==: 创建一个不包含任何元素,只发布结束消息的序列。
Flux.empty().subscribe(System.out::println);
- ==error(Throwable error)==: 创建一个只包含错误消息的序列。
==Flux.interval== 是一个静态方法,它返回一个 ==Flux Long==,这个 Flux 在初始延迟后,周期性地发出从 0 开始递增的长整数。这个方法有三个重载版本,它们的参数如下:
- ==Flux.interval(Duration period)==:只需要一个 Duration 类型的参数,表示发出元素的时间间隔。这个方法没有初始延迟,也就是说,它会立即发出第一个元素 0。它使用 Schedulers.parallel() 调度器来执行。
- ==Flux.interval(Duration delay, Duration period)==:需要两个 Duration 类型的参数,第一个表示初始延迟,第二个表示发出元素的时间间隔。这个方法会在 delay 时间后发出第一个元素 0。它也使用 Schedulers.parallel() 调度器来执行。
- ==Flux.interval(Duration delay, Duration period, Scheduler timer)==:需要三个参数,前两个和上面一样,第三个是一个 Scheduler 类型的参数,表示用于执行的调度器。这个方法可以自定义调度器,比如 Schedulers.single() 或 Schedulers.elastic()。
下面是一些示例代码:
// 使用 Flux.interval(Duration period) 方法
Flux.interval(Duration.ofSeconds(1)) // 每隔一秒发出一个元素
.subscribe(System.out::println); // 打印输出
// 使用 Flux.interval(Duration delay, Duration period) 方法
Flux.interval(Duration.ofSeconds(2), Duration.ofSeconds(1)) // 延迟两秒后开始每隔一秒发出一个元素
.subscribe(System.out::println); // 打印输出
// 使用 Flux.interval(Duration delay, Duration period, Scheduler timer) 方法
Flux.interval(Duration.ofSeconds(2), Duration.ofSeconds(1), Schedulers.single()) // 延迟两秒后开始每隔一秒发出一个元素,并使用单线程调度器
.subscribe(System.out::println); // 打印输出
2
3
4
5
6
7
8
9
10
11
==Flux.interval.takeWhile== 是一个实例方法,它返回一个 Flux T ,这个 Flux 只发出源 Flux 中满足给定谓词的元素,直到遇到第一个不满足的元素为止。这个方法有两个重载版本,它们的参数如下:
- ==Flux T takeWhile(Predicate ? super T predicate)==:只需要一个 Predicate 类型的参数,表示用于判断元素是否满足条件的函数。
- ==Flux T takeWhile(@Nullable T value)==:需要一个 T 类型的参数,表示用于判断元素是否等于该值的常量。
下面是一些示例代码:
// 使用 takeWhile(Predicate<? super T> predicate) 方法
Flux.interval(Duration.ofSeconds(1)) // 每隔一秒发出一个元素
.takeWhile(x -> x < 5) // 只发出小于 5 的元素
.subscribe(System.out::println); // 打印输出
// 使用 takeWhile(@Nullable T value) 方法
Flux.just("a", "b", "c", "d", "e") // 发出五个字母
.takeWhile("c") // 只发出等于 "c" 的字母
.subscribe(System.out::println); // 打印输出
2
3
4
5
6
7
8
9
除了 takeWhile 方法,Flux 还有其他一些带有 take 的方法,它们的作用和用法如下:
- ==Flux T take(long n)==:返回一个 Flux T,这个 Flux 只发出源 Flux 中的前 n 个元素,如果源 Flux 元素少于 n 个,则发出所有元素。
- ==Flux T take(Duration timespan)==:返回一个 Flux T,这个 Flux 只发出源 Flux 中在给定时间段内的元素。
- ==Flux T take(Duration timespan, Scheduler scheduler)==:返回一个 Flux T,这个 Flux 只发出源 Flux 中在给定时间段内的元素,并使用指定的调度器。
- ==Flux T takeLast(int n)==:返回一个 Flux T,这个 Flux 只发出源 Flux 中的后 n 个元素,如果源 Flux 元素少于 n 个,则发出所有元素。
- ==Flux T takeUntil(Predicate ? super T predicate)==:返回一个 Flux T,这个 Flux 发出源 Flux 中的所有元素,直到遇到第一个满足给定谓词的元素为止(包括该元素)。
- ==Flux T takeUntilOther(Publisher ? other)==:返回一个 Flux T ,这个 Flux 发出源 Flux 中的所有元素,直到另一个发布者发出信号为止(不包括该信号)。
# Sinks
Sinks类是一个接口,它提供了一种方法来安全地将数据和信号发送到下游订阅者。Sinks类中较为常用的方法有:
- ==
tryEmitNext(T t)
==: 尝试发射下一个数据元素。如果成功,则返回Sinks.EmitResult.OK
,否则返回其他结果。 - ==
tryEmitComplete()
==: 尝试发射完成信号。如果成功,则返回Sinks.EmitResult.OK
,否则返回其他结果。 - ==
tryEmitError(Throwable error)
==: 尝试发射错误信号。如果成功,则返回Sinks.EmitResult.OK
,否则返回其他结果。
==Sinks.many().unicast().onBackpressureBuffer()== 是一个链式方法,它返回一个 ==Sinks.Many T==,这个 Sinks.Many T 可以用于创建一个单播的 Flux T,这个 Flux T 可以处理背压的情况。这个方法的各个参数的意义如下:
- ==Sinks.many()==:这是一个静态方法,它返回一个 Sinks.ManySpec,这个 Sinks.ManySpec 可以用于创建不同类型的 Sinks.Many T。
- ==unicast()==:这是一个实例方法,它返回一个 Sinks.ManySpec.UnicastSpec,这个 Sinks.ManySpec.UnicastSpec 可以用于创建一个单播的 Sinks.Many T,也就是说,这个 Sinks.Many T 只能有一个订阅者。
- ==onBackpressureBuffer()==:这是一个实例方法,它返回一个 Sinks.Many T,这个 Sinks.Many T 可以处理背压的情况,也就是说,当订阅者不能及时消费元素时,它会把元素缓存起来,直到订阅者可以消费或者缓存满了为止。
下面是一些示例代码:
// 创建一个单播的 Sinks.Many<String>
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
// 创建一个 Flux<String>,它从 sink 中获取元素
Flux<String> flux = sink.asFlux();
// 订阅 flux,并打印输出
flux.subscribe(System.out::println);
// 向 sink 中发出元素
sink.emitNext("Hello", Sinks.EmitFailureHandler.FAIL_FAST);
sink.emitNext("World", Sinks.EmitFailureHandler.FAIL_FAST);
sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
2
3
4
5
6
7
8
9
10
11
12
13
==Sinks.many().unicast().onBackpressureBuffer()==
- ==onBackpressureBuffer(int maxSize)==:这个方法可以指定缓存的最大容量,如果缓存满了,它会抛出一个异常。
- ==onBackpressureBuffer(int maxSize, Consumer ? super T onOverflow)==:这个方法除了可以指定缓存的最大容量,还可以指定一个 Consumer 类型的参数,表示当缓存溢出时要执行的操作。
- ==onBackpressureBuffer(Queue T queue)==:这个方法可以指定一个 Queue 类型的参数,表示用于缓存元素的队列,它可以自定义队列的实现方式和容量。
- ==onBackpressureBuffer(Queue T queue, Consumer ? super T onOverflow)==:这个方法除了可以指定一个 Queue 类型的参数,还可以指定一个 Consumer 类型的参数,表示当缓存溢出时要执行的操作。
下面是一些示例代码:
// 创建一个单播的 Sinks.Many<String>
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
// 使用 onBackpressureBuffer(int maxSize) 方法
Sinks.Many<String> sink1 = sink.onBackpressureBuffer(10); // 指定缓存最大为 10 个元素
// 使用 onBackpressureBuffer(int maxSize, Consumer<? super T> onOverflow) 方法
Sinks.Many<String> sink2 = sink.onBackpressureBuffer(10, x -> System.out.println("Overflow: " + x)); // 指定缓存最大为 10 个元素,并在溢出时打印输出
// 使用 onBackpressureBuffer(Queue<T> queue) 方法
Sinks.Many<String> sink3 = sink.onBackpressureBuffer(new ArrayBlockingQueue<>(10)); // 指定使用一个数组阻塞队列来缓存元素,容量为 10
// 使用 onBackpressureBuffer(Queue<T> queue, Consumer<? super T> onOverflow) 方法
Sinks.Many<String> sink4 = sink.onBackpressureBuffer(new ArrayBlockingQueue<>(10), x -> System.out.println("Overflow: " + x)); // 指定使用一个数组阻塞队列来缓存元素,容量为 10,并在溢出时打印输出
2
3
4
5
6
7
8
9
10
11
12
13
14
==unicast()==
- ==multicast()==:这个方法可以创建一个多播的 Sinks.Many T,也就是说,这个 Sinks.Many T 可以有多个订阅者,它会把发出的元素广播给所有的订阅者。
- ==replay()==:这个方法可以创建一个重放的 Sinks.Many T,也就是说,这个 Sinks.Many T 会缓存发出的元素,并在新的订阅者订阅时重放给它们。
- ==directBestEffort()==:这个方法可以创建一个直接的 Sinks.Many T,也就是说,这个 Sinks.Many T 会尽最大努力把发出的元素直接传递给订阅者,而不进行缓存或背压处理。
下面是一些示例代码:
// 创建一个多播的 Sinks.Many<String>
Sinks.Many<String> sink1 = Sinks.many().multicast().onBackpressureBuffer();
// 创建一个重放的 Sinks.Many<String>
Sinks.Many<String> sink2 = Sinks.many().replay().all();
// 创建一个直接的 Sinks.Many<String>
Sinks.Many<String> sink3 = Sinks.many().directBestEffort();
// 创建一个 Flux<String>,它从 sink1 中获取元素
Flux<String> flux1 = sink1.asFlux();
// 创建一个 Flux<String>,它从 sink2 中获取元素
Flux<String> flux2 = sink2.asFlux();
// 创建一个 Flux<String>,它从 sink3 中获取元素
Flux<String> flux3 = sink3.asFlux();
// 订阅 flux1,并打印输出
flux1.subscribe(System.out::println);
// 订阅 flux2,并打印输出
flux2.subscribe(System.out::println);
// 订阅 flux3,并打印输出
flux3.subscribe(System.out::println);
// 向 sink1 中发出元素
sink1.emitNext("Hello", Sinks.EmitFailureHandler.FAIL_FAST);
sink1.emitNext("World", Sinks.EmitFailureHandler.FAIL_FAST);
sink1.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
// 向 sink2 中发出元素
sink2.emitNext("Hello", Sinks.EmitFailureHandler.FAIL_FAST);
sink2.emitNext("World", Sinks.EmitFailureHandler.FAIL_FAST);
sink2.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
// 向 sink3 中发出元素
sink3.emitNext("Hello", Sinks.EmitFailureHandler.FAIL_FAST);
sink3.emitNext("World", Sinks.EmitFailureHandler.FAIL_FAST);
sink3.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=1l377jqrmofjd