SparkSQL CSV 的引用不明确

2023-12-31

我正在尝试在 SparkSQL 2.10 中读取一堆 CSV 文件,其自定义架构部分是 Double,部分是 String,如下所示:

// Build the schema
val schemaStringS = "col1 col2"
val schemaStringD = "col3 col4 col5 col6"
val schemaStringS2 = "col7 col8"
val fieldsString = schemaStringS.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val fieldsString2 = schemaStringS2.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val fieldsDouble = schemaStringS.split(" ")
  .map(fieldName => StructField(fieldName, DoubleType, nullable = true))
val schema = StructType(fieldsString ++ fieldsDouble ++ fieldsString2)

// Read DataFrame
val input = sqlContext.read.schema(schema)
  .option("header", true)
  .csv("/files/*.csv")
  .toJavaRDD

这导致

Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference 'col6' is ambiguous, could be: col6#0, col6#5.;
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:264)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:158)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:130)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:129)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.sql.types.StructType.foreach(StructType.scala:96)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at org.apache.spark.sql.types.StructType.map(StructType.scala:96)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:129)
    at org.apache.spark.sql.execution.datasources.FileSourceStrategy$.apply(FileSourceStrategy.scala:83)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:62)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:62)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
    at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2547)
    at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.toJavaRDD(Dataset.scala:2557)
    at com.otterinasuit.spark.sensorlog.main.Main$.main(Main.scala:39)
    at com.otterinasuit.spark.sensorlog.main.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

我尝试使用 cat 合并文件(仅适用于 PoC)并避免使用 CSV 库(认为这可能是新 Spark 版本中的错误),但无济于事。

val input = sc.textFile("/csv/*.csv")
.map(line => line.split(",")).filter(row => !row.contains("col1")).map(x => Row(x))
val input2 = sqlContext.createDataFrame(input, schema)

我在常规 DataFrame 和联接中遇到了这个问题,这可以通过指定列名称、删除特定列或使用不同的联接来解决。但是,在这种情况下,我没有这个选择。

所有文件中的所有标头都是相同的,如以下证明head -1 *.csv。我不明白为什么会发生这种情况。


Both fieldsString and fieldsDouble指的是schemaStringS.

val fieldsString = schemaStringS.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))

//Changing from schemaStringS to schemaStringD
val fieldsDouble = schemaStringD.split(" ")
  .map(fieldName => StructField(fieldName, DoubleType, nullable = true))

所以,当你合并时

val schema = StructType(fieldsString ++ fieldsDouble ++ fieldsString2))

它正在投掷'col6' is ambiguous error,

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SparkSQL CSV 的引用不明确 的相关文章

  • Spark - 如何在本地运行独立集群

    是否有可能运行Spark独立集群仅在一台机器上进行本地操作 这与仅在本地开发作业基本上不同 即local 到目前为止 我正在运行 2 个不同的虚拟机来构建集群 如果我可以在同一台机器上运行一个独立的集群 该怎么办 例如三个不同的 JVM 正
  • 使用 fgetcsv 循环遍历 csv

    我有一个包含 3 列的 csv 文件 电子邮件地址 名 and 姓 我已经到了可以使用以下代码打印数组的阶段 这会打印数组 因此每个字段都在一行中 我希望它打印的只是该行第一列中的值 这是如何完成的 关于 fgetcsv 的文档对我 相对初
  • Scala:类似 Option (Some, None) 但具有三种状态:Some、None、Unknown

    我需要返回值 当有人询问值时 告诉他们以下三件事之一 这是值 没有价值 我们没有关于该值的信息 未知 情况 2 与情况 3 略有不同 示例 val radio car radioType 我们知道该值 返回无线电类型 例如 pioneer
  • Scala 如何忽略 Java 的检查异常?

    例如如果调用 JavaThread sleep这会抛出一个已检查的InterruptedException来自 Scala 源文件 然后不需要将调用包含在 Scala 中try catch Scala 如何删除将调用包围在 a 中的规则tr
  • 如何捕获 Oozie Spark 输出

    有没有办法捕获spark的输出然后将其输入到shell上 我们当前正在使用 scala 创建 jar 文件 并希望我们的 Spark 输出成为 shell 输入 我的想法是使用 wf actionData spark XXXX var 我只
  • Java 表达式树 [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 是否有相当于 net的 LINQ 下的表达式树JVM 我想实现一些类似 LINQ 的代码结构Scala
  • SPARK SQL - 当时的情况

    我是 SPARK SQL 的新手 SPARK SQL 中是否有相当于 CASE WHEN CONDITION THEN 0 ELSE 1 END 的内容 select case when 1 1 then 1 else 0 end from
  • 在 Spark 2.1.0 中启用 _metadata 文件

    Spark 2 1 0 中保存空 Parquet 文件似乎已损坏 因为无法再次读入它们 由于模式推断错误 我发现从 Spark 2 0 开始 写入 parquet 文件时默认禁用写入 metadata 文件 但我找不到重新启用此功能的配置设
  • pyspark 中的 Pandas UDF

    我正在尝试在 Spark 数据帧上填充一系列观察结果 基本上我有一个日期列表 我应该为每个组创建缺失的日期 在熊猫中有reindex函数 这是 pyspark 中不可用的 我尝试实现 pandas UDF pandas udf schema
  • Scala:如何在超类上实现克隆方法,并在子类中使用它?

    我可能会以错误的方式处理这个问题 但我想要一个像这样的对象 class MyDataStructure def myClone val clone new MyDataStructure do stuff to make clone the
  • R 中的列乘以子字符串

    假设我有一个数据框 其中包含多个组件及其在多个列中列出的属性 并且我想对这些列运行多个函数 我的方法是尝试将其基于每个列标题中的子字符串 但我无法弄清楚如何做到这一点 下面是数据框的示例 Basket F Type 1 F Qty 1 F
  • 类型级编程有哪些示例? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我不明白 类型级编程 是什么意思 也无法使用Google找到合适的解释 有人可以提供一个演示类型级编程的示例吗 范式的解释和 或定义将
  • AWK:递归下降 CSV 解析器

    响应一个BASH 中的递归下降 CSV 解析器 https codereview stackexchange com questions 11727 need some advice or help with translation and
  • 计算 R 中各列的唯一值

    我正在尝试创建一个新变量 其中包含来自两个不同列的字符串值的唯一计数 所以我有这样的东西 例如 A tibble 4 x 2 names partners
  • Android Excel CSV 的 MIME 数据类型是什么?

    我尝试了 text csv 甚至 application vnd ms excel 但 Excel 不会显示在选择列表中 很多其他应用程序也可以 void shareCsv Uri uri Context context Intent in
  • 玩:将表单字段绑定到双精度型?

    也许我只是忽略了一些明显的事情 但我无法弄清楚如何将表单字段绑定到 Play 控制器中的双精度型 例如 假设这是我的模型 case class SavingsGoal timeframeInMonths Option Int amount
  • 如何获取 Kafka 偏移量以进行结构化查询以进行手动且可靠的偏移量管理?

    Spark 2 2引入了Kafka的结构化流源 据我了解 它依赖 HDFS 检查点目录来存储偏移量并保证 恰好一次 消息传递 但是旧码头 比如https blog cloudera com blog 2017 06 offset manag
  • 在 Spark 中计算逻辑回归系数的标准误差

    我知道这个问题之前已经被问过here https stackoverflow com questions 37816701 calculating standard error of estimate wald chi square sta
  • 识别 pandas 数据框中各组之间的差异

    我有一个按日期和 ID 索引的 pandas 数据框 我想 识别日期之间增删的ID 将 ID 添加到另一个数据帧以及添加 删除的日期 date ID value 12 31 2010 13 0 124409 9 0 555959 1 0 7
  • Python Selenium:如何在文本文件中打印网站上的值?

    我正在尝试编写一个脚本 该脚本将从 tulsaspca org 网站获取以下 6 个值并将其打印在 txt 文件中 最终输出应该是 905 4896 7105 23194 1004 42000 放置的动物 的 HTML span class

随机推荐

  • Firebase 数据库未找到类错误 (NoClassDefFoundError)

    我有一个使用多个子模块的大型项目 我已将 firebase 数据库依赖项添加到我的项目中包含的模块之一 当本地测试模块作为开放项目包含时 一切正常 当使用封闭模块构建时 我的模块是aar文件 我收到以下错误 java lang NoClas
  • 如何重命名 WordPress 管理仪表板上的 WooCommerce 选项卡下的菜单选项卡

    我需要帮助重命名 WordPress 管理上 woocommerce 选项卡下的选项卡菜单项 我们安装了一个插件 该插件显示为 woocommerce 选项卡上的子菜单 有人可以帮我解决这个问题吗 我发现下面的代码可以重命名选项卡菜单 但我
  • 属性不应返回数组

    是的 我知道这个问题之前已经讨论过很多次了 我阅读了有关这个问题的所有帖子和评论 但似乎仍然无法理解一些东西 MSDN 提供的解决此违规问题的选项之一是返回收藏 or an 界面这是由一个实现的收藏 在访问该属性时 无论它多么明显并不能解决
  • Flex 默认规则

    如何自定义 Flex 的默认操作 我发现类似 的内容 但当我运行它时 它显示 柔性扫描仪卡住 还有 规则仅添加一条规则 因此它也不起作用 我想要的是 comment comment return 1 default return 0 lt
  • &'a T 是否意味着 T: 'a?

    从我自己的理解和实验来看 这似乎是正确的 但我还没有找到记录它的权威来源 Rust by Example 有一个bounds https doc rust lang org rust by example scope lifetime li
  • Google Play 中不允许使用 com.example 包名称

    我是 Android 应用程序开发新手 我正在创建简单的 Android 应用程序 导出 android 应用程序并将包名称指定为AndroidManifestfile包名为 com example zingyminds apk 现在我得到
  • 使用 FFT 进行高斯图像滤波

    对于图像分割 我使用 OpenCV 的高斯特征差异GaussianBlur 范围从 0 8 到 8 43 指数步长为 1 4 我的图像尺寸为 4096 x 2160 因此这需要相当长的时间 在一个核心上需要 8 秒 这在处理视频时相当长 您
  • 如何更改活动/选定选项卡的颜色?

    我希望当未选择选项卡时颜色为默认灰色 但作为我的颜色tabBarColor选择选项卡时的颜色 我找不到更改标签栏中标题颜色的方法 我怎样才能做到这一点 这是我的代码 Home screen TabNavigator Home screen
  • lub(T1,...Tn) 是什么意思?

    以下引用来自 JLS 14 20 http docs oracle com javase specs jls se8 html jls 14 html jls 14 20 异常参数的声明类型 将其类型表示为 与替代方案 D1 的结合D2 D
  • OpenCV 霍​​夫圆

    我使用 Xcode 和 c 我已经从以下位置复制了 HoughCircles 代码OpenCV 文档 http opencv willowgarage com documentation cpp feature detection html
  • 在 SageMath 中运行时使用 Dask 会抛出 ImportError

    最近 我一直在尝试使用 Dask 并行化一些 Sage 运行 OSX 11 2 3 的 MacBook Pro 上的 Sage 9 4 代码 我遇到的问题是 虽然我可以在 Sage 中运行 Dask 但每当我包含任何非 纯 python 代
  • 我们可以将事件侦听器添加到“Vega-Lite”规范吗?

    我是 Vega 和 Vega Lite 的新手 我正在使用 Vega Lite 创建一个简单的条形图 但我无法添加任何事件侦听器 例如 徘徊 我想将鼠标悬停在一个栏上并更改该栏的颜色 如果您正在使用Vega嵌入 https github c
  • 如何收到图库应用程序可见的每个新图像的通知?

    背景 当用户下载新图像或使用相机捕获图像时 图库应用程序将更新以显示新图像 我需要在创建每个新图像后立即收到通知 无论它是如何创建的 相机 浏览器 就像图库应用程序所示 问题 事实证明 有一个 mediaScanner Android 组件
  • Curl:传输已关闭,剩余未完成的读取数据

    我遇到了大卷曲调用的问题 I get nread 传输已关闭 剩余未完成的读取数据 并且内容已部分交付 GET stats stats breakdown track track campaign search criteria 2 per
  • 无法生成用于构建和调试的资产。 OmniSharp 服务器未运行

    在 Visual Studio VS Code 上 使用 C 进行编码 我正在尝试生成要构建和调试的资产 但收到以下错误消息 无法生成用于构建和调试的资产 OmniSharp 服务器未运行 我在跑 NET版本3 1 301 视窗8 1 Vi
  • 从小表中删除重复行

    我在 PostgreSQL 8 3 8 数据库中有一个表 该表没有键 约束 并且有多行具有完全相同的值 我想删除所有重复项并仅保留每行的 1 个副本 特别有一列 名为 key 可用于识别重复项 即每个不同的 key 应该只存在一个条目 我怎
  • 需要在ggplot2中绘制条形图(以百分位方式)

    嗨 我有一个这样的数据集 ALL Critical Error Warning Review 2016 1412 475 4 125 154 45 49 2 58 116 86 12 1 17 我想使用 ggplot2 绘制堆叠条形图 其中
  • 混合构造函数并在 Javascript 代理对象上应用陷阱

    我有一个类 我想对其应用代理 观察方法调用和构造函数调用 计算器 js class Calc constructor add a b return a b minus a b return a b module exports Calc i
  • 仅查看 Mercurial 中的目录?

    如何仅从 Mercurial 存储库中查看子目录 看来我只能查看整个存储库 你不能 请参阅此处的讨论 https www mercurial scm org wiki PartialClone https www mercurial scm
  • SparkSQL CSV 的引用不明确

    我正在尝试在 SparkSQL 2 10 中读取一堆 CSV 文件 其自定义架构部分是 Double 部分是 String 如下所示 Build the schema val schemaStringS col1 col2 val sche