跳转到内容

Kotlin流(Flow)

来自代码酷

Kotlin流(Flow)[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

Kotlin流(Flow)是Kotlin协程库中用于处理异步数据流的API,它以响应式编程的方式按顺序发射多个值。Flow的设计灵感来源于Reactive Streams(如RxJava),但完全基于协程实现,提供更轻量级且符合Kotlin习惯的解决方案。

核心特点:

  • 冷流(Cold Stream):只有在收集(collect)时才会执行发射逻辑
  • 可取消支持:与协程生命周期绑定
  • 背压(Backpressure)处理:通过挂起函数天然支持
  • 操作符丰富:提供类似集合的转换操作(map、filter等)

基础使用[编辑 | 编辑源代码]

最简单的Flow通过`flow`构建器创建:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // 模拟异步操作
        emit(i)    // 发射值
    }
}

fun main() = runBlocking {
    simpleFlow().collect { value -> 
        println(value) 
    }
}

输出:

1
2
3

关键组件[编辑 | 编辑源代码]

graph LR A[生产者] -->|emit| B(Flow) B -->|collect| C[消费者] C -->|处理| D[结果]

  • 生产者:通过`emit()`发射数据
  • Flow:数据传输管道
  • 消费者:通过`collect()`接收数据

操作符[编辑 | 编辑源代码]

Flow提供两类操作符:

中间操作符[编辑 | 编辑源代码]

不立即执行,返回新Flow:

flowOf(1,2,3)
    .map { it * it }       // 转换
    .filter { it % 2 != 0 }// 过滤
    .collect(::println)    // 输出:1, 9

末端操作符[编辑 | 编辑源代码]

触发流执行:

  • `collect()`:收集所有值
  • `first()`:获取第一个值
  • `toList()`:转换为集合
  • `reduce()`:累积计算

异常处理[编辑 | 编辑源代码]

使用`catch`操作符捕获异常:

flow {
    emit(1)
    throw RuntimeException("Error!")
}.catch { e -> 
    println("Caught: $e") 
}.collect(::println)

上下文控制[编辑 | 编辑源代码]

默认遵循上下文保留(Context Preservation)原则,可通过`flowOn`切换上下文:

flow {
    println("Emitting in ${Thread.currentThread().name}")
    emit(1)
}.flowOn(Dispatchers.IO)
 .collect { 
    println("Collecting in ${Thread.currentThread().name}") 
}

高级特性[编辑 | 编辑源代码]

状态流(StateFlow)[编辑 | 编辑源代码]

热流实现,自动保存最新状态:

val state = MutableStateFlow(0)

fun update() = runBlocking {
    state.collect { println("State: $it") }
    state.value = 1  // 触发更新
}

共享流(SharedFlow)[编辑 | 编辑源代码]

支持多订阅者的热流:

val sharedFlow = MutableSharedFlow<Int>()

fun producer() = CoroutineScope(Dispatchers.Default).launch {
    repeat(5) {
        sharedFlow.emit(it)
        delay(100)
    }
}

实际案例[编辑 | 编辑源代码]

场景:实现实时搜索建议

fun searchFlow(query: String): Flow<List<Result>> = flow {
    val api = SearchApi()
    emit(api.suggest(query)) // 首次快速结果
    delay(300) // 防抖
    emit(api.fullSearch(query)) // 完整结果
}

// UI层使用
viewModelScope.launch {
    searchQuery.asFlow()
        .debounce(300)
        .flatMapLatest { query ->
            searchFlow(query)
        }
        .collect { results ->
            updateUI(results)
        }
}

性能优化[编辑 | 编辑源代码]

  • 使用`buffer()`并行处理生产-消费
  • 对快速发射流使用`conflate()`跳过中间值
  • 复杂计算使用`flowOn`切换调度器

数学表示[编辑 | 编辑源代码]

Flow可视为离散时间序列: F={x1,x2,...,xn|xiX} 其中转换操作符相当于函数组合: map(f)filter(p){f(xi)|p(xi)=true}

常见问题[编辑 | 编辑源代码]

Q: Flow与LiveData的区别?

  • Flow更灵活,支持复杂异步操作
  • LiveData专为UI设计,自动感知生命周期

Q: 何时该用Channel替代Flow?

  • 需要跨协程通信时
  • 处理一次性事件(非数据流)

最佳实践[编辑 | 编辑源代码]

1. 避免在Flow中修改共享状态 2. 对资源使用`onCompletion`进行清理 3. 公共API返回`Flow`而非具体实现类 4. 测试时使用`test`模块的`runTest`