在协程中,通过调用操作符shareIn与stateIn,可以将一个冷流转换成一个热流,这两个方法的区别如下:
- shareIn:将一个冷流转换成一个标准的热流——SharedFlow类型的对象。
- stateIn:将一个冷流转换成一个单数据更新的热流——StateFlow类型的对象。
shareIn方法与stateIn方法的使用与实现的原理类似,下面以shareIn方法为例进行分析。
一.shareIn方法的使用
shareIn方法用于将一个冷流转换成一个热流对象,并在指定的协程作用域中根据不同的启动终止策略启动热流,将上游发射的数据发射给下游的多个订阅者,代码如下:
public fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T> {
...
}
shareIn方法共有三个参数,含义如下:
- scope:表示热流启动的协程作用域。
- started:热流的启动终止策略,共三种:
- Eagerly:立刻启动,并且不会终止。
- Lazily:当第一个订阅者出现时启动,并且不会终止。
- WhileSubscribed:默认情况下,当第一个订阅者出现时启动,当最后一个订阅者消失时终止,并保留replayCache中的数据。
- replay:SharedFlow中replayCache的最大容量,必须大于等于零。
1.典型场景的使用
shareIn方法用于当创建或获取一个冷流成本较高,同时还有多个订阅者需要获取冷流发射的数据的场景。比如:通过网络连接获取数据,IO读取文件等,会消耗大量的时间和设备资源,这时就可以使用shareIn方法,代码如下:
private suspend fun test() {
val flow = flow {
// 网络连接
connectToNet()
try {
while (true) {
// 获取数据并发射
emit(getDataFromNet())
}
} finally {
// 断开连接
disconnectFromNet()
}
}
// 不使用shareIn方法
// 10次网络连接
for (i in 0..10)
launch {
flow.collect {
Log.d("liduo", "test: $it")
}
}
// 使用shareIn方法
val sharedFlow = flow.shareIn(GlobalScope, SharingStarted.Eagerly, 1)
// 1次网络连接
for (i in 0..10)
launch {
sharedFlow.collect {
Log.d("liduo", "test1: $it")
}
}
}
2.搭配操作符的使用
1)感知上游流的结束
当上游流正常执行完毕结束时,订阅者无法感知,因为热流不会结束。如果需要通知订阅者上游的流执行完成,可以在shareIn操作符前使用onCompletion操作符,代码如下:
val sharedFlow = flow {
// 向下游发射100
emit(100)
// 向下游发射200
emit(200)
}.onCompletion {
// 如果不是由于异常而结束
if (it == null)
// 发射0
emit(0)
else // 如果是因为异常结束,则发射-1
emit(-1)
}.shareIn(GlobalScope, SharingStarted.Eagerly, 1)
2)感知上游流的异常
当上游的流发生异常导致热流取消时,会直接被热流所在的协程作用域处理,因此订阅者是没有感知的。如果需要通知订阅者,可以在shareIn操作符前使用catch或retry操作符,代码如下:
val sharedFlow = flow {
// 向下游发射100
emit(100)
// 向下游发射200
emit(200)
}.catch {
// 发生异常,发射-1通知下游
emit(-1)
}.shareIn(GlobalScope, SharingStarted.Eagerly, 1)
3)感知上游流的启动
当上游流启动时,订阅者是无法感知的。如果需要通知订阅者,可以在shareIn操作符前使用onStart操作符,代码如下:
val sharedFlow = flow {
// 向下游发射100
emit(100)
// 向下游发射200
emit(200)
}.onStart {
// 向下游发射1
emit(1)
}.shareIn(GlobalScope, SharingStarted.Eagerly, 1)
4)显示指定缓存最大容量
shareIn操作符启动的热流在独立的协程中运行,并且热流中缓存数组的buffered values的最大容量为replay或CHANNEL_DEFAULT_CAPACITY中较大的。如果需要显示的指定buffered values的最大容量,可以在shareIn操作符前使用buffer或conflate操作符,使用规则如下:
- buffer(0).shareIn(scope, started, 0):replay = 0,extraBufferCapacity = 0,没有缓存。
- buffer(b).shareIn(scope, started, r):replay = r,extraBufferCapacity = b。
- conflate().shareIn(scope, started, r):replay = r,onBufferOverflow = DROP_OLDEST,如果r等于0,则extraBufferCapacity = 1。
二.热流的启动终止策略
在前面的介绍中,提到在shareIn方法中,热流的启动终止策略定义在接口SharingStarted中,代码如下:
public interface SharingStarted {
public companion object {
// 立刻启动,并且不会终止
public val Eagerly: SharingStarted = StartedEagerly()
// 当第一个订阅者出现时启动,并且不会终止
public val Lazily: SharingStarted = StartedLazily()
// 默认情况下,当第一个订阅者出现时启动,
// 当最后一个订阅者消失时终止,并保留replayCache中的数据
// 参数stopTimeoutMillis表示当最后一个订阅者消失后多长时间终止,默认为0——立刻终止
// 参数replayExpirationMillis表示在终止后多长时间去清除replayCache,默认为Int最大值——不清除
@Suppress("FunctionName")
public fun WhileSubscribed(
stopTimeoutMillis: Long = 0,
replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted =
StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
}
// 核心接口方法
public fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand>
}
热流的三个启动终止策略分别对应StartedEagerly、StartedLazily、StartedWhileSubscribed这三个类的对象。除了三个启动终止策略外,接口中还定义了一个核心方法command,用于将SharedFlow类型对象的全局变量subscriptionCount,转换为泛型SharingCommand的Flow类型对象,实际上就是通过监听订阅者数量的变化来发出不同的控制指令。
StartedEagerly、StartedLazily、StartedWhileSubscribed这三个类都实现了SharingStarted接口,并重写了command方法。如果我们需要自定义一个新的启动终止策略,也可以通过实现SharingStarted接口重写command方法来完成。
1.热流的控制指令
SharingCommand类是一个枚举类,定义了控制热流的指令,代码如下:
public enum class SharingCommand {
// 启动热流,并触发上游流的执行
START,
// 终止热流,并取消上游流的执行
STOP,
// 终止热流,并取消上游流的执行,同时将replayCache重置为初始状态
// 如果热流的类型为StateFlow,则将replayCache重置为初始值
// 如果热流的类型为SharedFlow,则调用resetReplayCache方法,清空replayCache
STOP_AND_RESET_REPLAY_CACHE
}
连续发射相同的指令不会有任何作用。先发射STOP指令,再发射START指令,可以触发热流的重启,并重新触发上游流的执行。
2.Eagerly策略的实现
Eagerly策略表示立刻启动热流,并且不会终止,由StartedEagerly类实现,代码如下:
private class StartedEagerly : SharingStarted {
override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
flowOf(SharingCommand.START)
override fun toString(): String = "SharingStarted.Eagerly"
}
...
public fun <T> flowOf(value: T): Flow<T> = flow {
emit(value)
}
Eagerly策略不关心订阅者的数量,在触发后直接向下游发射START指令。
3.Lazily策略的实现
Lazily策略表示当第一个订阅者出现时启动热流,并且不会终止,由StartedLazily类实现,代码如下:
private class StartedLazily : SharingStarted {
override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = flow {
// 标志位,默认为false
var started = false
// 监听订阅者数量的变化
subscriptionCount.collect { count ->
// 如果订阅者数量大于0,且之前没有发射过指令
if (count > 0 && !started) {
// 设置标志位为true
started = true
// 发射START指令
emit(SharingCommand.START)
}
}
}
override fun toString(): String = "SharingStarted.Lazily"
}
Eagerly策略只有当订阅者数量大于0的时候,才会向下游发射START指令,并且只会发射一次。
4.WhileSubscribed策略的实现
WhileSubscribed策略默认情况下表示当第一个订阅者出现时启动热流,并在最后一个订阅者消失时终止,保留replayCache中的数据,由StartedWhileSubscribed类实现,代码如下:
private class StartedWhileSubscribed(
private val stopTimeout: Long,
private val replayExpiration: Long
) : SharingStarted {
...
override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
// 监听订阅者变化,并对上游发射的数据进行转换
subscriptionCount.transformLatest { count ->
// 如果订阅者数量大于0
if (count > 0) {
// 发射START指令
emit(SharingCommand.START)
} else { // 如果订阅者数量等于0
// 延迟指定的热流终止时间
delay(stopTimeout)
// 如果指定的清除缓存时间大于0
if (replayExpiration > 0) {
// 发射STOP指令
emit(SharingCommand.STOP)
// 延迟指定的清除缓存时间
delay(replayExpiration)
}
// 发射STOP_AND_RESET_REPLAY_CACHE指令
emit(SharingCommand.STOP_AND_RESET_REPLAY_CACHE)
}
} // 只有当START指令发射后,才会向下游发射
.dropWhile { it != SharingCommand.START }
.distinctUntilChanged()// 只有当前后指令不同时,才会向下游发射
...
}
WhileSubscribed策略在订阅者数量大于0的时候向下游发射START指令,在订阅者数量等于0的时候根据不同的延迟时间参数向下游发射STOP指令和STOP_AND_RESET_REPLAY_CACHE指令。并且必须先发射START指令,相邻重复的指令也不会被发射到下游。
三.shareIn方法的实现
代码如下:
public fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T> {
// 计算热流的参数
val config = configureSharing(replay)
// 创建一个类型为MutableSharedFlow的对象
val shared = MutableSharedFlow<T>(
replay = replay,
extraBufferCapacity = config.extraBufferCapacity,
onBufferOverflow = config.onBufferOverflow
)
// 在指定的协程作用域内启动热流所在的协程
@Suppress("UNCHECKED_CAST")
scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
// 返回SharedFlow类型的对象,控制单一数据发射源
return shared.asSharedFlow()
}
在shareIn方法中,首先调用configureSharing方法,得到热流的基本参数,这些参数会被封装成一个类型为SharingConfig的对象,代码如下:
private class SharingConfig<T>(
// 上游的流
@JvmField val upstream: Flow<T>,
// extraBufferCapacity参数
@JvmField val extraBufferCapacity: Int,
// 溢出策略
@JvmField val onBufferOverflow: BufferOverflow,
// 协程上下文
@JvmField val context: CoroutineContext
)
接下来,会根据计算出的参数,创建一个MutableSharedFlow类型的对象,并调用launchSharing方法启动热流所在的协程,最后返回一个SharedFlow类型的对象。
1.热流参数的计算
configureSharing方法是Flow的扩展方法,根据不同的策略计算出热流的参数,代码如下:
// Flow的扩展方法
private fun <T> Flow<T>.configureSharing(replay: Int): SharingConfig<T> {
assert { replay >= 0 }
// 计算的extraBufferCapacity默认值
// 结果为0与(Channel.CHANNEL_DEFAULT_CAPACITY - replay)中较大的一个
val defaultExtraCapacity = replay.coerceAtLeast(Channel.CHANNEL_DEFAULT_CAPACITY) - replay
// 如果上游流是通道流,则与上游的通道流融合
if (this is ChannelFlow) {
// 获取上游流的上游,检查上游流是否可以在不依赖Channel的情况下执行
val upstream = dropChannelOperators()
// 如果不为空,说明可以不依赖Channel
if (upstream != null) {
// 返回一个SharingConfig类型的对象
return SharingConfig(
// 上游流的上游
upstream = upstream,
// 计算extraBufferCapacity,根据通道流的容量进行判断
extraBufferCapacity = when (capacity) {
// 如果容量为默认值或0
Channel.OPTIONAL_CHANNEL, Channel.BUFFERED, 0 ->
when {
// 如果溢出策略为挂起
onBufferOverflow == BufferOverflow.SUSPEND ->
// 如果容量为0,则返回0,否则返回extraBufferCapacity的默认值
if (capacity == 0) 0 else defaultExtraCapacity
// 如果replayCache的最大容量为0,同时溢出策略又不是挂起,
// 说明至少需要一个缓存,返回1
replay == 0 -> 1
// 走到这里说明replayCache存在,且不需要挂起,返回0
else -> 0
}
// 容量为其他情况,返回通道流的容量
else -> capacity
},
// 通道流的溢出策略
onBufferOverflow = onBufferOverflow,
// 通道流的上下文
context = context
)
}
}
// 如果上游不为通道流,会走到这里
return SharingConfig(
// 上游的流
upstream = this,
// 默认的extraBufferCapacity
extraBufferCapacity = defaultExtraCapacity,
// 默认溢出策略为挂起
onBufferOverflow = BufferOverflow.SUSPEND,
// 空上下文
context = EmptyCoroutineContext
)
}
2.热流协程的启动
launchSharing方法是CoroutineScope的扩展方法,用于启动热流所在的协程,代码如下:
private fun <T> CoroutineScope.launchSharing(
context: CoroutineContext,
upstream: Flow<T>,
shared: MutableSharedFlow<T>,
started: SharingStarted,
initialValue: T
) {
// 根据指定的上下文启动一个新的协程
launch(context) {
// 根据热流启动终止策略进行判断
when {
// Eagerly策略
started === SharingStarted.Eagerly -> {
// 触发上游的执行,并将热流作为一个FlowCollector类型的对象
upstream.collect(shared)
}
// Lazily策略
started === SharingStarted.Lazily -> {
// 监听订阅者数量
// first用于返回上游发射的第一个满足条件的数据,即订阅者数量大于0
// 由于subscriptionCount为热流,因此在没有新数据时,会挂起当前协程
shared.subscriptionCount.first { it > 0 }
// 走到这里,说明订阅者数量大于0
// 触发上游的执行,并将热流作为一个FlowCollector类型的对象
upstream.collect(shared)
}
// WhileSubscribed策略或者自定义策略
else -> {
// 调用command方法获取指令
started.command(shared.subscriptionCount)
// 只有当前后指令发生变化时才会发射给下游
.distinctUntilChanged()
// 触发上游流的执行
.collectLatest {
// 根据热流控制指令进行判断
when (it) {
// 如果为启动指令,则触发上游的执行,并将热流作为一个FlowCollector类型的对象
SharingCommand.START -> upstream.collect(shared)
// 如果为终止指令,什么都不做
SharingCommand.STOP -> { }
// 如果为终止并清空replayCache指令
SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> {
// 如果热流的类型为SharedFlow,即当前方法在shareIn方法中调用
if (initialValue === NO_VALUE) {
// 调用resetReplayCache方法,清除replayCache
shared.resetReplayCache()
} else { // 如果热流的类型为StateFlow,即当前方法在stateIn方法中调用
// 设置状态为初始值
shared.tryEmit(initialValue)
}
}
}
}
}
}
}
}