如何具体确定MRJob中每个map步骤的输入?

2024-04-01

我正在从事一项地图缩减工作,包含多个步骤。使用 mrjob,每个步骤都会接收上一步的输出。问题是我不想这样。

我想要的是提取一些信息并在第二步中针对所有输入等使用它。可以使用 mrjob 来做到这一点吗?

Note: 因为我不想使用emr,这个问题 https://stackoverflow.com/questions/9302580/multiple-inputs-with-mrjob对我来说没有太大帮助。

UPDATE:如果不可能在一项工作中完成此操作,我需要在两项单独的工作中完成。在这种情况下,有什么方法可以包装这两个作业并管理中间输出等?


您可以使用Runners http://mrjob.readthedocs.org/en/latest/guides/runners.html

您必须单独定义作业并使用另一个 python 脚本来调用它。

from NumLines import NumLines
from WordsPerLine import WordsPerLine
import sys

intermediate = None

def firstJob(input_file):
    global intermediate
    mr_job = NumLines(args=[input_file])
    with mr_job.make_runner() as runner:
        runner.run()
        intermediate = runner.get_output_dir()

def secondJob(input_file):
    mr_job = WordsPerLine(args=[intermediate,input_file])
    with mr_job.make_runner() as runner:
        runner.run()

if __name__ == '__main__':
    firstJob(sys.argv[1]) 
    secondJob(sys.argv[1])

并且可以通过以下方式调用:

python main_script.py input.txt
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何具体确定MRJob中每个map步骤的输入? 的相关文章

  • Gunicorn 工作人员无论如何都会超时

    我正在尝试通过gunicorn运行一个简单的烧瓶应用程序 但是无论我做什么 我的工作人员都会超时 无论是否有针对应用程序的活动 工作人员在我设置任何内容后总是会超时timeout值到 是什么导致它们超时 当我发出请求时 请求成功通过 但工作
  • 如何在 __init__ 中使用await设置类属性

    我如何定义一个类await在构造函数或类体中 例如我想要的 import asyncio some code class Foo object async def init self settings self settings setti
  • 在 Celery 任务中调用 Google Cloud API 永远不会返回

    我正在尝试拨打外部电话Google Cloud Natural Language API从一个内Celery任务 使用google cloud python包裹 问题是对 API 的调用永远不会返回 挂起 celery task def g
  • matplotlib 图中点的标签

    所以这是一个关于已发布的解决方案的问题 我试图在我拥有的 matplotlib 散点图中的点上放置一些数据标签 我试图在这里模仿解决方案 是否有与 MATLAB 的 datacursormode 等效的 matplotlib https s
  • 多输出堆叠回归器

    一次性问题 我正在尝试构建一个多输入堆叠回归器 添加到 sklearn 0 22 据我了解 我必须结合StackingRegressor and MultiOutputRegressor 经过多次尝试 这似乎是正确的顺序 import nu
  • VSCode Settings.json 丢失

    我正在遵循教程 并尝试将 vscode 指向我为 Scrapy 设置的虚拟工作区 但是当我在 VSCode 中打开设置时 工作区设置 选项卡不在 用户设置 选项卡旁边 我还尝试通过以下方式手动转到文件 APPDATA Code User s
  • 嵌套列表的重叠会产生不必要的间隙

    我有一个包含三个列表的嵌套 这些列表由 for 循环填充 并且填充由 if 条件控制 第一次迭代后 它可能类似于以下示例 a 1 2 0 0 0 0 0 0 4 5 0 0 0 0 0 0 6 7 根据条件 它们不重叠 在第二次迭代之后 新
  • 如何从Python中的函数返回多个值? [复制]

    这个问题在这里已经有答案了 如何从Python中的函数返回多个变量 您可以用逗号分隔要返回的值 def get name you code return first name last name 逗号表示它是一个元组 因此您可以用括号将值括
  • 从Django中具有外键关系的两个表中检索数据? [复制]

    这个问题在这里已经有答案了 This is my models py file from django db import models class Author models Model first name models CharFie
  • Python 3d 绘图设置固定色阶

    我正在尝试绘制两个 3d 数组 第一个数组的 z 值在范围内 0 15 0 15 第二个来自 0 001 0 001 当我绘图时 色标自动遵循数据范围 如何设置自定义比例 我不想看到 0 001 的浅色 而应该看到 0 15 的浅色 如何修
  • 打印包含字符串和其他 2 个变量的变量

    var a 8 var b 3 var c hello my name is var a and var b bye print var c 当我运行程序时 var c 会像这样打印出来 hello my name is 8 and 3 b
  • 如何将特定范围内的标量添加到 numpy 数组?

    有没有一种更简单 更节省内存的方法可以单独在 numpy 中执行以下操作 import numpy as np ar np array a l r ar c a a 0 l ar tolist a r 它可能看起来很原始 但它涉及获取给定数
  • 为什么一旦我离开内置的运行服务器,Django 就无法找到我的管理媒体文件?

    当我使用内置的简单服务器时 一切正常 管理界面很漂亮 python manage py runserver 但是 当我尝试使用 wsgi 服务器为我的应用程序提供服务时django core handlers wsgi WSGIHandle
  • 关于 Hadoop 和压缩输入文件的非常基本的问题

    我已经开始研究 Hadoop 如果我的理解是正确的 我可以处理一个非常大的文件 它会被分割到不同的节点上 但是如果文件被压缩 那么文件就无法分割 并且需要由单个节点处理 有效地破坏了运行一个mapreduce 一个并行机器集群 我的问题是
  • 如何将 GAE 中一种 Kind 中的所有实体复制到另一种 Kind 中,而无需显式调用每个属性

    我们如何使用function clone entity 如中所述在 Python 中复制 Google App Engine 数据存储中的实体 而无需在 编译 时知道属性名称 https stackoverflow com question
  • 重新分配唯一值 - pandas DataFrame

    我在尝试着assign unique值在pandas df给特定的个人 For the df below Area and Place 会一起弥补unique不同的价值观jobs 这些值将分配给个人 总体目标是使用尽可能少的个人 诀窍在于这
  • Firebase Firestore:获取文档的生成 ID (Python)

    我可以创建一个新文档 带有自动生成的 ID 并存储对其的引用 如下所示 my data key value doc ref db collection u campaigns add my data 我可以像这样访问数据本身 print d
  • 在virtualenv中下载sqlite3

    我正在尝试使用命令创建应用程序python3 manage py startapp webapp但我收到一条错误消息 django core exceptions ImproperlyConfigured 加载时出错 pysqlite2 或
  • python 对浮点数进行不正确的舍入

    gt gt gt a 0 3135 gt gt gt print 3f a 0 314 gt gt gt a 0 3125 gt gt gt print 3f a 0 312 gt gt gt 我期待 0 313 而不是 0 312 有没有
  • NLTK:查找单词大小为 2k 的上下文

    我有一个语料库 我有一个词 对于语料库中该单词的每次出现 我想获取一个包含该单词之前的 k 个单词和该单词之后的 k 个单词的列表 我在算法上做得很好 见下文 但我想知道 NLTK 是否提供了一些我错过的功能来满足我的需求 def size

随机推荐