我有一个 Actor,它接收指标数据点并定期聚合并将它们保存到磁盘。后一个操作执行 I/O,因此我不想使用阻塞操作。但是,如果我将其切换为异步,如何防止在聚合完成之前接收其他数据点而不阻塞某处。
我见过的一种模式是使用Stash
,像这样:
class Aggregator extends Actor with Stash {
def receive = processing
def processing: Receive = {
case "aggregate" => {
context.become(aggregating)
aggregate().onComplete {
case Success => self ! "aggregated"
case Failure => self ! "aggregated"
}
}
case msg => ??? // Process task
}
def aggregating: Receive = {
case "aggregated" =>
unstashAll()
context.become(processing)
case msg =>
stash()
}
}
我对此的疑虑是,我的总体行动的完成只是任何人都可以发送的消息。据我了解,我无法在我的未来完成过程中影响“不恰当”。
作为旁注,我无法确定是否完成了类似onComplete
以某种方式由同一个调度程序执行receive
,因为如果不是,完成将破坏参与者提供的单线程保护。
或者是否有更好的模式来完成内部不同步和立即的操作receive
同时保证我的状态在我完成之前不能改变?似乎这种情况任何时候 actor 状态都会处理任何类型的 I/O(例如数据库),并且显然您希望尽可能避免同步 I/O。
您的聚合参与者当前正在做两件事:聚合和存储。您可以通过拆分这两项任务来解决您的问题并简化您的系统。这单一责任原则 http://en.wikipedia.org/wiki/Single_responsibility_principle也适用于演员。
I'd create a dedicated actor for writing and a message class for holding the aggregated data. This actor sub-system should look like this:
理想情况下,写入磁盘所需的时间短于聚合间隔,以便您的系统保持稳定。如果出现峰值,DataStore actor 的队列将充当缓冲区,用于将消息写入存储。
根据您的应用程序,您可能需要实现某种形式的确认和重试,以确保聚合数据已写入。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)