Spark 和 Python 使用自定义文件格式/生成器作为 RDD 的输入

2024-05-23

我想问一下 Spark 中输入的可能性。我可以看到从http://spark.apache.org/docs/latest/programming-guide.html http://spark.apache.org/docs/latest/programming-guide.html,我可以使用sc.textFile()用于将文本文件读取到 RDD,但我想在分发到 RDD 之前进行一些预处理,例如我的文件可能是 JSON 格式,例如{id:123, text:"...", value:6}我只想使用 JSON 的某些字段进行进一步处理。

我的想法是是否有可能以某种方式使用 Python 生成器作为 SparkContext 的输入?

或者 Spark 中是否有一些更自然的方式如何处理自定义文件,而不是 Spark 的纯文本文件?

EDIT:

看来接受的答案应该有效,但它让我想到了更实际的以下问题Spark 和 Python 尝试使用 gensim 解析维基百科 https://stackoverflow.com/questions/26202978/spark-and-python-trying-to-parse-wikipedia-using-gensim


最快的方法可能是按原样加载文本文件并进行处理以在生成的 RDD 上选择所需的字段。这可以跨集群并行工作,并且比在单台机器上进行任何预处理更有效地扩展。

对于 JSON(甚至 XML),我认为您不需要自定义输入格式。由于 PySpark 在 Python 环境中执行,因此您可以使用 Python 中常用的函数来反序列化 JSON 并提取所需的字段。

例如:

import json

raw = sc.textFile("/path/to/file.json")
deserialized = raw.map(lambda x: json.loads(x))
desired_fields = deserialized.map(lambda x: x['key1'])

desired_fields现在是下面所有值的 RDDkey1在原始 JSON 文件中。

您可以使用此模式来提取字段的组合,通过空格或其他方式分割它们。

desired_fields = deserialized.map(lambda x: (x['key1'] + x['key2']).split(' '))

如果这变得太复杂,您可以替换lambda使用常规的 Python 函数来完成您想要的所有预处理并只需调用deserialized.map(my_preprocessing_func).

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 和 Python 使用自定义文件格式/生成器作为 RDD 的输入 的相关文章

随机推荐

  • C# 根据当前日期传递日期时间值

    我正在尝试根据 sql server 中的两个日期获取记录 Select from table where CreatedDate between StartDate and EndDate我通过了5 12 2010 and 5 12 20
  • Laravel,控制器中的 Auth::user()

    Laravel 框架 为什么我无法在 laravel 项目的控制器中使用 Auth user 查看用户是否已登录 Session 是否未连接到控制器 HomeController php public function isauthoriz
  • 将客户分配到 magento 的多个客户组

    您好 我想将多个组分配给特定客户 例如 Rajat 客户 属于 批发 零售商 电力 实际上我在上面看到了同样的话题每个客户有多个客户组 https stackoverflow com questions 6153011 multiple c
  • OpenGL:如何检查用户是否支持glGenBuffers()?

    我检查了文档 它说 OpenGL 版本必须至少为 1 5 才能制作glGenBuffers 工作 用户使用的是1 5版本但是函数调用会导致崩溃 这是文档中的错误 还是用户的驱动程序问题 我正在用这个glGenBuffers 对于VBO 我如
  • 将不连续范围从一张纸复制到另一张纸

    VBA 新手 也是第一次发帖 可能会问一个非常基本的问题 然而 我在互联网上 或在我拥有的参考书中 没有找到答案 所以我很困惑 如何将一张纸中的一堆间隔开的列放入另一张纸中 但没有间隙 例如 我想从这样的工作表中复制标记为 x 的单元格 x
  • 通过单击两次提交来避免在 Asp.net MVC 中重复提交表单

    我正在 Asp net MVC 中渲染一个带有提交按钮的表单 成功将记录添加到数据库后 页面将重定向 以下是代码 HttpPost public ActionResult Create BrandPicView brandPic if Mo
  • 使用 for 循环创建一系列元组

    我已经搜索过 但找不到答案 尽管我确信它已经存在了 我对 python 很陌生 但我以前用其他语言做过这种事情 我正在以行形式读取数据文件 我想将每行数据存储在它自己的元组中 以便在 for 循环之外访问 tup i inLine wher
  • 如何将查找和替换限制为 CSV 中的一列?

    我有一个 4 列 CSV 文件 例如 0001 fish animal eats worms I use sed对文件进行查找和替换 但我需要将此查找和替换限制为仅在第 3 列中找到的文本 如何让查找和替换仅发生在这一列上 您确定要使用se
  • 在“onClick”上切换 DIV 高度

    我想切换分区的高度 我尝试过将 animate 与 if else 语句一起使用 但它只会反弹 我现在使用的代码将隐藏我的分区而不是切换高度 点击时会触发 document ready function content1 toggle fu
  • Unity手游触摸动作不扎实

    我的代码中有一种 错误 我只是找不到它发生的原因以及如何修复它 我是统一的初学者 甚至是统一的手机游戏的初学者 我使用触摸让玩家从一侧移动到另一侧 但问题是我希望玩家在手指从一侧滑动到另一侧时能够平滑移动 但我的代码还会将玩家移动到您点击的
  • 寻找局部最小值

    下面的代码正确地找到了数组的局部最大值 但未能找到局部最小值 我已经进行了网络搜索 以找到找到最小值的最佳方法 并且根据这些搜索 我认为我正在使用下面的正确方法 但是 在几天的时间里多次检查每一行之后 下面的代码中有一些我仍然没有看到的错误
  • Lua 上的 For 循环

    我的作业是如何执行 for 循环 我已经从数字上弄清楚了 但无法从名称上弄清楚 我想创建一个 for 循环来运行名称列表 以下是我到目前为止所拥有的 names John Joe Steve for names 1 3 do print n
  • Guid.NewGuid().GetHashCode() 用于数据库

    这对于用作数据存储 SQL Server 的 ID 可靠吗 我会使用 guid 但我更喜欢数字值 A guid更有可能代表一个记录uniquely than a numeric value 随着 GUID 确保全球唯一性 GUID 可以跨数
  • 一个地址有多少字节? [复制]

    这个问题在这里已经有答案了 在64位机器上 我们知道一个地址是8个字节 然而 我并不完全清楚一个地址中有多少字节的信息 虚拟内存中的每个字节都有一个地址吗 或者内存中的每 64 位都有一个地址 还是取决于架构 如果这取决于架构 那么我应该如
  • NHibernate 克服 NotSupportedException

    有谁知道有什么方法可以克服 NotSupportedException 我有一个针对用户的方法 public virtual bool IsAbove User otherUser return HeirarchyString Starts
  • 如何通过 Android 按钮单击运行单独的应用程序

    我尝试在 Android 应用程序中添加两个按钮 以从单独的两个应用程序订单系统和库存系统中选择一个应用程序 如图所示 我已将这两个应用程序实现为两个单独的 Android 项目 当我尝试运行此应用程序时 它会出现直到正确选择窗口 但是当按
  • VBA - 如何从网站下载.xls并将数据放入Excel文件

    我设法使用 VBA 达到准备从网络下载 Excel 文件的程度 但我无法弄清楚如何实际下载该文件并将其内容放入我正在使用的 Excel 文件中 有什么建议么 谢谢 这是到目前为止的代码 Sub GetData Dim IE As Inter
  • Angular 6 服务器端错误:找不到模块:错误:无法解析“./dist/server/main.bundle”

    我正在开发一个项目 将其更新到 Angular 6 更新后 我现在在尝试运行服务器端渲染构建时收到此错误 Module not found Error Can t resolve dist server main bundle 我尝试去ht
  • 在 Clojure 中解压缩 zlib 流

    我有一个二进制文件 其内容由zlib compress在Python上 有没有一种简单的方法可以在Clojure中打开和解压缩它 import zlib import json with open data json zlib wb as
  • Spark 和 Python 使用自定义文件格式/生成器作为 RDD 的输入

    我想问一下 Spark 中输入的可能性 我可以看到从http spark apache org docs latest programming guide html http spark apache org docs latest pro