Flow
更新: 3/27/2026 字数: 0 字 时长: 0 分钟
Flow是Kotlin用来处理连续不断的数据的解决方案,我们可以想象一个场景,客户端源源不断的向服务端发送数据,我们需要对这些源源不断的数据进行逐个的处理,而协程只能返回一个值,当协程还在计算的时候客户端再次发来一个值,我们没有办法很好的让协程在计算完之后再次接受新值进行进一步的计算
为了解决这一痛点,Kotlin设计了流/Flow,Flow专门用于接受一大批的数据并进行计算,流的核心机制是emit(发射)和 collect(收集),这里也可以理解成生产者和消费者,我们可以先来看一段简单的代码
fun simple(): Flow<Int> = flow { // 流构建器
for (i in 1..3) {
delay(100) // 假装我们在这里做了一些有用的事情
emit(i) // 发送下一个值
}
}
fun main() = runBlocking<Unit> {
// 启动并发的协程以验证主线程并未阻塞
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// 收集这个流
simple().collect { value -> println(value) }
}最后的结果会是
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3我们使用emit发送一个值向流,然后流会记住这个值,并在collect的时候对这些值进行统一的处理,这里设计一个Kotlin流的核心概念:流是冷的
Kotlin流是冷流的实现,所谓冷流就是指流在接受到数据后不会立刻对数据进行处理,而是等待流的collect方法被调用来进行处理
需要特别注意的是,这里的处理并非说是类似list的forEach方法的处理,而是仍然是发送一个处理一个的处理方式,这里可以理解为在进行collect之前,流只是记住了接下来要处理的数据的内容,而在collect之后,才开始了真正的“发送一个,处理一个”的过程
这个过程的好处在于整个过程是异步执行的,他不会阻塞主线程的执行
SSE与Flow
这里我们需要知道SSE的原理,所谓SSE就是告诉浏览器:“这个HTTP连接先不要急着关闭,我还要继续往里面塞数据”
在这种情况下Flow就很适合了,我们以AI Chat(OpenAi官网)为例,我们向Ai发送一条消息本身就是建立了一个SSE连接,AI向我们回复的文字是一个一个文字出现的,这就是一个SSE连接,如果我们开启F12去看他网络请求,就会发现他的SSE中是一个一个的添加输出的文字,这和我们对Flow的理解一样,因此我们会使用Flow与SSE结合
SpringBoot使用Flow发送SSE请求
我们来看一段代码
package cn.cotenite.ssetest
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import org.springframework.http.MediaType
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter
import java.io.IOException
@RestController
class SseFlowController {
// 自定义一个协程作用域,用于处理异步推送
private val sseScope = CoroutineScope(Dispatchers.IO )
@GetMapping("/api/sse/chat",produces = [MediaType.TEXT_EVENT_STREAM_VALUE + ";charset=UTF-8"])
fun handleChatStream(): SseEmitter {
// 1. 创建 Emitter,设置超时时间(0表示永不超时,取决于容器配置)
val emitter = SseEmitter(60_000L)
// 2. 准备你的数据流 (比如来自 LangChain4j 或 数据库)
val dataFlow: Flow<String> = flow {
emit("正在连接大模型...")
delay(500)
for (i in 1..5) {
delay(300)
emit("这是第 $i 个数据包")
}
emit("回答结束。")
}
// 3. 在协程中消费 Flow 并推送到 Emitter
sseScope.launch {
try {
dataFlow
.onCompletion {
// 流结束时关闭 Emitter
emitter.complete()
}
.collect { data ->
// 发送数据,注意捕获客户端断开连接的异常
try {
emitter.send(
SseEmitter.event()
.data(data)
.name("message")
)
} catch (e: IOException) {
// 客户端主动断开
cancel("Client disconnected")
}
}
} catch (e: Exception) {
emitter.completeWithError(e)
}
}
// 4. 注册回调:客户端断开或超时后的清理
emitter.onCompletion { /* 可以在这里写日志 */ }
emitter.onTimeout { emitter.complete() }
return emitter
}
}这里引入了
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.8.1")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.jetbrains.kotlin:kotlin-reflect")我们可以看到,Flow的准备阶段和发送发送阶段均为异步的,准备的时候是使用flow函数进行异步的生产,最后使用collect方法生成SseEmitter实例进行返回