如何将 Spark DataFrame 以 csv 格式保存在磁盘上?

2024-05-21

例如,这样的结果:

df.filter("project = 'en'").select("title","count").groupBy("title").sum()

将返回一个数组。

如何将 Spark DataFrame 作为 csv 文件保存在磁盘上?


Apache Spark 不支持磁盘上的本机 CSV 输出。

不过,您有四种可用的解决方案:

  1. 您可以将 Dataframe 转换为 RDD :

    def convertToReadableString(r : Row) = ???
    df.rdd.map{ convertToReadableString }.saveAsTextFile(filepath)
    

    这将创建一个文件夹文件路径。在文件路径下,您将找到分区文件(例如part-000*)

    如果我想将所有分区附加到一个大的 CSV 中,我通常会做的是

    cat filePath/part* > mycsvfile.csv
    

    有的会用coalesce(1,false)从 RDD 创建一个分区。它通常是一个不好的做法,因为它可能会将您收集的所有数据拉到驱动程序中,从而使驱动程序不堪重负。

    注意df.rdd将返回一个RDD[Row].

  2. With 火花,您可以使用databricks Spark-csvlibrary https://github.com/databricks/spark-csv:

    • 火花1.4+:

      df.write.format("com.databricks.spark.csv").save(filepath)
      
    • 火花1.3:

      df.save(filepath,"com.databricks.spark.csv")
      
  3. With 火花2.x the spark-csv不需要包,因为它包含在 Spark 中。

    df.write.format("csv").save(filepath)
    
  4. 您可以转换为本地 Pandas 数据框并使用to_csv方法(仅限 PySpark)。

Note:解决方案 1、2 和 3 将生成 CSV 格式文件(part-*)由 Spark 在调用时调用的底层 Hadoop API 生成save。你将会拥有一个part-每个分区的文件。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将 Spark DataFrame 以 csv 格式保存在磁盘上? 的相关文章

  • Java 表达式树 [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 是否有相当于 net的 LINQ 下的表达式树JVM 我想实现一些类似 LINQ 的代码结构Scala
  • Spark 结构化流中具有不同计数的聚合抛出错误

    我正在尝试在 Spark 结构化流中获取 Parentgroup childgroup 和 MountingType 组的唯一 id 代码 下面的代码抛出错误 withWatermark timestamp 1 minutes val ag
  • SPARK SQL - 当时的情况

    我是 SPARK SQL 的新手 SPARK SQL 中是否有相当于 CASE WHEN CONDITION THEN 0 ELSE 1 END 的内容 select case when 1 1 then 1 else 0 end from
  • Scala 和变量中的模式匹配

    我是 Scala 新手 有点想知道模式匹配是如何工作的 想象一下我有以下内容 case class Cls i Int case b Cls i gt Ok case e Cls gt Ok case f Cls gt Ok case s
  • pyspark 中的 Pandas UDF

    我正在尝试在 Spark 数据帧上填充一系列观察结果 基本上我有一个日期列表 我应该为每个组创建缺失的日期 在熊猫中有reindex函数 这是 pyspark 中不可用的 我尝试实现 pandas UDF pandas udf schema
  • 缓存 Slick DBIO 操作

    我正在尝试加快 SELECT FROM WHERE name 的速度Play 中的查询类型 Scala 应用程序 我正在使用 Play 2 4 Scala 2 11 play slick 1 1 1 包 该软件包使用Slick 3 1版本
  • 使用spark phoenix从表中读取rdd分区号为1

    当我运行我的火花代码时 val sqlContext spark sqlContext val noact table primaryDataProcessor getTableData sqlContext zookeeper table
  • 类型级编程有哪些示例? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我不明白 类型级编程 是什么意思 也无法使用Google找到合适的解释 有人可以提供一个演示类型级编程的示例吗 范式的解释和 或定义将
  • 懒惰背景下的变革与行动

    正如 Learning Spark 闪电般快速的大数据分析 一书中提到的 由于 Spark 计算 RDD 的方式不同 转换和操作也有所不同 在对惰性进行一些解释之后 我发现转换和操作都是惰性地进行的 那么问题来了 这句话的意思是什么 对比
  • 玩:将表单字段绑定到双精度型?

    也许我只是忽略了一些明显的事情 但我无法弄清楚如何将表单字段绑定到 Play 控制器中的双精度型 例如 假设这是我的模型 case class SavingsGoal timeframeInMonths Option Int amount
  • Spark KMeans 无法处理大数据吗?

    KMeans 有几个参数training http spark apache org docs latest api python pyspark mllib html highlight kmeans pyspark mllib clus
  • 如何为 Spark RDD 中的元素分配唯一的连续编号

    我有一个数据集 user product review 并希望将其输入到 mllib 的 ALS 算法中 该算法需要用户和产品是数字 而我的是字符串用户名和字符串SKU 现在 我获取不同的用户和 SKU 然后在 Spark 外部为它们分配数
  • 为什么在 Scala 中函数类型需要以单独的参数组传递到函数中

    我是 scala 新手 我用两种方式编写了相同的代码 但我对两种方式有点困惑 在第二种方式中 f 的参数类型是自动派生的 但在 type1 中 scala 编译器无法执行相同的操作 我只是想了解这背后的想法是什么 Type1 给出编译错误
  • Akka 2 中的调度程序有哪些差异和使用模式?

    我很难理解它们的差异和推荐用法Akka 2 中的调度程序 http doc akka io docs akka current scala dispatchers html 我想我明白了平衡调度程序 http doc akka io api
  • Spark 执行器登录 YARN

    我正在 Cloudera 集群上以 YARN 客户端模式启动分布式 Spark 应用程序 一段时间后 我在 Cloudera Manager 上看到一些错误 一些执行者会断开连接 并且这种情况会系统性地发生 我想调试该问题 但 YARN 未
  • 如何使用 log4j 自定义附加程序在 HDFS 上创建日志?

    Overview 我们希望使用 log4j 记录 Spark 作业活动 并将日志文件写入 HDFS Java 8 Spark 2 4 6 Scala 2 1 2 Hadoop 3 2 1 我们无法找到本地 apache log4j 附加程序
  • Spark EC2 SSH连接错误SSH返回代码255

    每次我尝试通过 Spark ec2 spark ec2 py 文件在 AWS 上启动 Spark 集群时 都会收到 SSH 连接错误 最终解决了 但是浪费了很多时间 在您将其标记为重复之前 我知道有很多类似的问题被问到 但有两个关键区别 a
  • Spark中如何获取map任务的ID?

    Spark中有没有办法获取map任务的ID 例如 如果每个映射任务都调用用户定义的函数 我可以从该用户定义的函数中获取该映射任务的 ID 吗 我不确定您所说的地图任务 ID 是什么意思 但您可以使用以下方式访问任务信息TaskContext
  • 如何访问 Scala XML 中的父元素

    The scala xml包表示带有标记树节点的 XML 但是这棵树在 Scala 2 7 中是单向的吗 因为似乎没有办法访问Elem给定的父级Elem 这似乎同样适用于父母Document 例如 在 XOM 中你有getParent an
  • 必须包含 log4J,但它会导致 Apache Spark shell 中出现错误。如何避免错误?

    由于我必须将 jar 包含到 Spark 代码中 因此我想请求帮助找出解决此问题而不删除 log4j 导入的方法 简单代码如下 cp symjar log4j 1 2 17 jar import org apache spark rdd v

随机推荐

  • 多个容器 POD 中的一个容器进程崩溃会发生什么情况?

    通常在单容器POD中 当容器的主进程崩溃时 Pod会重新启动 如果有多个容器 POD 如果第二个容器中的一个进程崩溃 会发生什么情况 POD 会重新启动吗 来自文档here https kubernetes io docs concepts
  • Rails 4 单选按钮表单助手,true 不验证

    我在 needs dist 上附加了简单的是或否单选按钮 当我提交表单时选择 否 它工作得很好 但是当我选择 是 时 它会抛出验证错误吗 它仅在 needs dist gt true 时有效 Model validates presence
  • 如何读取大型平面文件

    我有一个平面文件 其中包含 339276 行文本 大小为 62 1 MB 我试图读入所有行 根据我所拥有的某些条件解析它们 然后将它们插入数据库 我最初尝试使用 bufio Scan 循环和 bufio Text 来获取该行 但缓冲区空间不
  • cosmosdb 模拟器没有给出任何结果

    我不知道为什么在查询宇宙数据库时会发生这种情况 它不会显示任何文档 即使是 SELECT FROM c 但显示了 RU 但它与文档选项卡中的文档选项卡配合得很好 如果我使用任何过滤器 那么它也可以工作 但它不适用于 SQL 查询 我已经添加
  • 我如何知道 C 程序的可执行文件是在前台还是后台运行?

    在我的 C 程序中 我想知道我的可执行文件是否像这样在前台运行 a out 或者像这样 a out 如果你是前台工作 getpgrp tcgetpgrp STDOUT FILENO or STDIN FILENO or STDERR FIL
  • 验证属性被触发两次

    在我的 MVC3 应用程序中 我有模型 未删除重要属性 public class AccountViewModel StringLength 65 public string Property1 get set StringLength 6
  • 锁定 ASP.NET 应用程序变量

    我在 ASP NET 应用程序中使用第三方 Web 服务 对第 3 方 Web 服务的调用必须同步 但 ASP NET 显然是多线程的 并且可能会发出多个页面请求 从而导致对第 3 方 Web 服务的同时调用 对 Web 服务的调用封装在自
  • 如何更改对话框片段内的片段

    我想做一个空的DialogFragment with a LinearLayout然后更改里面的片段LinearLayout 例如 第一个片段是 3 个按钮 facebook google 电子邮件登录 的登录 当有人按下电子邮件时 第 2
  • 除括号之间的内容外,所有内容均小写

    考虑以下字符串 LoReM FOO IPSUM dolor BAR Samet fooBar 我正在寻找一种方法来小写所有内容 除了 brackets 之间的内容应该被忽略 所以期望的输出是 lorem FOO ipsum dolor BA
  • 角度 - 传递管道作为变量

    如何存储和使用变量中的管道信息 我已经搜索了很多 但找不到解决方案 我想要实现的是将任何有效的管道信息作为变量 小数 百分比 日期 自定义等 传递 下面是一个简单的例子 父组件 ts columnsDef value 0 35 pipeIn
  • 响应式网格布局框架[关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 在没有模型的情况下将自定义页面添加到 django admin

    我正在尝试在没有模型关联的情况下向管理员添加自定义页面 这就是我迄今为止所取得的成就 class MyCustomAdmin AdminSite def get urls self from django conf urls import
  • 每次 UIScrollView 释放时都会发生内存泄漏

    在我的应用程序中 我有一个滚动视图和四个表格视图 每次拖动然后释放时 我都会泄漏 48 字节 这确实很重要 正如您所看到的 两组泄漏都有相同的来源 有人见过这样的泄漏吗 Edit 1 当我单击泄漏旁边的箭头时 我会得到泄漏的以下信息 您所看
  • 查找其索引的乘积可被另一个数字 X 整除的对的数​​量

    给定一个数组和某个值 X 找到满足以下条件的对的数量 i lt j a i a j and i j X 0 Array size lt 10 5 我想这个问题有一段时间了 但只能想出蛮力解决方案 通过检查所有对 这显然会超时 O N 2 t
  • 使用 Objective C 将 RGB 彩色图像更改为灰度图像

    我正在开发一个将彩色图像更改为灰度图像的应用程序 然而 有些图片显示出来是错误的 我不知道代码有什么问题 也许我输入的参数有误 请帮忙 UIImage c UIImage imageNamed downRed png CGImageRef
  • 如何检查主音量是否静音

    如何在 Windows 7 操作系统中检查主音量是否静音我有静音或取消静音的代码 IE Public Const APPCOMMAND VOLUME MUTE As Integer H80000 Public Const APPCOMMAN
  • rake cucumber 和 rake spec 始终使用“开发”环境

    我运行 Cucumber 和 RSpec 测试的 rake 任务始终使用我的development环境 以下是相关的配置文件 RAILS ROOT config environments cucumber rb Edit at your o
  • C free() 是如何工作的? [复制]

    这个问题在这里已经有答案了 可能的重复 malloc 和 free 如何工作 https stackoverflow com questions 1119134 how malloc and free work include
  • 寻求有关共享内存锁定问题的文章

    我正在审查一些代码并对所使用的技术感到怀疑 在Linux环境中 有两个进程附加多个 共享内存段 第一个进程定期加载新的集合 要共享的文件 并将共享内存ID shmid 写入 主 共享内存段中的一个位置 第二道工序 不断读取这个 主 位置并使
  • 如何将 Spark DataFrame 以 csv 格式保存在磁盘上?

    例如 这样的结果 df filter project en select title count groupBy title sum 将返回一个数组 如何将 Spark DataFrame 作为 csv 文件保存在磁盘上 Apache Sp