如何控制从 Spark DataFrame 写入的输出文件的数量?

2023-12-10

使用 Spark Streaming 从 Kafka 主题读取 Json 数据。
我使用 DataFrame 来处理数据,稍后我希望将输出保存到 HDFS 文件。问题是使用:

df.write.save("append").format("text")

产生许多文件,有些文件很大,有些甚至是 0 字节。

有没有办法控制输出文件的数量?另外,为了避免“相反”的问题,是否有办法限制每个文件的大小,以便当当前达到一定大小/行数时将写入新文件?


输出文件的数量等于分区的数量Dataset这意味着您可以根据上下文通过多种方式控制它:

  • For Datasets没有广泛的依赖性,您可以使用阅读器特定参数控制输入
  • For Datasets具有广泛的依赖性,您可以控制分区的数量spark.sql.shuffle.partitions范围。
  • 与血统无关,你可以coalesce or repartition.

有没有办法也限制每个文件的大小,以便当当前达到一定大小/行数时将写入新文件?

不会。对于内置编写器来说,这是严格的 1:1 关系。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何控制从 Spark DataFrame 写入的输出文件的数量? 的相关文章

  • scala/spark 代码不允许在 hive 中添加列

    如果源数据有新列 我尝试在 Hive 表中添加一列 所有新列的检测都运行良好 但是 当我尝试将列添加到目标表时 我收到此错误 for f lt df schema fields if f name chk spark sqlContext
  • Scala 集合不一致

    为什么 Scala Collections API 中的集合和列表之间缺乏一致性 例如 有不可变的 Set 但也有可变的 Set 如果我想使用后者 我可以简单地这样做 val set Set A set new A 但是 本身不存在可变列表
  • 为什么 Databricks Connect Test 无法在 Mac 上运行?

    我已经阅读了配置文档databricks connect但运行时仍然出现以下错误databricks connect test 来自终端的错误 java lang NoSuchMethodError org apache spark int
  • Apache Kafka 中消费者消费消息的延迟

    我正在使用 Kafka 0 8 0 并尝试实现下面提到的场景 JCA API 充当生产者并将数据发送到 gt 消费者 gt HBase 一旦我使用 JCA 客户端获取数据 我就会将每条消息发送给消费者 例如 一旦生产者发送消息 no 1 我
  • 使用原始类型模拟案例类

    考虑以下类型结构 trait HasId T def id T case class Entity id Long extends HasId Long 比方说 我们想在一些测试中模拟实体类 val entityMock mock Enti
  • 使用 Akka 玩 2.5 - 找不到参数超时的隐式值:akka.util.Timeout

    我正在尝试使用 Play 2 5 测试 Akka 但遇到了一个似乎无法解决的编译错误 我正在关注 Play 文档中的此页面 https playframework com documentation 2 5 x ScalaAkka http
  • 是否有用于事件驱动的 Kafka 消费者的 Python API?

    我一直在尝试构建一个以 Kafka 作为唯一界面的 Flask 应用程序 因此 我希望有一个 Kafka 消费者 当相关主题的流中存在新消息时 该消费者会被触发 并通过将消息推回到 Kafka 流来进行响应 我一直在寻找类似 Spring
  • 对多列应用窗口函数

    我想执行窗口函数 具体为移动平均值 但针对数据帧的所有列 我可以这样做 from pyspark sql import SparkSession functions as func df df select func avg df col
  • 使用 Spray-json 解析简单数组

    我正在尝试 但失败了 了解 Spray json 如何将 json feed 转换为对象 如果我有一个简单的 key gt value json feed 那么它似乎可以正常工作 但是我想要读取的数据出现在如下列表中 name John a
  • Scala 如何忽略 Java 的检查异常?

    例如如果调用 JavaThread sleep这会抛出一个已检查的InterruptedException来自 Scala 源文件 然后不需要将调用包含在 Scala 中try catch Scala 如何删除将调用包围在 a 中的规则tr
  • 使用 Spark DataFrame 获取组后所有组的 TopN

    我有一个 Spark SQL DataFrame user1 item1 rating1 user1 item2 rating2 user1 item3 rating3 user2 item1 rating4 如何按用户分组然后返回TopN
  • 在 Scala 和 SBT 中调试较长的编译时间

    在我的 Scala SBT 项目中 我有一个文件需要 5 分钟才能编译 所有其他的都可以在几秒钟内编译 这使得开发非常痛苦 我确信我滥用了一些 Scala 构造 但我不知道如何调试它 如何在 Scala 中调试较长的编译时间 我正在使用 S
  • 为什么 Spark 没有使用本地计算机上的所有核心

    当我在 Spark Shell 中或作为作业运行一些 Apache Spark 示例时 我无法在单台计算机上实现完全的核心利用率 例如 var textColumn sc textFile home someuser largefile t
  • 使用多行选项和编码选项读取 CSV

    在 azure Databricks 中 当我使用以下命令读取 CSV 文件时multiline true and encoding SJIS 似乎编码选项被忽略了 如果我使用multiline选项 Spark 使用默认值encoding那
  • 为什么 Spark 比 Hadoop MapReduce 更快

    有人可以使用字数统计示例解释一下为什么 Spark 比 MapReduce 更快吗 bafna的答案提供了故事的记忆方面 但我想补充另外两个重要事实 DAG和生态系统 Spark 使用 惰性求值 来形成连续计算阶段的有向无环图 DAG 通过
  • Scala 和变量中的模式匹配

    我是 Scala 新手 有点想知道模式匹配是如何工作的 想象一下我有以下内容 case class Cls i Int case b Cls i gt Ok case e Cls gt Ok case f Cls gt Ok case s
  • Scala Tuple2Zipped 与 IterableLike zip

    两种实现有什么区别 这个比那个好吗 有一篇博客文章说 Tuple2Zipped 性能更好 但没有提供原因 并且查看源代码我没有看到差异 val l1 List 1 2 3 val l2 List 5 6 7 val v1 l1 zip l2
  • fetchsize和batchsize对Spark的影响

    我想通过以下方式控制 RDB 的读写速度Spark直接 但标题已经透露的相关参数似乎不起作用 我可以得出这样的结论吗fetchsize and batchsize我的测试方法不起作用 或者它们确实会影响阅读和写作方面 因为测量结果基于规模是
  • 缓存 Slick DBIO 操作

    我正在尝试加快 SELECT FROM WHERE name 的速度Play 中的查询类型 Scala 应用程序 我正在使用 Play 2 4 Scala 2 11 play slick 1 1 1 包 该软件包使用Slick 3 1版本
  • Spark的distinct()函数是否仅对每个分区中的不同元组进行洗牌

    据我了解 distinct 哈希分区 RDD 来识别唯一键 但它是否针对仅移动每个分区的不同元组进行了优化 想象一个具有以下分区的 RDD 1 2 2 1 4 2 2 1 3 3 5 4 5 5 5 在此 RDD 上的不同键上 所有重复键

随机推荐

  • 如何将 Bitmap 对象从一个活动传递到另一个活动

    在我的活动中 我创建了一个Bitmap对象 然后我需要启动另一个对象Activity 我怎样才能通过这个Bitmap来自子活动 即将启动的活动 的对象 Bitmap实施Parcelable 所以你总是可以带着意图传递它 Intent int
  • 如何在 sns clustermap 中标记集群

    我正在使用以下代码创建聚类图 import numpy as np import pandas as pd import seaborn as sns all net names early vis face motion scene sc
  • 使用 xlwt for excel 在 python 中预格式化为货币和两位小数

    我有一个列标题Fee Using xlwt in python 我成功生成了所需的Excel 创建Excel文件时此列始终为空 是否有可能拥有Fee列预格式化为 货币 和 两位小数 这样当我在Fee下载后Excel文件的列 23应该变成 2
  • 非托管资源和 Dispose()

    我正在读一些关于Dispose 方法并发现非托管资源应该显式释放Dispose 方法 或 Finalize 方法 文章称文件句柄和数据库连接对象是非托管资源的示例 谁能解释为什么这些是非托管的以及如果在 Dispose 中处理不当会发生什么
  • 使用 UIActivityViewController 共享视频时,视频未附加到邮件中

    我正在使用下面的代码来共享位于设备上的视频 它非常适合通过消息 facebook 和 iCloud 共享 但不适用于邮件 我可以看到邮件选项在那里 但在邮件草稿中 视频不存在随附的 在代码中 videoAsset is a PHAsset类
  • Delphi 字节逆序

    我一直在尝试编写一个函数 它接受两个指针 一个输入和一个输出 并以相反的顺序将输入中的字节写入输出 到目前为止我还没能让它正常工作 procedure ReverseBytes Source Dest Pointer Size Intege
  • 请求已被阻止;内容必须通过 HTTPS 提供

    我正在后端使用 Spring Security 和 Spring MVC 以及前端使用 Angular 进行应用程序 我的问题是我正确登录 但注销时的问题我在本地主机中正确实现 http localhost 8080工作没有问题 当我将其更
  • Azure Function 在应用程序服务计划中调用自身两次

    我的 azure 函数中有以下代码 手动超时为 10 分钟 using System Net public static async Task
  • 在阻塞 webRequest 处理程序中使用异步调用

    Summary 我正在使用一个browser webRequest onBeforeRequest处理程序 我需要阻止 webRequest 直到从处理程序中调用异步方法返回信息为止 我怎样才能做到这一点 细节 首先 我对这个长问题表示歉意
  • 如何编写一段java代码,让一个实例只发生一次?

    我正在写一个应用程序 在该应用程序中 我有一个实例 我只想在第一次打开应用程序时发生 我想知道如何用 Java 编写一些东西 只让该实例第一次发生 比如注册码之类的 持久存储 创建一个单例 Blackberry 开发论坛有一个出色的单例方法
  • C# 可排序集合,允许重复键

    我正在编写一个程序来设置各种对象在报告中出现的顺序 该序列是 Excel 电子表格上的 Y 位置 单元格 代码的演示部分如下 我想要完成的是拥有一个集合 这将允许我添加多个对象 并且我可以根据顺序获得排序的集合 SortedList lis
  • AJAX 从文件中读取

    我正在使用 AJAX 读取文本文件 如何只读取第一行 此代码应该可以帮助您从远程文本文件中读取 var txtFile new XMLHttpRequest txtFile open GET http my remote url myrem
  • 如何在使用 Ruby on Rails 3 的播种过程中避免验证、回调和“attr_accessible”效应?

    我正在使用 Ruby on Rails 3 并尝试在我的应用程序数据库中播种数据 在 RAILS ROOT models user rb 中 我有 class User lt ActiveRecord Base attr accessibl
  • 在启用 https 的情况下运行 gwt?

    我当前正在运行一个没有安全性的 GWT 站点 但需要切换到 HTTPS 工作的非安全版本使用以下参数运行 port 8888 startupUrl ui index jsp com example EntryPoint 我读过 简单地添加
  • 串口写行文本框错误

    我使用虚拟 COM 端口来测试我的程序 我想用 COM8 进行串行写入 用 COM9 进行串行读取 当想要写入 textbox1 中的值时 我收到此错误 IOException was unhandled The parameter is
  • XslLoadException:禁止解析外部 URI

    我有 xslt 工作表 其中包含另一个 xslt 文件的标签 所有文件编译正确且无错误 但运行以下代码时出现异常 var myXslTrans new XslCompiledTransform XsltSettings sets new X
  • 为什么我无法在 ubuntu 上运行基于 alpine 构建的 C 程序? [复制]

    这个问题在这里已经有答案了 我在 alpine Linux 容器中编译了一个简单的 hello world C 程序 并将其复制到我的 ubuntu 主机上 令我惊讶的是 我无法在我的 ubuntu 主机上运行二进制文件 相反 当我尝试执行
  • python.dataScience 是 VS Code 中的“未知配置设置”

    我正在 MacOS Catalina 上运行带有扩展 Jupyter Notebook 2020 12 和 Python 2020 12 的 VS Code 版本 1 52 Context 我无法让 Intellisense 在 VS Co
  • PHP/MySQL 编码问题。 â�� 代替某些字符

    我在使用 php 将某些字符输入到 mysql 数据库时遇到了一些问题 我正在做的是将用户输入的文本提交到数据库 我无法弄清楚我需要更改什么才能允许将任何类型的字符放入数据库并通过 php 打印出来 就像它想象的那样 我的 MySQL 排序
  • 如何控制从 Spark DataFrame 写入的输出文件的数量?

    使用 Spark Streaming 从 Kafka 主题读取 Json 数据 我使用 DataFrame 来处理数据 稍后我希望将输出保存到 HDFS 文件 问题是使用 df write save append format text 产