takeWhileInclusive 的这种实现安全吗?


我发现以下包容性的实现takeWhile (found here)

fun <T> Sequence<T>.takeWhileInclusive(pred: (T) -> Boolean): Sequence<T> {
    var shouldContinue = true
    return takeWhile {
        val result = shouldContinue
        shouldContinue = pred(it)

问题是我不是 100% 相信如果在并行序列.







正如 @LouisWasserman 在评论中指出的那样,序列并不是为并行执行而设计的。特别是SequenceBuilder注释为@RestrictSuspension。引用自Kotlin 协程 repo:

这意味着在其范围内 lambda 的任何 SequenceBuilder 扩展都不能调用 suspendContinuation 或其他通用挂起函数

话虽如此,正如 @MarkoTopolnik 评论的那样,它们仍然可以像任何其他对象一样在并行程序中使用。



fun launchProcessor(id: Int, iterator: Iterator<Int>) = launch {
    println("[${Thread.currentThread().name}] Processor #$id received ${iterator.next()}")

fun main(args: Array<String>) {
    val s = sequenceOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    runBlocking {
        val iterator = s.iterator()
        repeat(10) { launchProcessor(it, iterator) }


[ForkJoinPool.commonPool-worker-2] 处理器 #1 收到 1

[ForkJoinPool.commonPool-worker-1] 处理器 #0 收到 0

[ForkJoinPool.commonPool-worker-3] 处理器 #2 收到 2

[ForkJoinPool.commonPool-worker-2] 处理器 #3 收到 3

[ForkJoinPool.commonPool-worker-1] 处理器 #4 收到 3

[ForkJoinPool.commonPool-worker-3] 处理器 #5 收到 3

[ForkJoinPool.commonPool-worker-1] 处理器 #7 收到 5

[ForkJoinPool.commonPool-worker-2] 处理器 #6 收到 4

[ForkJoinPool.commonPool-worker-1] 处理器 #9 收到 7

[ForkJoinPool.commonPool-worker-3] 处理器 #8 收到 6




fun produceNumbers() = produce {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s

fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    channel.consumeEach {
        println("[${Thread.currentThread().name}] Processor #$id received $it")

fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    producer.cancel() // cancel producer coroutine and thus kill them all


[ForkJoinPool.commonPool-worker-2] 处理器 #0 收到 1

[ForkJoinPool.commonPool-worker-2] 处理器 #0 收到 2

[ForkJoinPool.commonPool-worker-1] 处理器 #1 收到 3

[ForkJoinPool.commonPool-worker-2] 处理器 #2 收到 4

[ForkJoinPool.commonPool-worker-1] 处理器 #3 收到 5

[ForkJoinPool.commonPool-worker-2] 处理器 #4 收到 6

[ForkJoinPool.commonPool-worker-2] 处理器 #0 收到 7

[ForkJoinPool.commonPool-worker-1] 处理器 #1 收到 8

[ForkJoinPool.commonPool-worker-1] 处理器 #2 收到 9

[ForkJoinPool.commonPool-worker-2] 处理器 #3 收到 10


fun <E> ReceiveChannel<E>.takeWhileInclusive(
        context: CoroutineContext = Unconfined,
        predicate: suspend (E) -> Boolean
): ReceiveChannel<E> = produce(context) {
    var shouldContinue = true
    consumeEach {
        val currentShouldContinue = shouldContinue
        shouldContinue = predicate(it)
        if (!currentShouldContinue) return@produce



