takeWhileInclusive 的这种实现安全吗?

2023-12-07

我发现以下包容性的实现takeWhile (found here)

fun <T> Sequence<T>.takeWhileInclusive(pred: (T) -> Boolean): Sequence<T> {
    var shouldContinue = true
    return takeWhile {
        val result = shouldContinue
        shouldContinue = pred(it)
        result
    }
}

问题是我不是 100% 相信如果在并行序列.

我担心的是我们会依赖shouldContinue变量来知道何时停止,但我们没有同步它的访问。

有什么见解吗?


这是我到目前为止所想到的。

问题澄清

问题不清楚。没有这样的东西并行序列我可能把它们搞混了Java并行流。我的意思是一个序列同时消耗.

序列是同步的

正如 @LouisWasserman 在评论中指出的那样,序列并不是为并行执行而设计的。特别是SequenceBuilder注释为@RestrictSuspension。引用自Kotlin 协程 repo:

这意味着在其范围内 lambda 的任何 SequenceBuilder 扩展都不能调用 suspendContinuation 或其他通用挂起函数

话虽如此,正如 @MarkoTopolnik 评论的那样,它们仍然可以像任何其他对象一样在并行程序中使用。

并行使用的序列

作为示例,这是并行使用序列的第一次尝试

fun launchProcessor(id: Int, iterator: Iterator<Int>) = launch {
    println("[${Thread.currentThread().name}] Processor #$id received ${iterator.next()}")
}

fun main(args: Array<String>) {
    val s = sequenceOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    runBlocking {
        val iterator = s.iterator()
        repeat(10) { launchProcessor(it, iterator) }
    }
}

此代码打印:

[ForkJoinPool.commonPool-worker-2] 处理器 #1 收到 1

[ForkJoinPool.commonPool-worker-1] 处理器 #0 收到 0

[ForkJoinPool.commonPool-worker-3] 处理器 #2 收到 2

[ForkJoinPool.commonPool-worker-2] 处理器 #3 收到 3

[ForkJoinPool.commonPool-worker-1] 处理器 #4 收到 3

[ForkJoinPool.commonPool-worker-3] 处理器 #5 收到 3

[ForkJoinPool.commonPool-worker-1] 处理器 #7 收到 5

[ForkJoinPool.commonPool-worker-2] 处理器 #6 收到 4

[ForkJoinPool.commonPool-worker-1] 处理器 #9 收到 7

[ForkJoinPool.commonPool-worker-3] 处理器 #8 收到 6

哪个当然是不是我们想要的。因为有些数字会被消耗两次。

进入频道

另一方面,如果我们要使用通道,我们可以编写如下内容:

fun produceNumbers() = produce {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    channel.consumeEach {
        println("[${Thread.currentThread().name}] Processor #$id received $it")
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(1000)
    producer.cancel() // cancel producer coroutine and thus kill them all
}

那么输出是:

[ForkJoinPool.commonPool-worker-2] 处理器 #0 收到 1

[ForkJoinPool.commonPool-worker-2] 处理器 #0 收到 2

[ForkJoinPool.commonPool-worker-1] 处理器 #1 收到 3

[ForkJoinPool.commonPool-worker-2] 处理器 #2 收到 4

[ForkJoinPool.commonPool-worker-1] 处理器 #3 收到 5

[ForkJoinPool.commonPool-worker-2] 处理器 #4 收到 6

[ForkJoinPool.commonPool-worker-2] 处理器 #0 收到 7

[ForkJoinPool.commonPool-worker-1] 处理器 #1 收到 8

[ForkJoinPool.commonPool-worker-1] 处理器 #2 收到 9

[ForkJoinPool.commonPool-worker-2] 处理器 #3 收到 10

此外,我们可以实施takeWhileInclusive像这样的频道方法:

fun <E> ReceiveChannel<E>.takeWhileInclusive(
        context: CoroutineContext = Unconfined,
        predicate: suspend (E) -> Boolean
): ReceiveChannel<E> = produce(context) {
    var shouldContinue = true
    consumeEach {
        val currentShouldContinue = shouldContinue
        shouldContinue = predicate(it)
        if (!currentShouldContinue) return@produce
        send(it)
    }
}

它按预期工作。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

takeWhileInclusive 的这种实现安全吗? 的相关文章

  • Kotlin:使用 Picasso 从 flickr 加载图像时出现错误 503

    我的应用程序使用 Android 的 Picasso 库从 flickr 加载图像 奇怪的是 不久前将我的应用程序迁移到 Kotlin 后 它工作得很好 但现在我开始出现 随机 503 错误 我已经在 flickr 控制面板中为每个图像设置
  • Android Jetpack Compose 尺寸随持续时间变化的动画

    如何在 Jetpack Compose 中添加内容大小更改动画的持续时间 尝试使用Modifier animateContentSize 并通过动画规格具有持续时间 但它只是突然进入或退出 没有观察到持续时间 Column Modifier
  • 扩展功能是虚拟的以调度接收者?

    在科特林中将扩展程序声明为成员 https kotlinlang org docs reference extensions html 这是什么意思 此类职能的派遣是virtual关于调度接收器类型 但是static关于扩展接收器类型 这是
  • Kotlin - 来自 KType 的 KClass<*>

    在 Kotlin 中 我可以获得KType from a KClass lt gt 像这样 Int class createType kotlin Int 我如何做相反的事情并获得KClass
  • 如何在jetpack compose中删除文本基线下方的空间?

    目前我得到这个 但我想要这样的东西 而且 50 和 min 中的文本也应该与顶部对齐 My code Row verticalAlignment Alignment Bottom Text text 18 color MaterialThe
  • 使用协程对任务进行排队

    我最近开始阅读有关协程的内容 我想询问某个场景 考虑一个带有一个按钮的简单屏幕 单击后 它会执行一堆打印语句和一些延迟 其间 到目前为止 我正在使用协程来实现这一目标 现在 我的问题是 如果用户反复向该按钮发送垃圾邮件 是否有一种方法可以将
  • Kotlin 中 with 和 run 的区别

    Do with and run具有相同的功能 只是语法不同 或者之间是否存在重大差异with and run 哪个是正确的方法 adapter run notifyDataSetChanged if activityDetails isEm
  • 使用 Google 语音服务查询支持的语言不适用于 Android 13

    我使用以下命令查询语音服务支持的语言RecognizerIntent ACTION GET LANGUAGE DETAILS action val intent Intent RecognizerIntent ACTION GET LANG
  • 如何在 Scala mutable.Seq 上追加或前置

    Scala 有一些我不明白的地方collection mutable Seq http www scala lang org api current index html scala collection mutable Seq 它描述了所
  • 如何在运行时编译和使用Kotlin代码?

    我正在尝试创建一个 KotlinVert x http vertx io vertx2 language support html语言支持模块 我需要一种方法来编译 Kotlin 文件并使用ClassLoader 我尝试过使用kotlin
  • 如何在 Jetpack Compose 中提供相对大小

    我有一个框布局 我想相对于父框的大小来布局子视图 这可以在 SwiftUI 中使用 Geometry Reader 来实现 如何在 Jetpack Compose 中实现类似的功能 您可以使用BoxWithConstraints代替Box
  • Kotlin 的合成属性冲突

    我是科特林新手 除了其他非常有趣的事情之外 我还发现了 Android 扩展 根据文档 https kotlinlang org docs tutorials android plugin html importing synthetic
  • Kotlin 合约不适用于扩展函数中的空检查

    我正在尝试编写一个返回的扩展函数true如果该值不是null或 0 并使用合约向编译器保证如果我返回true 该值非空 但是 它似乎不适用于智能投射 当我尝试将值传递给采用不可空值的函数时 它仍然无法编译Long 我尝试编译这段代码 但它不
  • 如何在 Kotlin 中正确处理大于 127 的 Byte 值?

    假设我有一个带有变量的 Kotlin 程序b类型的Byte 外部系统向其中写入大于的值127 外部 意味着我无法更改它返回的值的类型 val a Int 128 val b Byte a toByte Both a toByte and b
  • Spring Mongo Populator 一一

    我在 Kotlin 上使用 MongoDB 和 Spring 并且希望我的应用程序在启动时填充 MongoDB 集合 并在每次启动时清理 我的问题是 如果我填充的某些数据有问题 如何才能一一填充数据以便容错 my code Configur
  • Hilt:java.lang.ClassNotFoundException:找不到类“com.kotlin20test.Hilt_MyApp”

    我有一个错误关于Hilt 我一直在尝试注入我使用创建的改造界面Hilt 这是错误 java lang ClassNotFoundException Didn t find class com kotlin20test Hilt MyApp
  • 到底什么是序列?

    蟒蛇docs https docs python org 3 glossary html term sequence有点模棱两可 sequence 一个可迭代对象 支持通过以下方式使用整数索引进行有效的元素访问 getitem 特殊方法并定
  • 为什么 CheckBox 检查不能以编程方式与 Kotlin 一起使用?

    我想这个问题以前可能有人问过 但这个问题也发生在我身上 所以我在这里再次询问 看看我们能否找到解决方案 所以基本上问题是以编程方式检查复选框不与 Kotlin 代码一起工作 为了解释一下 我正在分享我的代码和问题的屏幕截图 filterCo
  • 使用 Ktor 进行部署:如何设置 AppEngine 版本?

    Issue 我遵循了这个清晰简洁的Ktor教程 https cloud google com community tutorials kotlin ktor app engine java8 现在我已经成功部署到AppEngine我想手动设
  • 有什么区别!!和 ?在科特林?

    我是科特林新手 我想知道这两者之间的区别 and 在下面的代码中 下面有两个片段 第一个使用 for mCurrentDataset另一个有 对于同一个变量 if mCurrentDataset load mDataSetString ge

随机推荐