使用变量设置 Dask Worker

2023-12-03

我想在工作人员加载时分发一个更大的对象(或从磁盘加载)并将其放入全局变量(例如calib_data)。这对 Dask 工作人员有用吗?


好像是客户端方法注册工人回调在这种情况下可以做你想做的事。你仍然需要某处放置你的变量,因为在 python 中没有真正的全局范围。例如,某个地方可以是导入模块的任何属性,然后任何工作人员都可以访问该属性。您还可以将其添加为工作实例本身的属性,但我认为没有明显的理由要这样做。

一种有效的方法是劫持随机选择的内置模块;但我并不特别推荐这个(见下文)

def attach_var(name, value):
    import re
    re.__setattr__(name, value)

client.run(attach_var, 'x', 1)

def use_var():
    # any function running on a worker can do this, via delayed or
    # whatever method you pass with
    import re
    return re.x

client.run(use_var)

不过,在继续之前,您是否已经考虑过delayed(calib_data) or scatter,这会将您的变量复制到需要的位置,例如,

futures = client.scatter(calib_data, broadcast=True)

或者确实使用普通的方式将数据加载到工作人员中delayed语义学

dcalib = dask.delayed(load_calib_data)()
work = dask.delayed(process_stuff)(dataset1, dcalib)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用变量设置 Dask Worker 的相关文章

  • 如何从 Dask-Yarn 作业中捕获工人的日志?

    我尝试过使用以下内容 config dask distributed yaml and config dask yarn yaml logging file config path to config ini or logging vers
  • dask 持久行为不一致

    如果我注释掉这一行 我发现 dask 的奇怪行为仍然存在 client Client memory limit 20GB n workers 1 Connect to distributed cluster and override def
  • 在“from_delayed”JSON 文件中发现 DASK 元数据不匹配

    我刚刚开始我的冒险DASK我正在学习 json 格式的示例数据集 我知道对于初学者来说这不是世界上最简单的数据格式 我有一个数据集json格式 我通过加载数据dd read json到数据框 一切顺利 问题发生在 例如 compute or
  • Dask 分布式工作线程在运行许多任务时总是会泄漏内存

    有哪些策略可以解决或调试这个问题 distribution worker 警告 内存使用率很高 但工作线程没有数据可存储到磁盘 也许其他进程正在泄漏内存 进程内存 26 17 GB 工作内存限制 32 66 GB 基本上 我只是在一台机器上
  • 气流+芹菜或dask。为了什么,什么时候?

    我阅读了 Airflow 官方文档下列 https airflow apache org configuration html scaling out with celery 这究竟意味着什么 作者所说的横向扩展是什么意思 那是 when
  • 使用 Dask 的新 to_sql 来提高效率(内存/速度)或替代方案将数据从 dask 数据帧获取到 SQL Server 表

    我的最终目标是结合使用 SQL Python 来处理一个项目 该项目的数据量太大 以至于 pandas 无法处理 至少在我的机器上 所以 我已经和dask to 从多个源读取数据 主要是 SQL Server 表 视图 将数据操作 合并到一
  • 使用 dask 加载大型数据集

    我处于具有集群 紧密耦合互连和支持 Lustre 文件系统的 HPC 环境中 我们一直在探索如何利用 Dask 不仅提供计算 而且充当分布式缓存来加速我们的工作流程 我们专有的数据格式是 n 维且规则的 并且我们编写了一个惰性读取器以传递到
  • Dask 中的二维布尔索引

    我想使用 Dask 进行二维索引 这是该任务的示例 array1 xr DataArray 1 3 4 7 6 4 15 2 chunk 2 array2 xr DataArray 1 3 4 9 1 4 3 2 chunk 2 array
  • 当 SageMath 代码在 python 中运行时,使用 Dask 会抛出 ImportError

    这个问题和我的很相似先前的问题 https stackoverflow com questions 68958031 using dask throws importerror when run inside sagemath并受到其中一条
  • Dask 数据帧并行任务

    我想从数据帧创建功能 附加列 并且我有以下许多功能的结构 遵循本文档https docs dask org en stable delayed best practices html https docs dask org en stabl
  • 如何从 url 列表创建 Dask DataFrame?

    我有一个 URL 列表 我很想将它们读取到 dask 数据框中 立刻 但看起来像read csv不能使用星号http 有什么办法可以实现这一点吗 这是一个例子 link http web mta info developers data d
  • 使用 dask 合并大型数据集

    我有两个数据集 一个约为 45GB 包含 1 年的日常交易 第二个数据集为 3 6GB 包含客户 ID 和详细信息 我想将两者合并到一个公共列上以创建一个数据集 这超出了服务器的内存 因为每个客户可能有多个交易 我正在开发一个具有 16 个
  • dask 数据帧的 iloc 相当于什么?

    我遇到一种情况 我需要按位置索引 dask 数据帧 我看到没有 iloc方法可用 还有其他选择吗 或者我是否需要使用基于标签的索引 例如 我想 import dask dataframe as dd import numpy as np i
  • python dask DataFrame,支持(可并行化)行应用吗?

    我最近发现dask http dask pydata org en latest index html旨在成为一个易于使用的 python 并行处理模块 对我来说最大的卖点是它可以与熊猫一起使用 在阅读了其手册页后 我找不到一种方法来完成这
  • dask 可以用于在核心之外进行分组和重新编码吗?

    我有 8GB csv 文件和 8GB RAM 每个文件每行有两个字符串 格式如下 a c c a f g a c c a b f c a 对于较小的文件 我删除重复项 计算前两列中每行的副本数 然后将字符串重新编码为整数如下 https s
  • Dask“没有名为 xxxx 的模块”错误

    使用dask分布式我尝试提交一个位于另一个名为worker py的文件中的函数 在工人中我有以下错误 没有名为 worker 的模块 但是我无法弄清楚我在这里做错了什么 这是我的代码示例 import worker def run self
  • 将字符串转换为字典,然后访问键:值???如何访问 Python 中的数据?

    我在访问字典内的数据时遇到问题 系统 Macbook 2012Python Python 3 5 1 Continuum Analytics Inc 我正在与一个dask dataframe http dask pydata org en
  • Pandas hub_table 更快的替代品

    我正在使用熊猫pivot table在大型数据集 1000 万行 6 列 上运行 由于执行时间至关重要 因此我尝试加快流程 目前 处理整个数据集大约需要 8 秒 这太慢了 我希望找到替代方案来提高速度 性能 我当前的 Pandas 数据透视
  • pandas DataFrame 中行的高效成对比较

    我目前正在处理一个较小的数据集 大约 900 万行 不幸的是 大多数条目都是字符串 即使强制类别 框架在内存中也只有几 GB 我想做的是将每一行与其他行进行比较 并对内容进行直接比较 例如 给定 A B C D 0 cat blue old
  • 初始化 dask 分布式工作线程的状态

    我正在尝试做类似的事情 resource MyResource def fn x something dosemthing x resource return something client Client results client m

随机推荐