可以触发流选择特定文件

2024-03-25

我的程序连续读取流hadoop文件夹(比如/hadoopPath/)。它从上面的文件夹中选取所有文件。我可以只显示该文件夹的特定文件类型吗(例如:/hadoopPath/*.log)

我还有一个与 Spark 和流媒体相关的问题:Spark Streaming 是否适用于“cp”和“mv” https://stackoverflow.com/questions/36350336/is-spark-streaming-works-with-both-cp-and-mv


我已经为同样的问题苦苦挣扎了几个小时,虽然看起来很简单,但我在网上找不到任何关于它的信息。最后,我找到了一个适合我的案例的解决方案。我将其放在这里是为了为遇到同样问题的其他人节省一些时间。
假设您只想读取具有“path-to-hadoop-folder/*.csv”模式的文件。在默认情况下,当您指定文件夹时,spark 会读取该文件夹中的所有文件(例如 .csv)。COPYING)就我而言,这导致了错误。您所需要做的就是在定义 readStrem 时在 .csv 方法中指定此模式。 python 中的一个例子如下:

activity = spark \
    .readStream \ 
    .option("sep", ",") \ 
    .schema(userSchema) \ 
    .csv("path-to-hadoop-folder/*.csv")  

这样,spark 仅考虑具有 *.csv 模式的文件,并忽略该文件夹中的所有其他文件。我在spark 2.0.0和hadoop 2.6上测试过。 (P.S 我只测试了 csv 文件,但我想处理文本文件应该有类似的解决方案) 你可以在中找到相同的解决方案Spark DataStreamReader 公会 https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

可以触发流选择特定文件 的相关文章

  • S3A:失败,而 S3:在 Spark EMR 中工作

    我将 EMR 5 5 0 与 Spark 结合使用 如果我使用一个简单的文件写入 s3s3 网址写得很好 但如果我使用s3a 地址 它失败了Service Amazon S3 Status Code 403 Error Code Acces
  • scalac 编译生成“对象 apache 不是包 org 的成员”

    我的代码是 import org apache spark SparkContext 它可以在交互模式下运行 但是当我使用 scalac 编译它时 出现以下错误消息 对象 apache 不是包 org 的成员 这似乎是路径的问题 但我不知道
  • ETL informatica 大数据版(非云版)可以连接到 Cloudera Impala 吗?

    我们正在尝试在 Informatica 大数据版本 不是云版本 上进行概念验证 我发现我们可能能够使用 HDFS Hive 作为源和目标 但我的问题是 Informatica 是否连接到 Cloudera Impala 如果是这样 我们是否
  • 指定 Parquet 属性 pyspark

    如何在 PySpark 中指定 Parquet 块大小和页面大小 我到处搜索 但找不到任何有关函数调用或导入库的文档 根据火花用户档案 https mail archives apache org mod mbox spark user 2
  • 以不同用户身份运行 MapReduce 作业

    我有一个与 Hadoop 交互的 Web 应用程序 Cloudera cdh3u6 特定的用户操作应在集群中启动新的 MapReduce 作业 该集群不是一个安全集群 但它使用简单的组身份验证 因此 如果我以自己的身份通过 ssh 连接到它
  • 从 Spark 数据帧中过滤大量 ID

    我有一个大型数据框 其格式类似于 ID Cat date 12 A 201602 14 B 201601 19 A 201608 12 F 201605 11 G 201603 我需要根据大约 500 万个 Is 的列表来过滤行 最直接的方
  • 在 Scala 中创建 Java 对象

    我有一个 Java 类 Listings 我在 Java MapReduce 作业中使用它 如下所示 public void map Object key Text value Context context throws IOExcept
  • 在 pyspark 中创建一个包含单列元组的数据框

    我有一个 RDD 其中包含以下内容 column 1 value column 2 value column 3 value column 100 value 我想创建一个包含带有元组的单列的数据框 我得到的最接近的是 schema Str
  • java.lang.NoSuchMethodError:com.fasterxml.jackson.databind.type。使用 apache beam Spark runner 运行 go 示例时

    我想跑grades https github com apache beam tree master sdks go examples gradesapache beam go sdk 提出的示例 在一个主服务器和两个从服务器 spark2
  • takeOrdered 降序 Pyspark

    我想按值对 K V 对进行排序 然后取最大的五个值 我设法用第一个地图恢复 K V 用 FALSE 按降序排序 然后将 key value 反转到原始 第二个地图 然后取前 5 个最大的值 代码是这样的 RDD map lambda x x
  • sqoop 通过 oozie 导出失败

    我正在尝试将数据导出到mysql from hdfs通过sqoop 我可以通过 shell 运行 sqoop 并且它工作正常 但是当我通过调用oozie 它出现以下错误并失败 我还包括了罐子 没有描述性日志 sqoop脚本 export c
  • Spark SQL 广播提示中间表

    我在使用广播提示时遇到问题 可能是缺乏 SQL 知识 我有一个查询 例如 SELECT broadcast a FROM a INNER JOIN b ON INNER JOIN c on 我想要做 SELECT broadcast a F
  • Spark 和 Ipython 中将非数字特征编码为数字的问题

    我正在做一些我必须做出预测的事情numeric数据 每月员工支出 使用non numeric特征 我在用Spark MLlibs Random Forests algorthim 我有我的features数据在一个dataframe看起来像
  • Spark/Yarn:HDFS 上不存在文件

    我在 AWS 上设置了 Hadoop Yarn 集群 有 1 个主服务器和 3 个从服务器 我已经验证我有 3 个活动节点在端口 50070 和 8088 上运行 我在客户端部署模式下测试了 Spark 作业 一切正常 当我尝试使用 Spa
  • Hadoop - 直接从 Mapper 写入 HBase

    我有一个 hadoop 作业 其输出应写入 HBase 我并不真正需要减速器 我想要插入的行类型是在映射器中确定的 如何使用 TableOutputFormat 来实现此目的 从所有示例中 我看到的假设是 reducer 是创建 Put 的
  • Spark SQL如何读取压缩的csv文件?

    我尝试过使用apispark read csv读取带有扩展名的压缩 csv 文件bz or gzip 有效 但在源代码中我没有找到任何可以声明的选项参数codec type 即使在这个link https github com databr
  • 获取:导入 Spark 模块时出错:没有名为“pyspark.streaming.kafka”的模块

    我需要将从 pyspark 脚本创建的日志推送到 kafka 我正在做 POC 所以在 Windows 机器上使用 Kafka 二进制文件 我的版本是 kafka 2 4 0 spark 3 0 和 python 3 8 1 我正在使用 p
  • 对多列应用窗口函数

    我想执行窗口函数 具体为移动平均值 但针对数据帧的所有列 我可以这样做 from pyspark sql import SparkSession functions as func df df select func avg df col
  • Spark - 如何在本地运行独立集群

    是否有可能运行Spark独立集群仅在一台机器上进行本地操作 这与仅在本地开发作业基本上不同 即local 到目前为止 我正在运行 2 个不同的虚拟机来构建集群 如果我可以在同一台机器上运行一个独立的集群 该怎么办 例如三个不同的 JVM 正
  • 为什么 Spark 比 Hadoop MapReduce 更快

    有人可以使用字数统计示例解释一下为什么 Spark 比 MapReduce 更快吗 bafna的答案提供了故事的记忆方面 但我想补充另外两个重要事实 DAG和生态系统 Spark 使用 惰性求值 来形成连续计算阶段的有向无环图 DAG 通过

随机推荐

  • 无法使用 Gradle 运行 TestNG

    我有一个使用 TestNG 运行的简单代码 但我无法使用 Gradle 运行相同的代码 因为它说找不到 main 方法 这并不奇怪 因为我使用的是注释 但在这样的场景下 如果我必须使用Gradle 如何运行代码 请注意 我对 Gradle
  • 如何根据 Django 中的表单输入向用户显示生成的图像?

    我目前正在使用对象的属性通过 matplotlib 生成图像 并且能够创建一个在 HttpResponse 中显示所述图像的视图 我使用以下代码片段来执行此操作 http wiki scipy org Cookbook Matplotlib
  • Pandas - KeyError:列不在索引中[关闭]

    Closed 这个问题需要调试细节 help minimal reproducible example 目前不接受答案 import pandas as pd import quandl df quandl get WIKI GOOGL p
  • 如何将 MKMapView 用户位置蓝点更改为所选图像?

    是否可以更改blue dot https i stack imgur com ELWID jpg这表明用户的位置MKMapView到图像 例如一辆小汽车或任何 png image In the 查看注释 的方法MKMapViewDelega
  • 循环引用时 .NET 单元测试中的 StackOverflow

    当我注意到以下情况时 我正在测试其他循环参考电阻 public class Foo private Bar myBar new Bar public class Bar private Foo myFoo new Foo Fact publ
  • “pointer-events: none”在 IE9 和 IE10 中不起作用

    CSS 属性pointer events none 在 Firefox 中工作正常 但在 Internet Explorer 9 10 中则不然 有没有办法在 IE 中实现该属性的相同行为 有任何想法吗 来自 MDN 文档 警告 在 CSS
  • React:检查器不是一个函数

    我在 React 应用程序的控制台中收到这条奇怪的警告消息 警告 Failed propType checker is not a function 检查渲染方法Chart 我根本没有任何检查方法 如果我删除我的propTypes 警告消失
  • 姜戈 1.5。 'url' 需要一个非空的第一个参数。 Django 1.5 中的语法发生了变化

    如果我尝试 href url post content product id p id 我有这个错误 url 需要一个非空的第一个参数 语法发生了变化 Django 1 5 请参阅文档 如何改变呢 Django 1 5 中的更改 第一个参数
  • Unity中协程的异常处理

    我创建了一个脚本来更改它所附加的游戏对象的透明度 并且我在需要可取消的淡入淡出协程中进行透明度更改 并且每次我们调用时都取消 ChangeTransparency 具有新的值 我设法让它按照我想要的方式工作 但我想处理OperationCa
  • SQLite 比 MySQL 更快?

    我想设置一个 teampeak 3 服务器 我可以选择 SQLite 和 MySQL 作为数据库 好吧 我通常倾向于 不要在生产中使用 SQLite 但另一方面 它是一个 Teamspeak 服务器 好吧 让我谷歌一下 我发现了这个 Spe
  • FOSUserBundle 一对一映射实体未保存

    大家好 我有一个关于在实体 FosUserBundle 中实现一对一的问题 用户实体与配置文件实体具有一对一的映射 我已经按照 FOSUserBundle 文档中所示覆盖了基本的 RegistrationFormType 记录也保存在两个表
  • 在 Terraform 中引用 gitlab 秘密

    我对 Terraforms 和 gitlab CI 很陌生 我想用它来做一些事情 我想使用 Terraform 创建 IAM 用户和 S3 存储桶 使用策略允许该 IAM 用户对此 S3 存储桶执行某些操作 将 IAM 用户的凭证保存在工件
  • 滚动您自己的明文 Wiki(数据库内的 Wiki)

    有人知道用于创建类似 wiki 的数据存储的 API 最好是 PHP 但我对任何语言都感兴趣 关于滚动您自己的纯文本 wiki 的任何资源怎么样 其他纯文本 wiki 如何处理文本文件的格式 我知道我可以使用 Markdown 或 Text
  • 现有模型和数据库表的rails g脚手架

    我想创建一个结构rails g scaffold Article 但我已经创建了表Articles和型号Articles 有什么办法可以做到这一点吗 rails generate scaffold controller Article
  • 如何在Plotly(python)中设置背景颜色,标题?

    下面是我的代码 有人可以告诉我如何设置背景颜色 标题 x轴y轴标签 scatterplot plot Scatter x x index y x rating mode markers marker dict size 10 color x
  • PHP 中的 Switch-Case 和 If-Else 有什么区别?

    我正在决定是否使用if else vs switch case在我正在编写的一个 PHP 网站中 我想知道使用其中之一是否有任何好处 或者是否在某些情况下打算使用其中一种而不是另一种 有趣的问题 因为在编译语言 甚至是 JIT 语言 中 使
  • 使 'week' 函数为双周

    嗨 这一切应该是一个简单的问题 我只是似乎无法弄清楚 我想每两周分解一次该数据集 以便以两周为间隔查看年度周期 我不想总结或汇总数据 我想做的正是 周 函数正在做的事情 但每两周一次 下面是数据和代码的示例 任何帮助将不胜感激 DF lt
  • 棋盘坐标

    我正在尝试用 Java 创建一个国际象棋程序 现在 我已经将棋盘与现有的部件一起完成 我可以用鼠标通过拖放来移动它们 我需要的是向两侧的方块添加坐标 就像在真正的板上一样 不一定要有什么奇特的东西 只是一个视觉效果 由于我没有使用图形绘制板
  • 使用触发器将数据从 SQL Server 插入到 MySql

    我有两个数据库 一个在 SQL Server 中 另一个在 MySql 中 我想在 SQL Server 表中进行插入时将数据插入 MySQL 表中 是否可以在 SQL Server 中使用触发器 请给我一个答案 您可以从以下位置创建连接M
  • 可以触发流选择特定文件

    我的程序连续读取流hadoop文件夹 比如 hadoopPath 它从上面的文件夹中选取所有文件 我可以只显示该文件夹的特定文件类型吗 例如 hadoopPath log 我还有一个与 Spark 和流媒体相关的问题 Spark Strea