Luigi Pipeline 从 S3 开始

2024-01-09

我的初始文件位于AWS S3。有人能指出我如何在一个Luigi Task?

我查看了文档并发现luigi.S3但我不清楚该怎么办,然后我在网上搜索,只得到链接mortar-luigi并在 luigi 之上实施。

UPDATE

按照为 @matagus 提供的示例(我创建了~/.boto也按照建议文件):

# coding: utf-8

import luigi

from luigi.s3 import S3Target, S3Client

class MyS3File(luigi.ExternalTask):
    def output(self):
        return S3Target('s3://my-bucket/19170205.txt')

class ProcessS3File(luigi.Task):

    def requieres(self):
        return MyS3File()

    def output(self):
        return luigi.LocalTarget('/tmp/resultado.txt')

    def run(self):
        result = None

        for input in self.input():
           print("Doing something ...")
           with input.open('r') as f:
               for line in f:
                   result = 'This is a line'

        if result:
            out_file = self.output().open('w')
            out_file.write(result)

当我执行它时什么也没有发生

DEBUG: Checking if ProcessS3File() is complete
INFO: Informed scheduler that task   ProcessS3File()   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) running   ProcessS3File()
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) done      ProcessS3File()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   ProcessS3File()   has status   DONE
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) was stopped. Shutting down Keep-Alive thread

正如您所看到的,该消息Doing something...从不打印。怎么了?


这里的关键是定义一个外部任务没有输入,哪些输出是您在 S3 中已有的文件。 Luigi 文档在中提到了这一点需要另一个任务 http://luigi.readthedocs.org/en/stable/tasks.html#requiring-another-task:

请注意,requires() 不能返回 Target 对象。如果您有一个在外部创建的简单 Target 对象,则可以将其包装在 Task 类中

所以,基本上你最终会得到这样的结果:

import luigi

from luigi.s3 import S3Target

from somewhere import do_something_with


class MyS3File(luigi.ExternalTask):

    def output(self):
        return luigi.S3Target('s3://my-bucket/path/to/file')

class ProcessS3File(luigi.Task):

    def requires(self):
        return MyS3File()

    def output(self):
        return luigi.S3Target('s3://my-bucket/path/to/output-file')

    def run(self):
        result = None
        # this will return a file stream that reads the file from your aws s3 bucket
        with self.input().open('r') as f:
            result = do_something_with(f)

        # and the you 
        out_file = self.output().open('w')
        # it'd better to serialize this result before writing it to a file, but this is a pretty simple example
        out_file.write(result)

UPDATE:

路易吉使用boto http://boto.readthedocs.org/en/latest/从 AWS S3 读取文件和/或将它们写入到 AWS S3,因此为了使此代码正常工作,您需要在 boto 配置文件中提供您的凭据~/boto(寻找其他可能的配置文件位置在这里 http://boto.readthedocs.org/en/latest/boto_config_tut.html):

[Credentials]
aws_access_key_id = <your_access_key_here>
aws_secret_access_key = <your_secret_key_here>
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Luigi Pipeline 从 S3 开始 的相关文章

随机推荐

  • Java程序查找字符串数组中元音的长度和数量

    我需要从用户那里获取一个字符串并将其输出到一个表中 包括其单词数和元音数 但我不知道如何计算元音 我尝试了以下方法 import java util Scanner public class Array2 public static voi
  • Objective C - iCal 不在 iOS 9 中创建自定义日历和新事件

    这在 iOS 8 中完美运行 但在 iOS 9 中创建问题 这里是代码 self eventManager eventStore requestAccessToEntityType EKEntityTypeEvent completion
  • Xcode Core Data:将现有 XML 更改为 Sqlite(NSXMLStoreType 更改为 NSSQLiteStoreType)

    在我的第一个应用程序中 我在持久存储协调器中使用了 NSXM StoreType storeCooordinator addPersistentStoreWithType NSXMLStoreType configuration nil U
  • 从音频流中提取 SMPTE 时间码

    我正在开发一个录音系统 我的任务涉及从同步器设备生成的音频输入流中提取 SMPTE 时间码 我使用 ASIO SDK 来获取每个回调缓冲区的时间代码 但它始终为零 也许有人有 ASIO SDK 或任何其他可用于从音频流中提取 SMPTE 时
  • 在后台检查互联网连接

    我需要在后台检查互联网连接 我正在数据库中保存一些数据 每当我连接到互联网时 它应该将数据上传到我的服务器上 我需要一个后台服务 即使我关闭我的应用程序 它也会持续检查互联网连接 我尝试了几种方法 但它们只有在我打开我的应用程序时才有效 目
  • Spring Batch - Web 服务到 Web 服务分块

    我有一个托管的网络服务 允许批量提取记录 此 Web 服务以起始记录号 ROWID 和页面大小 最大 800 作为参数 可以从此服务中提取 50 60k 条记录 并调用另一个 Web 服务以块的形式再次发布所有这些数据 而无需在其间保留数据
  • 使用jQuery获取垂直滚动的最大值

    这是我的视图代码 我使用 jQuery 它可以工作 并且在我更改屏幕分辨率之前我没有任何问题 它停止工作 我知道问题出在哪里 但我无法解决它 看看代码 model PNUBOOKIR Models ProductModels Layout
  • B树和B+树有什么区别?

    In a b tree你可以存储两者内部节点和叶节点中的键和数据 但是在一个b tree你必须将数据存储在仅叶节点 在 b 树中执行上述操作有什么好处吗 为什么不到处使用 b 树而不是 b 树 因为直观上它们看起来更快 我的意思是 为什么需
  • SwiftUI 中元素之间的间距?

    我想知道为什么 SwiftUI 中的两个元素之间会出现这种间距 以及如何控制 修改它 我尝试向 ExtractedView 添加一些填充 但它没有改变任何内容 似乎是由某种原因引起的 frame 高度 56 但这正是按钮的高度 因此它不应该
  • 使用CSS将div对齐到另一个div的底部

    我想对齐DIV c在底部DIV b not DIV a div div div Div c div div div 这应该有效 b position relative c position absolute bottom 0px 诀窍是po
  • 在 JavaScript 中,如何从类的字符串名称中检索类? [复制]

    这个问题在这里已经有答案了 我有这样的 xml 文件
  • 在 styles.xml 中设置特定字体

    我正在为我的 Android 应用程序定义样式 XML 我有一些想要使用的 TTF 文件 如何设置字体以使用这些文件作为字体 而不是通用的 sans serif 和 monospace 谢谢 您只能通过 Java 代码使用自定义字体 而不能
  • Android gradle 构建错误:(9, 0) 未找到 Gradle DSL 方法:“compile()”。

    当我尝试同步我的项目时 出现以下构建错误 Error 9 0 Gradle DSL method not found compile Possible causes The project AlexTest may be using a v
  • 对不同大小的输入运行 Haskell 基准测试

    我经常想比较同一函数的多个实现的运行时性能 对于个人输入 标准是一个很好的工具 但是 有什么简单的方法可以在不同的输入大小上绘制代码的性能 例如查看算法复杂度 理想情况下 我向库传递一个类型的值Benchmarkable r gt Stri
  • 对 Angular 中两个或多个 DIV 位置的位置进行动画处理

    我需要帮助来调整动态渲染的 2 个或更多 div 的位置开关 这是一个简单的例子 let objects Array
  • 使用 sed 检测与模式匹配的两个连续行

    我正在寻找与特定模式匹配的两个连续行 例如使用 sed 包含单词 pat 并注意到我有时可以使用以下命令检测到它 sed n N pat n pat p 但如果重复项的行号不具有相同的奇偶校验 则此命令会失败 我认为这是因为我们正在搜索行
  • UIAlertView 没有取消按钮?

    我正在尝试创建一个具有 3 个选项且没有 取消 按钮的 UIAlertView 但是当我这样做时 它总是将 按钮 3 样式设置为取消按钮 有什么办法可以避免这种情况吗 UIAlertView alertView UIAlertView al
  • WPF MVVM:如何将 GridViewColumn 绑定到 ViewModel-Collection?

    在我的视图中 我将 ListView 绑定到 ViewModel 中的 CollectionView 例如如下所示
  • php 101 使用原子格式的日期时间

    我正在尝试使用 DateTime 类以 DateTime ATOM 格式输出当前时间 我什至不确定我是否正确使用它 或者我是否必须导入库或者打开 wamp php 模块 我收到 语法错误 意外的 T NEW 错误 这是代码 你会用DateT
  • Luigi Pipeline 从 S3 开始

    我的初始文件位于AWS S3 有人能指出我如何在一个Luigi Task 我查看了文档并发现luigi S3但我不清楚该怎么办 然后我在网上搜索 只得到链接mortar luigi并在 luigi 之上实施 UPDATE 按照为 matag