Luigi - 覆盖任务需要/输入

2023-11-22

我正在使用 luigi 执行一系列任务,如下所示:

class Task1(luigi.Task):
    stuff = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('test.json')

    def run(self):
        with self.output().open('w') as f:
            f.write(stuff)


class Task2(luigi.Task):
    stuff = luigi.Parameter()

    def requires(self):
        return Task1(stuff=self.stuff)

    def output(self):
        return luigi.LocalTarget('something-else.json')

    def run(self):
        with self.output().open('w') as f:
            f.write(stuff)

当我像这样启动整个工作流程时,这完全按照预期工作:

luigi.build([Task2(stuff='stuff')])

使用时luigi.build您还可以通过显式传递参数来运行多个任务,按照文档中的这个示例.

但是,在我的情况下,我还希望能够运行以下业务逻辑Task2完全独立于它对工作流程的参与。这对于未实现的任务效果很好requires, 按照这个例子.

我的问题是,如何运行此方法作为工作流程的一部分以及单独运行?显然,我可以添加一个新的私有方法,例如_my_custom_run,它获取数据并返回结果,然后在中使用此方法run,但它只是感觉应该被纳入框架中,所以这让我觉得我误解了 Luigi 的最佳实践(仍在学习框架)。任何建议表示赞赏,谢谢!


听起来像你想要的动态要求。使用该示例中显示的模式,您可以读取配置或传递带有任意数据的参数,并且yield仅根据配置中的字段您想要需要的任务。

# tasks.py
import luigi
import json
import time


class Parameterizer(luigi.Task):
    params = luigi.Parameter() # Arbitrary JSON

    def output(self):
        return luigi.LocalTarget('./config.json')

    def run(self):
        with self.output().open('w') as f:
            json.dump(params, f)

class Task1(luigi.Task):
    stuff = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('{}'.format(self.stuff[:6]))

    def run(self):
        with self.output().open('w') as f:
            f.write(self.stuff)


class Task2(luigi.Task):
    stuff = luigi.Parameter()
    params = luigi.Parameter()


    def output(self):
        return luigi.LocalTarget('{}'.format(self.stuff[6:]))

    def run(self):

        config = Parameterizer(params=self.params)
        yield config

        with config.output().open() as f:
            parameters = json.load(f)

        if parameters["runTask1"]:
            yield Task1(stuff=self.stuff)
        else:
            pass
        with self.output().open('w') as f:
            f.write(self.stuff)

if __name__ == '__main__':
    cf_json = '{"runTask1": True}'

    print("Trying to run with Task1...")
    luigi.build([Task2(stuff="Task 1Task 2", params='{"runTask1":true}')], local_scheduler=True)

    time.sleep(10)

    cf_json = '{"runTask1": False}'

    print("Trying to run WITHOUT Task1...")
    luigi.build([Task2(stuff="Task 1Did just task 2", params='{"runTask1":false}')], local_scheduler=True)

(这是通过简单地调用来执行的python tasks.py)

我们可以很容易地想象将多个参数映射到多个任务,或者在允许执行各种任务之前应用自定义测试。我们还可以重写它以获取参数luigi.Config.

另请注意以下控制流程Task2:

    if parameters["runTask1"]:
        yield Task1(stuff=self.stuff)
    else:
        pass

在这里,我们可以运行替代任务,或者动态调用任务,正如我们在示例中看到的那样luigi回购。例如:

    if parameters["runTask1"]:
        yield Task1(stuff=self.stuff)
    else:
        # self.stuff is not automatically parsed to int, so this list comp is valid
        data_dependent_deps = [Task1(stuff=x) for x in self.stuff] 
        yield data_dependent_deps

这可能比简单的更复杂一些run_standalone()方法,但我认为这是最接近您在记录的 luigi 模式中寻找的内容。

Source: https://luigi.readthedocs.io/en/stable/tasks.html?highlight=dynamic#dynamic-dependencies

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Luigi - 覆盖任务需要/输入 的相关文章

  • 如何恢复tensorflow inceptions检查点文件(ckpt)?

    I have inception resnet v2 2016 08 30 ckpt文件是预先训练的初始模型 我想使用恢复这个模型 saver restore sess ckpt filename 但为此 我将需要编写训练该模型时使用的变量
  • 在 Python 中解析 TCL 列表

    我需要在双括号上拆分以空格分隔的 TCL 列表 例如 OUTPUT 172 25 50 10 01 01 Ethernet 172 25 50 10 01 02 Ethernet Traffic Item 1 172 25 50 10 01
  • 使用 MongoDB 作为我们的主数据库,我应该使用单独的图数据库来实现实体之间的关系吗?

    我们目前正在为一家专业公司内部实施类似 CRM 的解决方案 由于存储信息的性质以及信息的不同值和键 我们决定使用文档存储数据库 因为它完全适合目的 在本例中我们选择 MongoDB 作为此 CRM 解决方案的一部分 我们希望存储实体之间的关
  • matplotlib 图中点的标签

    所以这是一个关于已发布的解决方案的问题 我试图在我拥有的 matplotlib 散点图中的点上放置一些数据标签 我试图在这里模仿解决方案 是否有与 MATLAB 的 datacursormode 等效的 matplotlib https s
  • pandas DataFrame.join 的运行时间是多少(大“O”顺序)?

    这个问题更具概念性 理论性 与非常大的数据集的运行时间有关 所以我很抱歉没有一个最小的例子来展示 我有一堆来自两个不同传感器的数据帧 我需要最终将它们连接成两个very来自两个不同传感器的大数据帧 df snsr1 and df snsr2
  • VSCode Settings.json 丢失

    我正在遵循教程 并尝试将 vscode 指向我为 Scrapy 设置的虚拟工作区 但是当我在 VSCode 中打开设置时 工作区设置 选项卡不在 用户设置 选项卡旁边 我还尝试通过以下方式手动转到文件 APPDATA Code User s
  • 嵌套列表的重叠会产生不必要的间隙

    我有一个包含三个列表的嵌套 这些列表由 for 循环填充 并且填充由 if 条件控制 第一次迭代后 它可能类似于以下示例 a 1 2 0 0 0 0 0 0 4 5 0 0 0 0 0 0 6 7 根据条件 它们不重叠 在第二次迭代之后 新
  • 在 Django Admin 中调整字段大小

    在管理上添加或编辑条目时 Django 倾向于填充水平空间 但在某些情况下 当编辑 8 个字符宽的日期字段或 6 或 8 个字符的 CharField 时 这确实是一种空间浪费 字符宽 然后编辑框最多可容纳 15 或 20 个字符 我如何告
  • 为什么 web2py 在启动时崩溃?

    我正在尝试让 web2py 在 Ubuntu 机器上运行 所有文档似乎都表明要在 nix 系统上运行它 您需要下载源代码并执行以下操作 蟒蛇 web2py py 我抓住了source http www web2py com examples
  • Python 内置的 super() 是否违反了 DRY?

    显然这是有原因的 但我没有足够的经验来认识到这一点 这是Python中给出的例子docs http docs python org 2 library functions html super class C B def method se
  • 使用 python/numpy 重塑数组

    我想重塑以下数组 gt gt gt test array 11 12 13 14 21 22 23 24 31 32 33 34 41 42 43 44 为了得到 gt gt gt test2 array 11 12 21 22 13 14
  • 未知错误:Chrome 无法启动:异常退出

    当我使用 chromedriver 对 Selenium 运行测试时 出现此错误 selenium common exceptions WebDriverException Message unknown error Chrome fail
  • python的shutil.move()在linux上是原子的吗?

    我想知道python的shutil move在linux上是否是原子的 如果源文件和目标文件位于两个不同的分区上 行为是否不同 或者与它们存在于同一分区上时的行为相同吗 我更关心的是如果源文件和目标文件位于同一分区上 shutil move
  • pandas - 包含时间序列数据的堆积条形图

    我正在尝试使用时间序列数据在 pandas 中创建堆积条形图 DATE TYPE VOL 0 2010 01 01 Heavy 932 612903 1 2010 01 01 Light 370 612903 2 2010 01 01 Me
  • 将 Matlab 的 datenum 格式转换为 Python

    我刚刚开始从 Matlab 迁移到 Python 2 7 在读取 mat 文件时遇到一些问题 时间信息以 Matlab 的日期数字格式存储 对于那些不熟悉它的人 日期序列号将日历日期表示为自固定基准日期以来已经过去的天数 在 MATLAB
  • 重新分配唯一值 - pandas DataFrame

    我在尝试着assign unique值在pandas df给特定的个人 For the df below Area and Place 会一起弥补unique不同的价值观jobs 这些值将分配给个人 总体目标是使用尽可能少的个人 诀窍在于这
  • 制作一份 Python 文档的 PDF 文件

    Python 官方网站提供 PDF 文档下载 但它们是按章节分隔的 我下载了源代码并构建了 PDF 文档 这些文档也是单独的 PDF 我怎么能够从源代码中的 Makefile 构建一个 PDF 文件 我认为这样阅读起来会更方便 如果连接单独
  • 如何使用 Boto3 启动具有 IAM 角色的 EC2 实例?

    我无法弄清楚如何使用指定的 IAM 角色在 Boto3 中启动 EC2 实例 以下是迄今为止我如何成功创建实例的一些示例代码 import boto3 ec2 boto3 resource ec2 region name us west 2
  • 将索引与值交换的最快方法

    考虑pd Series s s pd Series list abcdefghij list ABCDEFGHIJ s A a B b C c D d E e F f G g H h I i J j dtype object 交换索引和值并
  • pytest找不到模块[重复]

    这个问题在这里已经有答案了 我正在关注pytest 良好实践 https docs pytest org en latest explanation goodpractices html test discovery或者至少我认为我是 但是

随机推荐

  • PyQt5 到 PySide2,加载不同类中的 UI 文件

    我有一个在 python3 6 下运行的 python 应用程序 并使用 PyQt5 加载 Ui 窗口 这些窗口是使用 Qt Designer 5 9 4 创建的 下面的代码显示了 PyQt5 的工作示例 现在我想拥有完全相同的功能 但使用
  • 返回具有嵌套级别和值的嵌套列表

    我想使用可视化一些深层嵌套的数据网络D3 我不知道如何在发送之前将数据转换为正确的格式radialNetwork 这是一些示例数据 level lt c 1 2 3 4 4 3 4 4 1 2 3 value lt letters 1 11
  • 是否可以将流量路由到特定 Pod?

    假设我正在 GKE 中运行我的应用程序 这是一个多租户应用程序 我创建了多个托管我的应用程序的 Pod 现在我想要 客户 1 1000 使用 Pod1 使用 Pod2 的客户 1001 2000 ETC 如果我有一个指向我的集群的 gclo
  • 使用 GLUT 在 3D OpenGL 世界中显示固定位置 2D 文本

    我有一个OpenGL项目使用GLUT 不是 freeglut 其中我想在视口上的固定位置显示 2D 文本 我的其余对象位于 3D 世界坐标中 这个答案一个相关的老问题说 GLUT 附带的位图字体是简单的 2D 字体 不适合在 3D 环境中显
  • 这是 .NET 的 Regex.Split 中的错误吗?

    我有两个正则表达式 用于Regex Split lt G and lt G 2 分割字符串时 A B C D E F G 第一个结果是 A B C D E F G 第二个结果是 A B A C D C E F E G 这里发生了什么 我以为
  • 让 html5 地理定位在用户每次重新加载页面时请求许可

    为什么我们需要它 我们的网站上有一些页面 允许用户输入一些数据并搜索其所在地区的其他客户 当用户打开该页面时 必须显示弹出消息 http foo bar com想要使用您当前的位置 之后 用户可以放弃它并手动填写位置字段或接受并自动重定向到
  • 从视频 URL 中提取缩略图

    我必须从视频 来自网址 中提取缩略图 我使用以下代码 NSString stringUrl video stringurl NSURL url NSURL URLWithString stringUrl AVURLAsset asset A
  • 用 C++ 读取另一个进程的标准输出

    在 Windows 中 有没有办法在 C 中启动进程 然后在完成后将其吐出到 stdout 中 如有必要 必须使用提升的权限 在 Vista 或更高版本上 运行该进程 我目前正在使用 ShellExecuteEx 启动进程并运行 while
  • jQuery data() 如何打破循环引用

    我读过一篇为什么它更好 and 它是如何实施的 但我真正不明白的是它如何打破循环引用 它如何打破参考圆 div1 data item div2 div2 data item div1 例如 上面的div互相指向 如何防止 我有一种预感 但我
  • 在不裁剪的情况下缩放 SDL Surface 的正确方法?

    缩放 SDL Surface 的正确方法是什么 我在网上找到了一种解释 但它需要逐像素地重新绘制表面 似乎应该有某种方法可以通过 SDL 本地执行此操作 而不是像那样重新绘制图像 我在 SDL 文档中找不到任何涉及此内容的内容 我可以通过修
  • 从 sbt shell 中设置环境变量

    我希望能够在交互式 sbt shell 中设置环境变量 但我似乎找不到方法来做到这一点 我查看了官方 sbt 文档以及 stackoverflow 但没有成功 我想明确的是我不想要必须在中设置此环境变量build sbt文件 而是能够在交互
  • android从布局xml动态添加元素

    如何从此布局 xml 中获取元素 myButton
  • 如何获取 RawInput HID 设备的人类可读名称?

    我正在将应用程序从 DirectInput 切换到 RawInput 以进行游戏手柄处理 并且我想为每个游戏手柄提供人类可读的描述 理想的情况是显示在设备管理器中的设备文本 但 USB 产品描述也可以 任何方法都应该在没有管理员许可的情况下
  • 谷歌地图 setMap 不是一个函数

    我正在处理我的谷歌地图页面在 Firebug 中显示错误的问题 当我在经销商地图中搜索时 将触发 clearLocations 函数 但出现了这个错误 markers i setMap 不是一个函数 有谁知道如何解决这个问题 我在几个论坛和
  • 更改列名而不重新创建 MySQL 表

    有没有办法在不进行重大更改的情况下重命名 InnoDB 表上的列 桌子相当大 我想避免重大停机 重命名列 使用ALTER TABLE CHANGE COLUMN 不幸的是需要 MySQL 运行全表复制 查看pt 在线架构更改 这有助于您对表
  • Netbeans 更新失败

    我的互联网连接没有问题 但是当我尝试更新 netbeans 插件或 IDE 本身时 每次都会出现此屏幕 即使重新安装操作系统也无法解决此问题 一次又一次地遇到同样的错误 我该如何修复它 在 Netbeans 中 Go to Tools gt
  • 使用 jQuery 更改图像地图坐标值

    我有一个非常复杂的图像映射 我想将其缩小一半 为此 需要将所有坐标值除以 2 由于有数千个坐标值 我想我可以使用 jQuery 遍历 DOM 来查找坐标值 然后将它们除以 2 当涉及到 JavaScript 和 jQuery 时 我非常业余
  • 为什么“复制到输出目录”选择的用语在不同位置之间会发生变化?

    VS NET 中的解决方案资源管理器中的配置值措辞与 VS NET 中的预计值之间存在不一致 csproj文件 在Solution Explorer a config或依赖文件中会有多个选项Copy To Output Directory影
  • Laravel 5.6:创建图像缩略图

    在我的旧 PHP 应用程序中 我曾经运行如下函数来创建 jpeg 图像缩略图 function imageThumbanail image src imagecreatefromjpeg http examplesite com image
  • Luigi - 覆盖任务需要/输入

    我正在使用 luigi 执行一系列任务 如下所示 class Task1 luigi Task stuff luigi Parameter def output self return luigi LocalTarget test json