如何在 PySpark 中读取 Avro 文件

2023-12-31

我正在使用 python 编写 Spark 作业。但是,我需要读取一大堆 avro 文件。

This https://github.com/apache/spark/blob/master/examples/src/main/python/avro_inputformat.py是我在 Spark 示例文件夹中找到的最接近的解决方案。但是,您需要使用spark-submit 提交此python 脚本。在spark-submit的命令行中,您可以指定驱动程序类,在这种情况下,您所有的avrokey,avrovalue类将被定位。

avro_rdd = sc.newAPIHadoopFile(
        path,
        "org.apache.avro.mapreduce.AvroKeyInputFormat",
        "org.apache.avro.mapred.AvroKey",
        "org.apache.hadoop.io.NullWritable",
        keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
        conf=conf)

就我而言,我需要运行Python脚本中的所有内容,我尝试创建一个环境变量来包含jar文件,手指交叉Python会将jar添加到路径中,但显然它不是,它给了我意想不到的类错误。

os.environ['SPARK_SUBMIT_CLASSPATH'] = "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar"

谁能帮我如何在一个 python 脚本中读取 avro 文件?


火花 >= 2.4.0

您可以使用内置 Avro 支持 https://spark.apache.org/docs/latest/sql-data-sources-avro.html。该 API 向后兼容spark-avro包,添加了一些内容(最值得注意的是from_avro / to_avro功能)。

请注意,该模块未与标准 Spark 二进制文件捆绑在一起,必须使用spark.jars.packages或等效机制。

也可以看看Pyspark 2.4.0,使用读取流从 kafka 读取 avro - Python https://stackoverflow.com/q/54693110/10465355

火花

您可以使用spark-avro https://github.com/databricks/spark-avro图书馆。首先让我们创建一个示例数据集:

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter

schema_string ='''{"namespace": "example.avro",
 "type": "record",
 "name": "KeyValue",
 "fields": [
     {"name": "key", "type": "string"},
     {"name": "value",  "type": ["int", "null"]}
 ]
}'''

schema = avro.schema.parse(schema_string)

with open("kv.avro", "w") as f, DataFileWriter(f, DatumWriter(), schema) as wrt:
    wrt.append({"key": "foo", "value": -1})
    wrt.append({"key": "bar", "value": 1})

阅读它使用spark-csv就这么简单:

df = sqlContext.read.format("com.databricks.spark.avro").load("kv.avro")
df.show()

## +---+-----+
## |key|value|
## +---+-----+
## |foo|   -1|
## |bar|    1|
## +---+-----+ 
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 PySpark 中读取 Avro 文件 的相关文章

随机推荐

  • 如何在处理多个文件时组织 Vim 缓冲区、窗口和选项卡

    我一生都在使用 VIM 但最近我有点厌倦了它 因为在一个大项目 有 500k LOC 和数百个文件 中同时处理 20 个左右的文件时 我迷失在缓冲区 窗口和选项卡中 每当我这样做 make grep等等 新的缓冲区在当前窗口中跳出 标签也会
  • 使用 JobStoreTX 配置 CronTriggerFactoryBean 以实现quartz集群

    我们使用的是 Quartz 2 1 5 我们设置了以下属性 org quartz jobStore class org quartz impl jdbcjobstore JobStoreTX org quartz jobStore driv
  • 当(当前)只有一个类实现接口时,您是否应该创建一个接口?

    如果有可能有其他东西可以使用它 您是否应该始终创建一个接口 或者等到实际需要它然后重构以使用接口 对接口进行编程通常看起来是合理的建议 但 YAGNI 我想也许这要视情况而定 现在我有一个代表可以包含食谱或其他文件夹的文件夹的对象 我不应该
  • ARM NEON SIMD 版本 2

    Cortex A15 中的 NEON SIMD 和 NEON SIMD 版本 2 有什么区别 它添加了 SIMD FMA 指令 VFMA F32 并且还强制要求 NEON 半精度扩展 ARM Cortex A7 ARM Cortex A15
  • HTTPS nonProxyHosts 的 JVM 参数

    所以我有一个相当加载的环境变量 JAVA OPTIONS export JAVA OPTIONS Dhttp proxyHost my proxy com Dhttp proxyPort 1080 Dhttps proxyHost my p
  • Python Eve:请求的资源上不存在“Access-Control-Allow-Origin”标头

    我使用Python EVE框架编写了一个API 当尝试从 AngularJS 应用程序访问 API 时 它显示错误 如下所示 XMLHttpRequest cannot load http 127 0 0 1 5000 user jay3d
  • 创建未知大小的稀疏矩阵

    例如 我有一个文本文件 其中每一行都指示图形上的一条边 2 5 1 表示节点 2 和 5 之间权重为 1 的边 我想使用这些元组创建一个稀疏邻接矩阵 通常 我会将稀疏矩阵初始化为 G scipy sparse lil matrix n n
  • const char* 的奇怪 std::cout 行为

    我有一个方法返回一个字符串以显示为错误消息 根据程序中发生此错误的位置 我可能会在显示错误消息之前添加更多解释 string errorMessage return this is an error somewhere in the pro
  • 在 Java 面板中包含命令提示符

    我有一个批处理文件 可以从 SVN 中检出代码并对其调用几个命令 这发生在 Windows 命令提示符上 我想从我的 java 程序调用这个批处理文件 并且命令提示符必须出现在我的应用程序窗口的控制台中 而不是作为单独的窗口 这样我就可以从
  • 如何在图像周围添加图像边框?

    有没有简单的方法可以在图像周围添加图像边框 原因是我想在图像周围创建阴影效果 图像作为缩略图加载 大小为 110x75 像素 我正在考虑创建阴影边框 但不知道如何将其添加到图像周围 有人知道方法吗 最好是PHP 您可以使用 GD 库或 Im
  • 我的应用程序中的 ic_launcher 图标错误

    我正在开发一个应用程序Honeycomb并遇到了这个非常奇怪的问题 我更改了应用程序图标 ic launcher 在每一个drawable文件夹并确保它在清单中正确 但我有一个标准 settings 启动器中的图标 在应用程序本身中是正确的
  • SVG 圆中 dasharray 属性的奇怪行为

    我正在尝试创建 SVG 圆的无限动画循环 我想创建 12 个相等的块 并将它们分开一些间隙 为了计算我使用的圆片的价值k系数见下表 所以我做了 0 25782 160 我的圆的直径 我得到 41 2512 它应该是我的棋子的值 之后我创建了
  • 获取正在运行的进程的维度

    我正在尝试抓取应用程序中特定 x y 位置的屏幕截图 有没有办法在 Process 对象中获取正在运行的应用程序 然后获取它的尺寸 就像是 Process processlist Process GetProcesses foreach P
  • 验证错误:值无效

    我的 p selectOneMenu 有问题 无论我做什么 我都无法让 JSF 调用 JPA 实体上的 setter JSF 验证失败并显示以下消息 形式 位置 验证错误 值无效 我在同一类型的其他几个类 即连接表类 上进行了此工作 但我一
  • 无法使用 Espresso 将文本添加到 webview 文本字段

    我正在尝试将文本添加到 Esprsso 中的文本字段 在 Web 视图内 但收到此错误 引起原因 java lang RuntimeException 评估错误评估 状态 13 值 message 无法设置选择结束 hasMessage 真
  • Java 中的动态绑定==后期绑定吗?

    在不同的来源中 我读到了有关该主题的不同内容 例如维基百科说 后期绑定经常与动态调度混淆 但两者之间存在显着差异 但几行之后 在 Java 编程中 流行使用术语 后期绑定 作为动态分派的同义词 具体来说 这是指与虚拟方法一起使用的 Java
  • 部分选择排序与合并排序查找“数组中最大的 k”

    我想知道我的思路是否正确 我正在准备面试 作为一名大学生 我遇到的问题之一是找到数组中最大的 K 个数字 我的第一个想法是只使用部分选择排序 例如 从第一个元素扫描数组 并为看到的最低元素及其索引保留两个变量 并与数组末尾的该索引交换 并继
  • 如何批量加载从其他来源生成的自定义 Avro 数据?

    Cloud Spanner 文档说 Spanner 可以导出 导入 Avro 格式 此路径是否也可用于批量摄取从其他来源生成的 Avro 数据 该文档似乎表明它只能导入同样由 Spanner 生成的 Avro 数据 我运行了一个快速导出作业
  • 当 MPMovieControlStyle = MPMovieControlStyleNone 时如何触摸/单击 MPMoviePlayerController 视图

    在我的一个应用程序中 我不想显示任何视频控制器 但我需要接触媒体播放器视图 我需要在触摸电影播放器 时执行一些其他操作 我怎样才能实现它 请帮忙 提前致谢 您可以随时附上UITapGestureRecognizer查看并处理水龙头 UITa
  • 如何在 PySpark 中读取 Avro 文件

    我正在使用 python 编写 Spark 作业 但是 我需要读取一大堆 avro 文件 This https github com apache spark blob master examples src main python avr