动态合并 Akka 流

2023-12-31

我尝试使用 Akka 流通过以下方式构建 pub sub 总线:

发布者添加主题的源流,订阅者指定主题并获取该主题的所有内容。然而,主题可以由多个发布者发布,发布者和订阅者都可以随时加入。

我的想法是合并所有源,然后将过滤后的源返回给订阅者。

然而,由于发布者可以在任何点加入,因此在订阅之后可能会添加源,并且订阅者需要像该主题的任何其他已发布数据一样从中获取数据。

有没有一种方法可以动态管理流到源的合并,以便满足以下条件:

publish(topic: String, messages: Source[T])
subscribe(topic: String): Source[T]

这样,无论何时添加发布者,主题的订阅者都将在订阅后获取发布到与该主题相关的任何源的所有消息。

也很高兴听到替代方法。

谢谢, Z


你可能想看看这个 Akka 文档:构建一个dynamic pub-sub service https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html?language=scala#combining-dynamic-operators-to-build-a-simple-publish-subscribe-service using MergeHub https://doc.akka.io/api/akka/2.5/akka/stream/scaladsl/MergeHub%24.html and BroadcastHub https://doc.akka.io/api/akka/2.5/akka/stream/scaladsl/BroadcastHub%24.html.

这是使用的示例代码MergeHub https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html?language=scala#using-the-mergehub and a BroadcastHub https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html?language=scala#using-the-broadcasthub分别作为动态扇入和扇出结点。

这个想法是连接一个MergeHub with a BroadcastHub以 Flow via 的形式形成一个 pub-sub 通道Flow.fromSinkAndSource https://doc.akka.io/docs/akka/2.5/stream/operators/Flow/fromSinkAndSource.html:

val (bfSink, bfSource) = MergeHub.source[String](perProducerBufferSize).
  toMat(BroadcastHub.sink[String](bufferSize))(Keep.both).
  run

val busFlow: Flow[String, String, NotUsed] = Flow.fromSinkAndSource(bfSink, bfSource)

注意Keep.both上面的代码片段生成一个物化值的元组(Sink[T, NotUsed], Source[T, NotUsed]) from MergeHub.source[T] and BroadcastHub.sink[T]其具有以下方法签名:

object MergeHub {
  def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] = // ...
  // ...
}

object BroadcastHub {
  def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]] = // ...
  // ...
}

下面是一个简单的发布-订阅通道的示例代码busFlow(类似于 Akka 文档中的示例):

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.NotUsed

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val (bfSink, bfSource) = MergeHub.source[String](perProducerBufferSize = 32).
  toMat(BroadcastHub.sink[String](bufferSize = 256))(Keep.both).
  run

// Optional: avoid building up backpressure when there is no subscribers
bfSource.runWith(Sink.ignore)

val busFlow: Flow[String, String, NotUsed] = Flow.fromSinkAndSource(bfSink, bfSource)

Testing busFlow:

Source(101 to 103).map(i => s"Batch(A)-$i").
  delay(2.seconds, DelayOverflowStrategy.backpressure).
  viaMat(busFlow)(Keep.right).
  to(Sink.foreach{ case s: String => println("Consumer(1)-" + s) }).
  run

Source(104 to 105).map(i => s"Batch(B)-$i").
  viaMat(busFlow)(Keep.right).
  to(Sink.foreach{ case s: String => println("Consumer(2)-" + s) }).
  run

// Consumer(2)-Batch(B)-104
// Consumer(2)-Batch(B)-105
// Consumer(1)-Batch(B)-104
// Consumer(1)-Batch(B)-105
// Consumer(1)-Batch(A)-101
// Consumer(1)-Batch(A)-102
// Consumer(2)-Batch(A)-101
// Consumer(1)-Batch(A)-103
// Consumer(2)-Batch(A)-102
// Consumer(2)-Batch(A)-103

作为一个 pub-sub 通道,输入busFlow发布于bfSink发送给所有订阅者,同时其输出流通过bfSource已发布的所有元素。例如:

val p1 = Source.tick[Int](0.seconds, 5.seconds, 5).map(_.toString)
p1.runWith(bfSink)

val p2 = Source.tick[Int](2.seconds, 10.seconds, 10).map(_.toString)
p2.runWith(bfSink)

val s1 = bfSource
s1.runForeach(x => println(s"s1 --> $x"))

val s2 = bfSource
s2.runForeach(x => println(s"s2 --> $x"))

// s1 --> 5
// s2 --> 5
// s1 --> 10
// s2 --> 10
// s2 --> 5
// s1 --> 5
// s2 --> 5
// s1 --> 5
// s1 --> 10
// s2 --> 10
// s2 --> 5
// s1 --> 5
// ...

其他可能感兴趣的相关主题包括KillSwitch https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html?language=scala#controlling-stream-completion-with-killswitch用于流完成控制和PartitionHub https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html?language=scala#using-the-partitionhub用于将 Stream 元素从给定的生产者路由到一组动态的消费者。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

动态合并 Akka 流 的相关文章

  • 为什么调用 take() 方法时 Slick 会生成子查询

    I use Slick http slick typesafe com 1 0 0 RC1 我对表对象有这样的定义 object ProductTable extends Table Int String String String Dou
  • 在 C 中读取字符时打印

    我正在尝试编写一个简单的小代码片段来响应箭头键按下 我知道 up 由 A 表示 并且我有以下代码来检查该序列 while 1 input char fgetc stdin if input char EOF input char n bre
  • Liftweb 环境中的后台任务

    我必须编写守护进程 并且我想使用模型来连接到数据库和一些有用的 Lift 类 是否可以运行 Rails 的 rake 任务的模拟 Scala 社区组上也有类似的问题 答案是使用Actors来做后台处理
  • 如何检查字符串中是否包含某个字符?

    我想检查字符串是否包含该字符 我正在编写一个刽子手代码 例如 下面是要猜测的单词 scala 但看起来像 至用户 假设用户输入字母 a 那么它一定看起来像 a a def checkGuess if result contains user
  • 了解如何使用 apply 和 unappy

    我试图更好地理解 的正确用法apply and unapply方法 考虑到我们想要序列化和反序列化的对象 这是正确的用法吗 即斯卡拉方式 的使用apply and unapply case class Foo object Foo appl
  • Scala:如何编写将类型化为接收者的实现类型的对象返回的方法

    我知道 Scala 中不推荐使用案例类继承 但为了简单起见 我在以下示例中使用了它 scala gt case class Foo val f String def foo g String Foo this copy f g define
  • 对于空列表,max() 应该返回什么?

    Got java util NoSuchElementException head of empty list所以我试着检查一下 但现在我明白了 info max of a few numbers FAILED info 0 did not
  • 如何使用 apply/unapply 方法重现案例类行为?

    我尝试用普通类和伴生对象替换案例类 但突然出现类型错误 编译良好的代码 综合示例 trait Elem A B def C other Elem C A Elem C B other match case Chain head tail g
  • HttpRequest PUT内容到poco库中

    我想使用 HTTP PUT 请求将一些数据从 C 应用程序发送到服务器 我在用poco http pocoproject org我的应用程序中的网络库 我正在使用这个代码片段 HTTPClientSession session uri ge
  • Scala:具有复杂结构的树插入尾递归

    我正在 scala 中创建自定义对象树 并且我的插入方法引发堆栈溢出 因为它不是尾递归 但是 我不太清楚如何使其尾递归 我见过使用 累加器 变量的相关示例 但它们要么是只能相乘和覆盖的整数之类的东西 要么是我在适应树时遇到困难的列表 这是我
  • 了解 Scala 中的中缀方法调用和缺点运算符(::)

    我对 Scala 编程语言相当陌生 当我遵循以下网站的讲义时 我正在尝试一些萦绕在我脑海中的东西 here http horstmann com sjsu cs152 04 closures1 html 我想我无法真正理解 cons 运算符
  • 有没有办法捕获 Spark 中使用通配符读取的多个 parquet 文件的输入文件名?

    我使用 Spark 将多个 parquet 文件读取到单个 RDD 中 并使用标准通配符路径约定 换句话说 我正在做这样的事情 val myRdd spark read parquet s3 my bucket my folder parq
  • 如何使用 Akka BoundedMailBox 来限制生产者

    我有两个演员 一个正在生成消息 另一个正在按某个时间消费消息固定利率 是否有可能让生产者受到消费者 BoundedMailBox 的限制 背压 我的生产者当前是定期计划的 向其发送一条勾选消息 有没有办法根据消费者邮箱中的可用性来计划它 我
  • 在 Python 中使用音频流 RTMP 通过管道和 OpenCV 到 FFmpeg

    我正在尝试使用音频流式传输 FFmpeg 我将在下面展示我的代码 导入模块 import subprocess as sp 创建变量 rtmpUrl rtmp a rtmp youtube com live2 key camera path
  • Scala 如何忽略 Java 的检查异常?

    例如如果调用 JavaThread sleep这会抛出一个已检查的InterruptedException来自 Scala 源文件 然后不需要将调用包含在 Scala 中try catch Scala 如何删除将调用包围在 a 中的规则tr
  • Java 表达式树 [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 是否有相当于 net的 LINQ 下的表达式树JVM 我想实现一些类似 LINQ 的代码结构Scala
  • Scala 和变量中的模式匹配

    我是 Scala 新手 有点想知道模式匹配是如何工作的 想象一下我有以下内容 case class Cls i Int case b Cls i gt Ok case e Cls gt Ok case f Cls gt Ok case s
  • Scala:如何在超类上实现克隆方法,并在子类中使用它?

    我可能会以错误的方式处理这个问题 但我想要一个像这样的对象 class MyDataStructure def myClone val clone new MyDataStructure do stuff to make clone the
  • 合并 2 个大型 CSS 文件的有效方法

    我正在寻找一个可以合并 2 个大型 CSS 文件的工具 到目前为止我尝试过的所有方法 例如CSSMerge 都没有成功 其中一些只是随机删除属性 其他人则因 webkit 和 moz 等非标准属性而窒息 并给我错误 我还需要保留每条规则大小
  • 对 Scala Not Null 特征的库支持

    Notice 从 Scala 2 11 开始 NotNull已弃用 据我了解 如果您希望引用类型不可为空 则必须混合魔法NotNull特征 编译器会自动阻止你输入null 可以值在里面 看到这个邮件列表线程 http www nabble

随机推荐