Akka 持久化查询事件流和 CQRS

2023-11-26

我正在尝试在我的 ES-CQRS 架构中实现读取端。假设我有一个这样执着的演员:

object UserWrite {

  sealed trait UserEvent
  sealed trait State
  case object Uninitialized extends State
  case class User(username: String, password: String) extends State
  case class AddUser(user: User)
  case class UserAdded(user: User) extends UserEvent
  case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
  case class UsersStream(fromSeqNo: Long)
  case object GetCurrentUser

  def props = Props(new UserWrite)
}

class UserWrite extends PersistentActor {

  import UserWrite._

  private var currentUser: State = Uninitialized

  override def persistenceId: String = "user-write"

  override def receiveRecover: Receive = {
    case UserAdded(user) => currentUser = user
  }

  override def receiveCommand: Receive = {
    case AddUser(user: User) => persist(UserAdded(user)) {
      case UserAdded(`user`) => currentUser = user
    }
    case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
    case GetCurrentUser => sender() ! currentUser
  }

  def publishUserEvents(fromSeqNo: Long) = {
    val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
    val userEvents = readJournal
      .eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue)
      .map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
    sender() ! UserEvents(userEvents)
  }
}

据我了解,每次事件持续存在时,我们都可以通过以下方式发布它Akka Persistence Query。现在,我不确定订阅这些事件的正确方法是什么,以便我可以将其保留在我的读取端数据库中?其中一个想法是首先发送一个UsersStream我的读方演员发来的消息UserWrite演员和“沉没”事件中读取演员。

EDIT

根据@cmbaxter的建议,我以这种方式实现了读取端:

object UserRead {

  case object GetUsers
  case class GetUserByUsername(username: String)
  case class LastProcessedEventOffset(seqNo: Long)
  case object StreamCompleted

  def props = Props(new UserRead)
}

class UserRead extends PersistentActor {
  import UserRead._

  var inMemoryUsers = Set.empty[User]
  var offset        = 0L

  override val persistenceId: String = "user-read"

  override def receiveRecover: Receive = {
    // Recovery from snapshot will always give us last sequence number
    case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
    case RecoveryCompleted                                 => recoveryCompleted()
  }

  // After recovery is being completed, events will be projected to UserRead actor
  def recoveryCompleted(): Unit = {
    implicit val materializer = ActorMaterializer()
    PersistenceQuery(context.system)
      .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
      .eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
      .map {
        case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event
      }
      .runWith(Sink.actorRef(self, StreamCompleted))
  }

  override def receiveCommand: Receive = {
    case GetUsers                    => sender() ! inMemoryUsers
    case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
    // Match projected event and update offset
    case (seqNo: Long, UserAdded(user)) =>
      saveSnapshot(LastProcessedEventOffset(seqNo))
      inMemoryUsers += user
  }
}

存在一些问题,例如: 事件流似乎很慢。 IE。UserRead参与者可以在保存新添加的用户之前回答一组用户。

EDIT 2

我增加了 cassandra 查询日志的刷新间隔,这更多地解决了事件流缓慢的问题。看来 Cassandra 事件日志默认每 3 秒轮询一次。在我的application.conf我补充道:

cassandra-query-journal {
  refresh-interval = 20ms
}

EDIT 3

实际上,不要减少刷新间隔。这会增加内存使用量,但这并不危险,也没有什么意义。 CQRS 的一般概念是写入和读取端是异步的。因此,写入数据后永远无法立即读取。处理用户界面?我只是在读取方确认数据后打开流并通过服务器发送的事件推送数据。


有一些方法可以做到这一点。例如,在我的应用程序中,我的查询端有一个参与者,它有一个持续查找更改的 PersistenceQuery,但您也可以有一个具有相同查询的线程。问题是保持流打开,以便能够在持久事件发生时立即读取它

val readJournal =
PersistenceQuery(system).readJournalFor[CassandraReadJournal](
  CassandraReadJournal.Identifier)

// issue query to journal
val source: Source[EventEnvelope, NotUsed] =
  readJournal.eventsByPersistenceId(s"MyActorId", 0, Long.MaxValue)

// materialize stream, consuming events
implicit val mat = ActorMaterializer()
source.map(_.event).runForeach{
  case userEvent: UserEvent => {
    doSomething(userEvent)
  }
}

相反,您可以使用一个计时器来引发 PersistenceQuery 并存储新事件,但我认为打开流是最好的方法

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Akka 持久化查询事件流和 CQRS 的相关文章

  • 在 Scala 和 SBT 中调试较长的编译时间

    在我的 Scala SBT 项目中 我有一个文件需要 5 分钟才能编译 所有其他的都可以在几秒钟内编译 这使得开发非常痛苦 我确信我滥用了一些 Scala 构造 但我不知道如何调试它 如何在 Scala 中调试较长的编译时间 我正在使用 S
  • 缓存 Slick DBIO 操作

    我正在尝试加快 SELECT FROM WHERE name 的速度Play 中的查询类型 Scala 应用程序 我正在使用 Play 2 4 Scala 2 11 play slick 1 1 1 包 该软件包使用Slick 3 1版本
  • 运行具有外部依赖项的 Scala 脚本

    我在 Users joe scala lib 下有以下 jar commons codec 1 4 jar httpclient 4 1 1 jar httpcore 4 1 jar commons logging 1 1 1 jar ht
  • 类型级编程有哪些示例? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我不明白 类型级编程 是什么意思 也无法使用Google找到合适的解释 有人可以提供一个演示类型级编程的示例吗 范式的解释和 或定义将
  • 如何在 apache Spark 作业中执行阻塞 IO?

    如果当我遍历 RDD 时 我需要通过调用外部 阻塞 服务来计算数据集中的值怎么办 您认为如何才能实现这一目标 值 Future RDD Double Future sequence tasks 我尝试创建一个 Futures 列表 但由于
  • 将 yaml 中的列表映射到 Scala 中的对象列表(Spring Boot)

    背景 我已经阅读了很多关于如何使用的示例ConfigurationProperties从配置中读取列表 见下文 https github com konrad garus so yaml https github com konrad ga
  • 如何使用 log4j 自定义附加程序在 HDFS 上创建日志?

    Overview 我们希望使用 log4j 记录 Spark 作业活动 并将日志文件写入 HDFS Java 8 Spark 2 4 6 Scala 2 1 2 Hadoop 3 2 1 我们无法找到本地 apache log4j 附加程序
  • Akka Streams / HTTP:从响应中获取原始请求

    我有一个 Akka Streams 源 它会遍历流程并发布 HTTP 请求 source map toRequest via Http outgoingConnection host map toMessage 假设toRequest方法将
  • Spark中如何获取map任务的ID?

    Spark中有没有办法获取map任务的ID 例如 如果每个映射任务都调用用户定义的函数 我可以从该用户定义的函数中获取该映射任务的 ID 吗 我不确定您所说的地图任务 ID 是什么意思 但您可以使用以下方式访问任务信息TaskContext
  • 如何访问 Scala XML 中的父元素

    The scala xml包表示带有标记树节点的 XML 但是这棵树在 Scala 2 7 中是单向的吗 因为似乎没有办法访问Elem给定的父级Elem 这似乎同样适用于父母Document 例如 在 XOM 中你有getParent an
  • Spark 数据帧:根据另一列的值提取一列

    我有一个包含带有连接价目表的交易的数据框 paid currency EUR USD GBP 49 5 EUR 99 79 69 客户已支付 49 5 欧元 如 货币 列中所示 我现在想将支付的价格与价目表中的价格进行比较 因此 我需要根据
  • 是否有适用于 Haskell 或 Scala 等函数式语言的 LL 解析器生成器?

    我注意到明显缺乏用函数式语言创建解析器的 LL 解析器 我一直在寻找但没有成功的理想发现是为 ANTLR 风格的 LL 语法生成 Haskell 解析器 语法的模小数重新格式化 并且令我惊讶的是 每个最后一个解析器生成器都具有函数我发现的语
  • 如何在Slick 3.0.0中使用StaticQuery?

    在 Slick 2 1 中 我使用以下代码从文件执行 sql 查询 def fetchResult T sql String implicit getResult GetResult T List T val query Q queryNA
  • 如何将函数应用于元组?

    这应该是一件容易的事 如何将函数应用于 Scala 中的元组 即 scala gt def f i Int j Int i j f Int Int Int scala gt val p 3 4 p Int Int 3 4 scala gt
  • Spark 3 KryoSerializer 问题 - 无法找到类:org.apache.spark.util.collection.OpenHashMap

    我正在将 Spark 2 4 项目升级到 Spark 3 x 我们遇到了一些现有 Spark ml 代码的问题 var stringIndexers Array StringIndexer for featureColumn lt FEAT
  • 在案例类中重载 unapply 方法:scala

    考虑下面的代码 case class User id Int name String object User def unapply str String Some User 0 str Scala 抱怨 错误 无法解析重载未应用 案例类
  • 根据 Slick 中的 Id 选择单行

    我想根据 Id 查询用户的一行 我有以下虚拟代码 case class User id Option Int name String object Users extends Table User user def id column In
  • 为什么 Scala 选项的 foreach 比 get 更好?

    为什么使用foreach map flatMap等被认为比使用更好get对于 Scala 选项 如果我使用isEmpty我可以打电话get安全 好吧 这又回到了 告诉 不要问 考虑这两行 if opt isDefined println o
  • 使用空/空字段值创建新的数据框

    我正在从现有数据帧创建一个新数据帧 但需要在这个新 DF 中添加新列 下面代码中的 field1 我该怎么做 工作示例代码示例将不胜感激 val edwDf omniDataFrame withColumn field1 callUDF v
  • Scala API 2.10.*:Function2.and然后发生了什么?

    我正在阅读 Joshua Suereth 所著的 Scala in Depth 我购买这本书是为了了解作者的明确能力 我在第 3 页上 在出现一堆拼写错误和不连贯的格式之后 好吧 我已经开始容忍这些错误 我偶然发现了以下示例 该示例涉及解决

随机推荐