Pyspark:将 tar.gz 文件加载到数据框中并按文件名过滤

2023-12-02

我有一个包含多个文件的 tar.gz 文件。层次结构如下所示。我的目的是读取tar.gz文件,过滤掉其中的内容b.tsv因为它是静态元数据,其中所有其他文件都是实际记录。

gzfile.tar.gz
|- a.tsv
|- b.tsv
|- thousand more files.

通过 pyspark load,我可以将文件加载到数据帧中。我使用了命令:

spark = SparkSession.\
        builder.\
        appName("Loading Gzip Files").\
        getOrCreate()
input = spark.read.load('/Users/jeevs/git/data/gzfile.tar.gz',\
          format='com.databricks.spark.csv',\
          sep = '\t'

为了过滤,我添加了文件名

from  pyspark.sql.functions import input_file_name
input.withColumn("filename", input_file_name())

现在生成的数据如下:

|_c0 |_c1 |filename |
|b.tsv0000666000076500001440035235677713575350214013124 0ustar  netsaintusers1|Lynx 2.7.1|file:///Users/jeevs/git/data/gzfile.tar.gz|
|2|Lynx 2.7|file:///Users/jeevs/git/data/gzfile.tar.gz|

当然,文件字段填充有 tar.gz 文件,使得该方法毫无用处。 更令人恼火的问题是,_c0 正在填充filename+garbage+first row values

此时,我想知道读取的文件本身是否变得奇怪,因为它是 tar.gz 文件。当我们执行此处理的 v1 时(spark 0.9),我们还有另一个步骤,将数据从 s3 加载到 ec2 框中,提取并写回 s3。我正在努力摆脱这些步骤。

提前致谢!


数据块不支持直接*.tar.gz迭代。为了处理文件,必须将它们解压缩到临时位置。数据块支持bash比能完成这项工作。

%sh find $source -name *.tar.gz -exec tar -xvzf {} -C $destination \;

上面的代码将解压所有带有扩展名的文件*.tar.gz在源位置到目标位置。 如果路径是通过dbutils.widgets或静态中%scala or %pyspark,路径必须声明为环境变量。 这可以实现在%pyspark

import os
os.environ[' source '] = '/dbfs/mnt/dl/raw/source/'

使用以下方法加载文件,假设内容在*.csv file:

DF = spark.read.format('csv').options(header='true', inferSchema='true').option("mode","DROPMALFORMED").load('/mnt/dl/raw/source/sample.csv')
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Pyspark:将 tar.gz 文件加载到数据框中并按文件名过滤 的相关文章

  • 分割 tar.bz2 文件并单独提取每个文件

    我可以将一个大的 tar bz2 文件分割成几个较小的文件并在 Ubuntu 中单独提取这些小 tar bz2 文件吗 Thanks 我认为这不容易实现 A tar bz2是单个流 它没有像这样的索引zip这将允许跳到存档中特定文件的开头
  • 从 Spark 数据帧中过滤大量 ID

    我有一个大型数据框 其格式类似于 ID Cat date 12 A 201602 14 B 201601 19 A 201608 12 F 201605 11 G 201603 我需要根据大约 500 万个 Is 的列表来过滤行 最直接的方
  • Spark 上的 Kubernetes 驱动程序 pod 清理

    我在 kubernetes 1 19 上运行 Spark 3 1 1 作业完成后 执行程序 Pod 就会被清理 但驱动程序 Pod 仍处于完成状态 驱动程序完成后如何清理 要设置任何配置选项吗 NAME READY STATUS RESTA
  • Spark Collect_list 并限制结果列表

    我有以下格式的数据框 name merged key1 internalKey1 value1 key1 internalKey2 value2 key2 internalKey3 value3 我想做的是将数据框分组name 收集列表并l
  • 在 Apache Spark 上下文中,内存数据存储意味着什么?

    我读到 Apache Spark 将数据存储在内存中 然而 Apache Spark 旨在分析大量数据 又称大数据分析 在这种情况下 内存数据存储的真正含义是什么 它可以存储的数据是否受到可用 RAM 的限制 它的数据存储与使用HDFS的A
  • Sparklyr - 在 Apache Spark Join 中包含空值

    问题在 Apache Spark Join 中包含空值 https stackoverflow com questions 41728762 including null values in an apache spark join有 Sc
  • Spark-获取RDD中的文件名

    我正在尝试处理每天都在增长的 4 个文本文件目录 我需要做的是 如果有人试图搜索发票号码 我应该给他们包含该发票号码的文件列表 我能够通过将文本文件加载为 RDD 来映射和减少文本文件中的值 但是如何获取文件名和其他文件属性呢 从 Spar
  • Spark 和 Ipython 中将非数字特征编码为数字的问题

    我正在做一些我必须做出预测的事情numeric数据 每月员工支出 使用non numeric特征 我在用Spark MLlibs Random Forests algorthim 我有我的features数据在一个dataframe看起来像
  • Pyspark显示最大值(S)和多重排序

    感谢这里的一些帮助 使用Pyspark 请不能使用SQL 所以我有一个存储为 RDD 对的元组列表 城市1 2020 03 27 X1 44 城市1 2020 03 28 X1 44 City3 2020 03 28 X3 15 City4
  • 为什么 Databricks Connect Test 无法在 Mac 上运行?

    我已经阅读了配置文档databricks connect但运行时仍然出现以下错误databricks connect test 来自终端的错误 java lang NoSuchMethodError org apache spark int
  • 如何抑制spark输出控制台中的“Stage 2===>”?

    我有数据帧并试图获取不同的计数并且能够成功获取不同的计数 但是每当 scala 程序执行时我都会收到此消息 Stage 2 gt 1 1 2 我如何在控制台中抑制特定的此消息 val countID dataDF select substr
  • 总分配超过堆内存的 95.00%(960,285,889 字节)- pyspark 错误

    我用 python 2 7 编写了一个脚本 使用 pyspark 将 csv 转换为 parquet 和其他内容 当我在小数据上运行脚本时 它运行良好 但是当我在更大的数据 250GB 上运行脚本时 我遇到了以下错误 总分配超过堆内存的 9
  • 如何捕获 Oozie Spark 输出

    有没有办法捕获spark的输出然后将其输入到shell上 我们当前正在使用 scala 创建 jar 文件 并希望我们的 Spark 输出成为 shell 输入 我的想法是使用 wf actionData spark XXXX var 我只
  • 任务和分区之间有什么关系?

    我能说 么 Spark任务的数量等于Spark分区的数量吗 执行器运行一次 执行器内部的批处理 等于一个任务吗 每个任务只产生一个分区 1 的重复 并行度或可以同时运行的任务数量由以下公式设置 Executor实例的数量 配置 每个执行器的
  • Spark 结构化流中具有不同计数的聚合抛出错误

    我正在尝试在 Spark 结构化流中获取 Parentgroup childgroup 和 MountingType 组的唯一 id 代码 下面的代码抛出错误 withWatermark timestamp 1 minutes val ag
  • 为什么 Spark 没有使用本地计算机上的所有核心

    当我在 Spark Shell 中或作为作业运行一些 Apache Spark 示例时 我无法在单台计算机上实现完全的核心利用率 例如 var textColumn sc textFile home someuser largefile t
  • SPARK SQL - 当时的情况

    我是 SPARK SQL 的新手 SPARK SQL 中是否有相当于 CASE WHEN CONDITION THEN 0 ELSE 1 END 的内容 select case when 1 1 then 1 else 0 end from
  • 在 Spark 2.1.0 中启用 _metadata 文件

    Spark 2 1 0 中保存空 Parquet 文件似乎已损坏 因为无法再次读入它们 由于模式推断错误 我发现从 Spark 2 0 开始 写入 parquet 文件时默认禁用写入 metadata 文件 但我找不到重新启用此功能的配置设
  • Spark:Shuffle Write、Shuffle 溢出(内存)、Shuffle 溢出(磁盘)之间的区别?

    我有以下 Spark 工作 试图将所有内容保留在内存中 val myOutRDD myInRDD flatMap fp gt val tuple2List ListBuffer String myClass ListBuffer tuple
  • Java 中的“Lambdifying”scala 函数

    使用Java和Apache Spark 已用Scala重写 面对旧的API方法 org apache spark rdd JdbcRDD构造函数 其参数为 AbstractFunction1 abstract class AbstractF

随机推荐