我使用 akka 的 actor 模型构建了一个分布式流机器学习模型。通过向 Actor 发送训练实例(训练数据)来异步训练模型。对这些数据的训练会占用计算时间并改变参与者的状态。
目前我正在使用历史数据来训练模型。我想运行一堆不同配置的模型,这些模型在相同的数据上进行训练,并查看不同的集成指标有何变化。本质上,这是对 Thread.sleep(1) 和表示计算时间和状态的数据数组进行的操作的简单得多的模拟。
implicit val as = ActorSystem()
case object Report
case class Model(dataSize: Int) {
val modelActor: ActorRef = actor(new Act {
val data = Array.fill(dataSize)(0)
become {
case trainingData: Int => {
// Screw with the state of the actor and pretend that it takes time
Thread.sleep(1)
data(Math.abs(Random.nextInt % dataSize)) == trainingData
}
case Report => {
println(s"Finished $dataSize")
context.stop(self)
}
}
})
def train(trainingInstance: Int) = modelActor ! trainingInstance
def report: Unit = modelActor ! Report
}
val trainingData = Array.fill(5000)(Random.nextInt)
val dataSizeParams = (1 to 500)
接下来我使用 for 循环来改变参数(由 dataSizeParams 数组表示)
for {
param <- dataSizeParams
} {
// make model with params
val model = Model(param)
for {
trainingInstance <- trainingData
} {
model.train(trainingInstance)
}
model.report
}
for 循环绝对是我想做的事情的错误方式。它并行启动所有不同的模型。当 dataSizeParams 在 1 到 500 范围内时它效果很好,但是如果我将其提高到较高的值,我的模型每个都会开始占用明显的内存块。我想出的是下面的代码。本质上,我有一个模型大师,他可以根据他收到的运行消息的数量来控制同时运行的模型数量。现在,每个模型都包含对此主参与者的引用,并在处理完成后向他发送一条消息:
// Alternative that doesn't use a for loop and instead controls concurrency through what I'm calling a master actor
case object ImDone
case object Run
case class Model(dataSize: Int, master: ActorRef) {
val modelActor: ActorRef = actor(new Act {
val data = Array.fill(dataSize)(0)
become {
case trainingData: Int => {
// Screw with the state of the actor and pretend that it takes time
Tread.sleep(1)
data(Math.abs(Random.nextInt % dataSize)) == trainingData
}
case Report => {
println(s"Finished $dataSize")
master ! ImDone
context.stop(self)
}
}
})
def train(trainingInstance: Int) = modelActor ! trainingInstance
def report: Unit = modelActor ! Report
}
val master: ActorRef = actor(new Act {
var paramRuns = dataSizeParams.toIterator
become {
case Run => {
if (paramRuns.hasNext) {
val model = Model(paramRuns.next(), self)
for {
trainingInstance <- trainingData
} {
model.train(trainingInstance)
}
model.report
} else {
println("No more to run")
context.stop(self)
}
}
case ImDone => {
self ! Run
}
}
})
master ! Run
主代码没有任何问题(我可以看到)。我可以严格控制一次生成的模型数量,但我觉得我缺少一种更简单/干净/开箱即用的方法来做到这一点。另外,我想知道是否有任何巧妙的方法来限制同时运行的模型数量,例如查看系统的 CPU 和内存使用情况。