控制 Akka 中消耗大量内存的 actor 的生成

2024-04-01

我使用 akka 的 actor 模型构建了一个分布式流机器学习模型。通过向 Actor 发送训练实例(训练数据)来异步训练模型。对这些数据的训练会占用计算时间并改变参与者的状态。

目前我正在使用历史数据来训练模型。我想运行一堆不同配置的模型,这些模型在相同的数据上进行训练,并查看不同的集成指标有何变化。本质上,这是对 Thread.sleep(1) 和表示计算时间和状态的数据数组进行的操作的简单得多的模拟。

implicit val as = ActorSystem()

case object Report

case class Model(dataSize: Int) {
  val modelActor: ActorRef = actor(new Act {
    val data = Array.fill(dataSize)(0)
    become {
      case trainingData: Int => {
        // Screw with the state of the actor and pretend that it takes time
        Thread.sleep(1)
        data(Math.abs(Random.nextInt % dataSize)) == trainingData
      }
      case Report => {
          println(s"Finished $dataSize")
          context.stop(self)
        }
      }
    })

  def train(trainingInstance: Int) = modelActor ! trainingInstance

  def report: Unit = modelActor ! Report
}

val trainingData = Array.fill(5000)(Random.nextInt)

val dataSizeParams = (1 to 500)

接下来我使用 for 循环来改变参数(由 dataSizeParams 数组表示)

for {
  param <- dataSizeParams
} {
  // make model with params
  val model = Model(param)
  for {
    trainingInstance <- trainingData
  } {
    model.train(trainingInstance)
  }
  model.report
}

for 循环绝对是我想做的事情的错误方式。它并行启动所有不同的模型。当 dataSizeParams 在 1 到 500 范围内时它效果很好,但是如果我将其提高到较高的值,我的模型每个都会开始占用明显的内存块。我想出的是下面的代码。本质上,我有一个模型大师,他可以根据他收到的运行消息的数量来控制同时运行的模型数量。现在,每个模型都包含对此主参与者的引用,并在处理完成后向他发送一条消息:

// Alternative that doesn't use a for loop and instead controls concurrency through what I'm calling a master actor
case object ImDone
case object Run

case class Model(dataSize: Int, master: ActorRef) {
  val modelActor: ActorRef = actor(new Act {
    val data = Array.fill(dataSize)(0)
    become {
      case trainingData: Int => {
        // Screw with the state of the actor and pretend that it takes time
        Tread.sleep(1)
        data(Math.abs(Random.nextInt % dataSize)) == trainingData
      }
      case Report => {
          println(s"Finished $dataSize")
          master ! ImDone
          context.stop(self)
        }
      }
    })

  def train(trainingInstance: Int) = modelActor ! trainingInstance

  def report: Unit = modelActor ! Report
}

val master: ActorRef = actor(new Act {
  var paramRuns = dataSizeParams.toIterator
  become {
    case Run => {
      if (paramRuns.hasNext) {
        val model = Model(paramRuns.next(), self)
        for {
          trainingInstance <- trainingData
        } {
          model.train(trainingInstance)
        }
        model.report
      } else {
        println("No more to run")
        context.stop(self)
      }
    }
    case ImDone =>  {
      self ! Run
    }
  }
})

master ! Run

主代码没有任何问题(我可以看到)。我可以严格控制一次生成的模型数量,但我觉得我缺少一种更简单/干净/开箱即用的方法来做到这一点。另外,我想知道是否有任何巧妙的方法来限制同时运行的模型数量,例如查看系统的 CPU 和内存使用情况。


您正在寻找工作拉动模式。我强烈推荐 Akka 开发者写的这篇博文:

http://letitcrash.com/post/29044669086/balancing-workload-across-nodes-with-akka-2 http://letitcrash.com/post/29044669086/balancing-workload-across-nodes-with-akka-2

我们在 Akka 的集群功能之上使用了一个变体,以避免流氓并发。通过工人演员pull工作而不是有主管push工作时,您可以通过简单地限制工作参与者的数量来优雅地控制工作量(以及 CPU 和内存使用量)。

与纯路由器相比,这有一些优点:更容易跟踪故障(如该帖子所述),并且工作不会在邮箱中滞留(可能会丢失)。

另外,如果您使用远程处理,我建议您not在消息中发送大量数据。让工作节点在触发时自行从另一个源提取数据。我们使用S3。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

控制 Akka 中消耗大量内存的 actor 的生成 的相关文章

  • Scala 重载构造函数和 super

    我无法理解如何在 Java 上开发类似于以下的 Scala 代码 public abstract class A protected A protected A int a public abstract class B protected
  • Tensorflow推荐的系统规格?

    我开始在我的 RHEL 6 5 机器上安装 Tensorflow 但事实证明 Tensorflow 需要 glibc gt 2 17 而 rhel 6 5 上默认的 glibc 是 2 12 我想知道是否有人可以帮助我了解张量流的最低 推荐
  • 地图应用的聚类算法

    我正在研究地图上的聚类点 纬度 经度 对于快速且可扩展的合适算法有什么建议吗 更具体地说 我有一系列纬度 经度坐标和一个地图视口 我正在尝试将靠近的点聚集在一起以消除混乱 我已经有了解决问题的方法 see here http bouldr
  • 简单的 Scala actor 问题

    我确信这是一个非常简单的问题 但很不好意思地说我无法理解它 我有一个 Scala 值列表 我想使用演员来并行地对每个值进行一些 外部 调用 我想等到所有值都已处理完毕 然后继续 没有共享值被修改 有人可以建议吗 Thanks Scala 中
  • 为什么我的 Project Euler Problem 12 算法这么慢?

    我已经在 Scala 中为 PE P12 创建了解决方案 但速度非常非常慢 有人可以告诉我为什么吗 如何优化这个 calculateDevisors 简单的方法和calculateNumberOfDivisors 除数函数具有相同的速度 i
  • 将无形状 HList 转换为 TupleN,其中元组形状不需要与 HList 形状完全匹配

    我想创建相当于 def toTupleN A1 AN L lt HList l L TupleN A1 AN 代码使用toTupleN仅当恰好有一个时才应该编译N中的值的组合l可以从中创建元组 其他任何内容都应该生成编译时错误 应考虑可用的
  • 为什么调用 take() 方法时 Slick 会生成子查询

    I use Slick http slick typesafe com 1 0 0 RC1 我对表对象有这样的定义 object ProductTable extends Table Int String String String Dou
  • 在Python中表示语料库句子的一种热门编码

    我是 Python 和 Scikit learn 库的初学者 我目前需要从事一个 NLP 项目 该项目首先需要通过 One Hot Encoding 来表示一个大型语料库 我已经阅读了 Scikit learn 关于 preprocessi
  • Scala 组合器解析器 - 区分数字字符串和变量字符串

    我正在做 Cay Horstmann 的组合器解析器练习 我想知道区分代表数字的字符串和代表匹配语句中变量的字符串的最佳方法 def factor Parser ExprTree wholeNumber expr ident case a
  • 我应该在请求中创建 executorService 还是在 Web 应用程序中共享一个实例?

    我正在向基于 Jersey 的 Web 服务添加一个新端点 支持端点的逻辑需要对另一个服务进行 10 到 50 次调用 这些调用是独立的并且可以并行化 因此我正在考虑使用执行器服务将工作分配到多个线程 我想知道是否应该为每个请求实例化一个
  • scala/spark 代码不允许在 hive 中添加列

    如果源数据有新列 我尝试在 Hive 表中添加一列 所有新列的检测都运行良好 但是 当我尝试将列添加到目标表时 我收到此错误 for f lt df schema fields if f name chk spark sqlContext
  • 如何使用 apply/unapply 方法重现案例类行为?

    我尝试用普通类和伴生对象替换案例类 但突然出现类型错误 编译良好的代码 综合示例 trait Elem A B def C other Elem C A Elem C B other match case Chain head tail g
  • Java / Scala Future 由回调驱动

    简洁版本 我怎样才能创建一个Promise
  • 在使用 stop_token 等待条件变量_any 时是否需要拥有锁来请求停止?

    在等待条件变量时 更改谓词状态的线程必须拥有锁 因此在唤醒期间不会错过更新 根据文档 这是必要的 即使在使用原子变量时也是如此 不过我不确定是否request stop 已经正确处理了 那么问题是 这两个选项中哪一个是正确且符合标准的呢 j
  • C++ 类内线程并发

    我试图在一个类中运行两个并发线程 它们都使用相同的函数打印数据 使用 std lock guard 进行作用域锁 问题是只有第一个线程被触发 第二个线程永远不会被调用 include
  • 使用原始类型模拟案例类

    考虑以下类型结构 trait HasId T def id T case class Entity id Long extends HasId Long 比方说 我们想在一些测试中模拟实体类 val entityMock mock Enti
  • 如何在sklearn决策树中显示特征名称?

    我目前有一个决策树 将功能名称显示为X index i e X 0 X 1 X 2 etc from sklearn import tree from sklearn tree import DecisionTreeClassifier d
  • 使用 to_categorical 转换 np.array 时出现内存问题

    我有一个像这样的 numpy 数组 0 1 1 0 0 1 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 0 1 0 0 0 0 0 1 0 0 0 1 0 1 我这样改造它以减少内存需求 x val x val asty
  • 提高SVM分类器准确率的技术

    我正在尝试使用 UCI 数据集构建一个分类器来预测乳腺癌 我正在使用支持向量机 尽管我尽最大努力提高分类器的准确性 但仍无法超过 97 062 我尝试过以下方法 1 Finding the most optimal C and gamma
  • 在 Scala 和 SBT 中调试较长的编译时间

    在我的 Scala SBT 项目中 我有一个文件需要 5 分钟才能编译 所有其他的都可以在几秒钟内编译 这使得开发非常痛苦 我确信我滥用了一些 Scala 构造 但我不知道如何调试它 如何在 Scala 中调试较长的编译时间 我正在使用 S

随机推荐

  • 从源代码构建 Clang 时什么时候需要 libc++ 源代码?

    我多年来一直在 Linux 和 OS X 上构建 Clang LLVM 下载页面 http llvm org releases download html 我一直使用以下来源 LLVM LLVM 源 编译器前端 Clang 源 编译器 RT
  • 在Python中混合yield和return语句是个好习惯吗?

    我很想有以下行为 def foo bar None if bar return other function other thing bar else for i in other thing yield other function i
  • Discord JS - 交互创建和消息创建

    我一直在尝试使用交互创建事件 但不知何故它不起作用 我不知道为什么 而且我没有找到有关此事件的确切文档 只知道它用于执行斜杠命令 但是为了这个目的我使用消息创建事件 并且运行良好 const Event require handlers E
  • 点击透明图像像素

    我不希望图像的透明部分可点击 我发现
  • 是否可以在 libGDX 中禁用帧限制?

    更具体地说 是一个桌面 libGDX LWJGL 应用程序 有一些配置选项可以禁用 CPU 同步以及垂直同步 但不管应用程序以 60 fps 运行 这对于所有实际用途来说都很好 但出于好奇 如果没有别的原因 我想看看帧速率可以达到多高 罗德
  • CloudFront 如何在 S3 的现有分发服务网站上设置反向代理

    我有一个 S3 存储桶 它托管一个网站并通过 CloudFront 交付 现在我已将发行版附加到我的顶点根域 例如 www xyz com 因此 之前我们使用 Nginx 从同一域上的网络服务器根提供静态前端 www xyz com 并且还
  • Chart.js 每个点的自定义图像

    我正在使用 Chart js 并且正在寻求有关散点图上每个点的自定义图像的帮助 我尝试过使用 javascript 图像数组 但它不起作用 我是 canvas 和 html5 的新手 我想要的是每个点都是用户的小个人资料图片而不是圆圈 一个
  • 使用 Node.js 的文件系统观察器

    我想使用 node js 实现文件系统观察器 以便它监视特定目录中添加 删除的任何文件 有人可以给出如何实现这个的想法吗 Thanks 查看fs watchFile filename options listener http nodejs
  • 通过模态加载动态 URL

    假设我有以下由循环生成 X 次的链接 a class btn href Launch Modal a 这是启动模式的 JS 脚本 document ready function view more modal remote item vie
  • 权限拒绝:打开提供程序 android.support.v4.content.FileProvider

    我在尝试在 Android 模拟器中执行应用程序升级时遇到了一些问题 场景的流程来自一个 Activity 我将执行异步任务A哪个打开片段A 然后在里面异步任务A 我会检查是否有版本升级 如果可用并且用户选择了 确定 片段A 我将继续异步任
  • 调用需要 API 级别 29(当前最低为 21):`android.widget.NumberPicker#setTextColor`

    我想使用 setTextColor 更改选定的文本颜色字段 但是 Android Studio 给了我这个错误 我应该怎么办 最小 SDK 为 21 这是我的 CustomNumberPicker 类的代码 import android a
  • MongoDb 解释失败:“未知的顶级运算符:$query”

    我试图从非常简单的查询中获得解释 它使用具有以下架构的帖子集合 gt db posts findOne id ObjectId 55236e6182bf196454a952b6 Content wuOfCjKborHcxkoyXzXiW C
  • Mysql CASE WHEN JOIN 语句错误

    Mysql查询 SELECT FROM pet info LEFT JOIN lostpets ON pet info id lostpets petid LEFT JOIN pet images ON pet info id pet im
  • 页面加载后部分渲染

    我有一个包含一些用户控件的页面 我想在回发后加载这些用户控件 就像 ajax 渲染一样 每个用户控件都显示数据库中的列表 我不希望用户在服务器代码构建响应时等待 我认为如果为用户显示页面并且在通过 ajax 请求加载用户控件之后 这将很有用
  • 如何在未来取消时终止 Callable 中的 CXF Web 服务调用

    Edit 这个问题现在已经经历了几次迭代 所以请随意查看修订版本 以了解有关历史和尝试过的事情的一些背景信息 我将 CompletionService 与 ExecutorService 和 Callable 一起使用 通过 CXF 生成的
  • 包含 jQuery 会导致标准 JavaScript 停止运行?

    我正在开始使用 jQuery 我一直在尝试将它与一些预先存在的 JavaScript 代码混合 这样我就不必重写所有内容 我读过很多地方都说这是完全可行的 然而 每当我包含任何 jQuery 行时 标准 JavaScript 就会停止运行
  • RubyKoans:破损的公案?

    可能是业余爱好者的标志 我想知道问题是否出在公案 而不是我 但是 考虑一下这个公案 def test calling global methods without parentheses result my global method 2
  • 取消 ChangeNotifier 内的 Firebase 监听器

    当我尝试取消 Firestore 侦听器时ProductsService cancel 我收到错误 错误 flutter lib ui ui dart state cc 209 未处理的异常 LateInitializationError
  • JSF 2.0 validateRegex 带有自己的验证器消息

    我有一个与此类似的代码
  • 控制 Akka 中消耗大量内存的 actor 的生成

    我使用 akka 的 actor 模型构建了一个分布式流机器学习模型 通过向 Actor 发送训练实例 训练数据 来异步训练模型 对这些数据的训练会占用计算时间并改变参与者的状态 目前我正在使用历史数据来训练模型 我想运行一堆不同配置的模型