You might want to look at the Akka documentation on building a dynamic pub-sub service https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html?language=scala#combining-dynamic-operators-to-build-a-simple-publish-subscribe-service using MergeHub https://doc.akka.io/api/akka/2.5/akka/stream/scaladsl/MergeHub%24.html and BroadcastHub https://doc.akka.io/api/akka/2.5/akka/stream/scaladsl/BroadcastHub%24.html.
The documentation has sample code for using a MergeHub https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html?language=scala#using-the-mergehub and a BroadcastHub https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html?language=scala#using-the-broadcasthub as dynamic fan-in and fan-out junctions, respectively.
The idea is to connect a MergeHub with a BroadcastHub to form a pub-sub channel in the form of a Flow via Flow.fromSinkAndSource https://doc.akka.io/docs/akka/2.5/stream/operators/Flow/fromSinkAndSource.html:
val (bfSink, bfSource) = MergeHub.source[String](perProducerBufferSize).
toMat(BroadcastHub.sink[String](bufferSize))(Keep.both).
run
val busFlow: Flow[String, String, NotUsed] = Flow.fromSinkAndSource(bfSink, bfSource)
Note that Keep.both in the above snippet produces a tuple of materialized values (Sink[T, NotUsed], Source[T, NotUsed]) from MergeHub.source[T] and BroadcastHub.sink[T], which have the following method signatures:
object MergeHub {
def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] = // ...
// ...
}
object BroadcastHub {
def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]] = // ...
// ...
}
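To make those materialized types concrete, here is a minimal sketch, assuming Akka 2.5 is on the classpath. The object name KeepBothSketch, the method roundTrip, the actor system name, and the buffer sizes are illustrative choices and do not come from the Akka docs. It spells out the tuple type that Keep.both yields and pushes a few elements through the connected hubs:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BroadcastHub, Keep, MergeHub, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, for illustration only.
object KeepBothSketch {

  // Publishes 1, 2, 3 through the hubs and returns what one subscriber received.
  def roundTrip(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("keep-both-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Keep.both retains the materialized values of both hubs as a tuple.
      // Keep.left would yield only the Sink, Keep.right only the Source.
      val (hubSink, hubSource): (Sink[Int, NotUsed], Source[Int, NotUsed]) =
        MergeHub.source[Int](perProducerBufferSize = 16)
          .toMat(BroadcastHub.sink[Int](bufferSize = 16))(Keep.both)
          .run()

      // Attach one subscriber that completes after three elements ...
      val received = hubSource.take(3).runWith(Sink.seq[Int])
      // ... and one producer that publishes into the MergeHub.
      Source(1 to 3).runWith(hubSink)

      Await.result(received, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(roundTrip())
}
```

The explicit type annotation on the tuple is optional; it is there only to show what each hub contributes. The buffer size passed to BroadcastHub.sink has to be a power of two.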
Below is sample code for a simple pub-sub channel busFlow (similar to the example in the Akka documentation):
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.NotUsed
import scala.concurrent.duration._ // needed for 2.seconds etc. in the snippets below
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val (bfSink, bfSource) = MergeHub.source[String](perProducerBufferSize = 32).
toMat(BroadcastHub.sink[String](bufferSize = 256))(Keep.both).
run
// Optional: drain the hub so that no backpressure builds up when there are no subscribers
bfSource.runWith(Sink.ignore)
val busFlow: Flow[String, String, NotUsed] = Flow.fromSinkAndSource(bfSink, bfSource)
Testing busFlow:
Source(101 to 103).map(i => s"Batch(A)-$i").
delay(2.seconds, DelayOverflowStrategy.backpressure).
viaMat(busFlow)(Keep.right).
to(Sink.foreach{ case s: String => println("Consumer(1)-" + s) }).
run
Source(104 to 105).map(i => s"Batch(B)-$i").
viaMat(busFlow)(Keep.right).
to(Sink.foreach{ case s: String => println("Consumer(2)-" + s) }).
run
// Consumer(2)-Batch(B)-104
// Consumer(2)-Batch(B)-105
// Consumer(1)-Batch(B)-104
// Consumer(1)-Batch(B)-105
// Consumer(1)-Batch(A)-101
// Consumer(1)-Batch(A)-102
// Consumer(2)-Batch(A)-101
// Consumer(1)-Batch(A)-103
// Consumer(2)-Batch(A)-102
// Consumer(2)-Batch(A)-103
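Serving as a pub-sub channel, busFlow publishes its input via bfSink to all subscribers, while its output streams through bfSource all the elements that have been published. For example: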
val p1 = Source.tick[Int](0.seconds, 5.seconds, 5).map(_.toString)
p1.runWith(bfSink)
val p2 = Source.tick[Int](2.seconds, 10.seconds, 10).map(_.toString)
p2.runWith(bfSink)
val s1 = bfSource
s1.runForeach(x => println(s"s1 --> $x"))
val s2 = bfSource
s2.runForeach(x => println(s"s2 --> $x"))
// s1 --> 5
// s2 --> 5
// s1 --> 10
// s2 --> 10
// s2 --> 5
// s1 --> 5
// s2 --> 5
// s1 --> 5
// s1 --> 10
// s2 --> 10
// s2 --> 5
// s1 --> 5
// ...
Other related topics that might be of interest include KillSwitch https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html?language=scala#controlling-stream-completion-with-killswitch for stream completion control and PartitionHub https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html?language=scala#using-the-partitionhub for routing Stream elements from a given producer to a dynamic set of consumers.
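As a rough illustration of the KillSwitch idea, the sketch below attaches a UniqueKillSwitch to a single subscriber of the bus so that this subscriber can be stopped without affecting the hubs or other subscribers. It assumes Akka 2.5; the object name KillSwitchSketch, the method subscribeThenStop, and the fixed sleep are hypothetical simplifications, not part of the Akka docs example:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{BroadcastHub, Keep, MergeHub, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper object, for illustration only.
object KillSwitchSketch {

  // Returns the elements one subscriber saw before its kill switch was shut down.
  def subscribeThenStop(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("kill-switch-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val (bfSink, bfSource) =
        MergeHub.source[String](perProducerBufferSize = 32)
          .toMat(BroadcastHub.sink[String](bufferSize = 256))(Keep.both)
          .run()

      // The subscriber materializes its own kill switch plus the Future of collected elements.
      val (killSwitch, received) =
        bfSource
          .viaMat(KillSwitches.single[String])(Keep.right)
          .toMat(Sink.seq[String])(Keep.both)
          .run()

      Source(List("a", "b")).runWith(bfSink)

      // Crude wait so the elements have a chance to arrive; timing is not guaranteed.
      Thread.sleep(500)

      // Completes this subscriber's downstream and cancels its upstream subscription.
      killSwitch.shutdown()
      Await.result(received, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(subscribeThenStop())
}
```

What the subscriber has collected by the time shutdown() is called depends on timing, so the result should be treated as a prefix of the published elements rather than a fixed list. If several subscribers need to be stopped together, KillSwitches.shared is the variant to look at.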