我有一个非常简单的例子,我有一个演员(SimpleActor
)通过向自身发送消息来执行周期性任务。该消息在参与者的构造函数中安排。在正常情况下(即没有故障)一切正常。
但如果 Actor 必须处理错误怎么办?我还有另一个演员(SimpleActorWithFault
)。这个演员可能有缺点。在本例中,我通过抛出异常来自己生成一个异常。当故障发生时(即SimpleActorWithFault
抛出异常)它会自动重新启动。然而,这种重新启动会扰乱 Actor 内部的调度程序,使其不再按异常方式运行。如果故障发生得足够快,就会产生更多意想不到的行为。
我的问题是在这种情况下处理故障的首选方法是什么?我知道我可以使用Try
块来处理异常。但是,如果我要扩展另一个演员,而我无法将 Try 放入超类中,或者在演员中发生异常错误的情况下,该怎么办?
import akka.actor.{Props, ActorLogging}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import akka.actor.Actor
case object MessageA
case object MessageToSelf
class SimpleActor extends Actor with ActorLogging {
//schedule a message to self every second
context.system.scheduler.schedule(0 seconds, 1 seconds, self, MessageToSelf)
//keeps track of some internal state
var count: Int = 0
def receive: Receive = {
case MessageA => {
log.info("[SimpleActor] Got MessageA at %d".format(count))
}
case MessageToSelf => {
//update state and tell the world about its current state
count = count + 1
log.info("[SimpleActor] Got scheduled message at %d".format(count))
}
}
}
class SimpleActorWithFault extends Actor with ActorLogging {
//schedule a message to self every second
context.system.scheduler.schedule(0 seconds, 1 seconds, self, MessageToSelf)
var count: Int = 0
def receive: Receive = {
case MessageA => {
log.info("[SimpleActorWithFault] Got MessageA at %d".format(count))
}
case MessageToSelf => {
count = count + 1
log.info("[SimpleActorWithFault] Got scheduled message at %d".format(count))
//at some point generate a fault
if (count > 5) {
log.info("[SimpleActorWithFault] Going to throw an exception now %d".format(count))
throw new Exception("Excepttttttiooooooon")
}
}
}
}
object MainApp extends App {
implicit val akkaSystem = akka.actor.ActorSystem()
//Run the Actor without any faults or exceptions
akkaSystem.actorOf(Props(classOf[SimpleActor]))
//comment the above line and uncomment the following to run the actor with faults
//akkaSystem.actorOf(Props(classOf[SimpleActorWithFault]))
}
正确的做法是将危险行为下放到自己的演员身上。这种模式称为错误内核模式(请参阅 Akka 并发,第 8.5 节):
该模式描述了一种非常常识性的监督方法
根据任何不稳定因素将参与者彼此区分开来
表明他们可能持有。
简而言之,就是状态珍贵的演员不应该
允许失败或重新启动。任何持有宝贵数据的参与者都是
受到保护,任何有风险的操作都会被降级为从属操作
演员,如果重新开始,只会导致好事发生。
错误内核模式意味着进一步降低风险水平
那个树。
另请参阅另一个教程在这里 http://danielwestheide.com/blog/2013/03/20/the-neophytes-guide-to-scala-part-15-dealing-with-failure-in-actor-systems.html.
所以在你的情况下它会是这样的:
SimpleActor
|- ActorWithFault
Here SimpleActor
充当导师 for ActorWithFault
。默认的监督策略any演员要重新启动一个孩子Exception
并升级其他任何事情:http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html
升级意味着参与者本身可能会重新启动。既然你真的不想重新开始SimpleActor
你可以让它总是重新启动ActorWithFault
并且永远不要通过覆盖主管策略来升级:
class SimpleActor {
override def preStart(){
// our faulty actor --- we will supervise it from now on
context.actorOf(Props[ActorWithFault], "FaultyActor")
...
override val supervisorStrategy = OneForOneStrategy () {
case _: ActorKilledException => Escalate
case _: ActorInitializationException => Escalate
case _ => Restart // keep restarting faulty actor
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)