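It sounds like what you want is dynamic requirements. Using the pattern shown in that example, you can read a config, or pass a parameter carrying arbitrary data, and yield only the tasks you want to require based on the fields in that config.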
# tasks.py
import json
import time

import luigi


class Parameterizer(luigi.Task):
    params = luigi.Parameter()  # Arbitrary JSON, passed as a string

    def output(self):
        # Fixed path: Luigi treats this task as complete whenever the file
        # exists, so a config.json left by an earlier run is reused as-is.
        return luigi.LocalTarget('./config.json')

    def run(self):
        with self.output().open('w') as f:
            # Parse and re-serialize so the file holds a JSON object
            # rather than a JSON-encoded string.
            json.dump(json.loads(self.params), f)


class Task1(luigi.Task):
    stuff = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('{}'.format(self.stuff[:6]))

    def run(self):
        with self.output().open('w') as f:
            f.write(self.stuff)


class Task2(luigi.Task):
    stuff = luigi.Parameter()
    params = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('{}'.format(self.stuff[6:]))

    def run(self):
        config = Parameterizer(params=self.params)
        yield config  # dynamic dependency: make sure the config is written
        with config.output().open() as f:
            parameters = json.load(f)
        if parameters["runTask1"]:
            yield Task1(stuff=self.stuff)
        else:
            pass
        with self.output().open('w') as f:
            f.write(self.stuff)


if __name__ == '__main__':
    print("Trying to run with Task1...")
    luigi.build([Task2(stuff="Task 1Task 2", params='{"runTask1":true}')], local_scheduler=True)
    time.sleep(10)
    print("Trying to run WITHOUT Task1...")
    luigi.build([Task2(stuff="Task 1Did just task 2", params='{"runTask1":false}')], local_scheduler=True)
(This is executed by simply calling python tasks.py)
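We can easily imagine mapping multiple parameters to multiple tasks, or applying custom tests before allowing various tasks to execute. We could also rewrite this to take its parameters from luigi.Config.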
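As a rough sketch of the "multiple parameters to multiple tasks" idea, the selection logic can be pulled into a small helper. Everything named here (TASK_REGISTRY, select_tasks, the runCleanup flag) is hypothetical and not part of Luigi or the code above; plain tuples stand in for task instances so the snippet runs without Luigi installed.

```python
import json

# Hypothetical mapping from config flag names to task factories.
# In a real pipeline each factory would return a luigi.Task instance,
# e.g. lambda stuff: Task1(stuff=stuff); tuples are used here only
# so the sketch is runnable on its own.
TASK_REGISTRY = {
    "runTask1": lambda stuff: ("Task1", stuff),
    "runCleanup": lambda stuff: ("Cleanup", stuff),
}


def select_tasks(params_json, stuff, registry=TASK_REGISTRY):
    """Return the dependencies whose flag is truthy in the JSON config."""
    config = json.loads(params_json)
    return [
        factory(stuff)
        for flag, factory in registry.items()
        if config.get(flag)  # missing or false flags are skipped
    ]


selected = select_tasks('{"runTask1": true, "runCleanup": false}', "Task 1Task 2")
print(selected)
```

Inside Task2.run() this could presumably replace the if/else with something like `deps = select_tasks(self.params, self.stuff)` followed by `if deps: yield deps`, assuming the registry factories build real tasks.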
Also note the following control flow in Task2:
if parameters["runTask1"]:
    yield Task1(stuff=self.stuff)
else:
    pass
Here, we could run an alternative task, or invoke tasks dynamically, as in the example in the luigi repo. For example:
if parameters["runTask1"]:
    yield Task1(stuff=self.stuff)
else:
    # self.stuff is not automatically parsed to int, so this list comp is valid
    # (it iterates over the characters of the string)
    data_dependent_deps = [Task1(stuff=x) for x in self.stuff]
    yield data_dependent_deps
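This may be a bit more involved than a simple run_standalone() method, but I think it is the closest thing to what you are looking for among the documented luigi patterns.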
Source: https://luigi.readthedocs.io/en/stable/tasks.html?highlight=dynamic#dynamic-dependencies