Spark - 如何组合/合并 Seq[Row] 中 Dataframe 中的元素以生成 Row

2023-12-12

首先我想说我被迫使用 Spark 1.6

我正在生成一个DataFrame来自这样的 JSON 文件:

{"id" : "1201", "name" : "satish", "age" : "25"},
{"id" : "1202", "name" : "krishna", "age" : "28"},
{"id" : "1203", "name" : "amith", "age" : "28"},
{"id" : "1204", "name" : "javed", "age" : "23"},
{"id" : "1205", "name" : "mendy", "age" : "25"},
{"id" : "1206", "name" : "rob", "age" : "24"},
{"id" : "1207", "name" : "prudvi", "age" : "23"}

The DataFrame好像:

+---+----+-------+
|age|  id|   name|
+---+----+-------+
| 25|1201| satish|
| 28|1202|krishna|
| 28|1203|  amith|
| 23|1204|  javed|
| 25|1205|  mendy|
| 24|1206|    rob|
| 23|1207| prudvi|
+---+----+-------+

我用这个做什么DataFrame就是按年龄分组,按id排序,过滤所有年龄组中学生人数超过1人的。我使用以下脚本:

import sqlContext.implicits._

val df = sqlContext.read.json("students.json")

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val arrLen = udf {a: Seq[Row] => a.length > 1 }

val mergedDF = df.withColumn("newCol", collect_set(struct("age","id","name")).over(Window.partitionBy("age").orderBy("id"))).select("newCol","age")

val filterd = mergedDF.filter(arrLen(col("newCol")))

现在当前的结果是:

[WrappedArray([28,1203,amith], [28,1202,krishna]),28]
[WrappedArray([25,1201,satish], [25,1205,mendy]),25]
[WrappedArray([23,1204,javed], [23,1207,prudvi]),23]

我现在想要的是将这两个学生行合并到WrappedArray合而为一,例如id第一个学生和name第二个学生的。

为了实现这一点,我编写了以下函数:

def PrintOne(List : Seq[Row], age : String):Row  ={ 
      val studentsDetails = Array(age, List(0).getAs[String]("id"), List(1).getAs[String]("name")) 
      val mergedStudent= new GenericRowWithSchema(studentsDetails .toArray,List(0).schema)

      mergedStudent
    }

我知道这个函数可以解决问题,因为当我使用 foreach 测试它时,它会打印出预期值:

filterd.foreach{x => val student = PrintOne(x.getAs[Seq[Row]](0), x.getAs[String]("age"))
                         println("merged student: "+student)
                   }

OutPut:

merged student: [28,1203,krishna]
merged student: [23,1204,prudvi]
merged student: [25,1201,mendy]

但是,当我尝试在地图内执行相同的操作来收集返回值时,问题就开始了。

如果我在没有编码器的情况下运行:

val merged = filterd.map{row => (row.getAs[String]("age") , PrintOne(row.getAs[Seq[Row]](0), row.getAs[String]("age")))}

我得到以下异常:

线程“main”中的异常 java.lang.UnsupportedOperationException:否 找到 org.apache.spark.sql.Row 的编码器 - 字段(类:“org.apache.spark.sql.Row”,名称:“_2”) - 根类:“scala.Tuple2”

当我尝试生成一个Econder就我自己而言,我也失败了:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
    implicit val encoder = RowEncoder(filterd.schema)

    val merged = filterd.map{row => (row.getAs[String]("age") , PrintOne(row.getAs[Seq[Row]](0), row.getAs[String]("age")))}(encoder)

类型不匹配;成立 : org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[org.apache.spark.sql.Row] 必需:org.apache.spark.sql.Encoder[(字符串, org.apache.spark.sql.Row)]

我怎样才能提供正确的编码器,或者更好的是避免它?

我被告知要避免使用映射+自定义函数,但我需要应用的逻辑比仅从每一行中选取一个字段更复杂。将多个字段组合起来,检查行的顺序以及值是否为空将更加重要。据我所知,只需使用自定义函数就可以解决它。


的输出map属于类型(String, Row)因此它不能使用编码RowEncoder独自的。您必须提供匹配的元组编码器:

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val encoder = Encoders.tuple(
  Encoders.STRING,
  RowEncoder(
    // The same as df.schema in your case
    StructType(Seq(
      StructField("age", StringType), 
      StructField("id", StringType),
      StructField("name", StringType)))))

filterd.map{row => (
  row.getAs[String]("age"),
  PrintOne(row.getAs[Seq[Row]](0), row.getAs[String]("age")))
}(encoder)

总的来说,这种方法看起来像是一种反模式。如果你想使用更实用的风格,你应该避免Dataset[Row]:

case class Person(age: String, id: String, name: String)

filterd.as[(Seq[Person], String)].map { 
  case (people, age)  => (age, (age, people(0).id, people(1).name))
}

or udf.

另请注意o.a.s.sql.catalyst包,包括GenericRowWithSchema,主要供内部使用。除非有必要,否则最好o.a.s.sql.Row.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark - 如何组合/合并 Seq[Row] 中 Dataframe 中的元素以生成 Row 的相关文章

  • Java / Scala Future 由回调驱动

    简洁版本 我怎样才能创建一个Promise
  • Scala 集合不一致

    为什么 Scala Collections API 中的集合和列表之间缺乏一致性 例如 有不可变的 Set 但也有可变的 Set 如果我想使用后者 我可以简单地这样做 val set Set A set new A 但是 本身不存在可变列表
  • Scala:具有复杂结构的树插入尾递归

    我正在 scala 中创建自定义对象树 并且我的插入方法引发堆栈溢出 因为它不是尾递归 但是 我不太清楚如何使其尾递归 我见过使用 累加器 变量的相关示例 但它们要么是只能相乘和覆盖的整数之类的东西 要么是我在适应树时遇到困难的列表 这是我
  • 宏:knownDirectSubclasses 被嵌套类型破坏?

    我有一个宏 它枚举密封特征的直接子类型 import scala reflect macros Context import language experimental macros object Checker def apply A U
  • 使用原始类型模拟案例类

    考虑以下类型结构 trait HasId T def id T case class Entity id Long extends HasId Long 比方说 我们想在一些测试中模拟实体类 val entityMock mock Enti
  • 有没有办法捕获 Spark 中使用通配符读取的多个 parquet 文件的输入文件名?

    我使用 Spark 将多个 parquet 文件读取到单个 RDD 中 并使用标准通配符路径约定 换句话说 我正在做这样的事情 val myRdd spark read parquet s3 my bucket my folder parq
  • Source.getLines 中的默认参数错误 (Scala 2.8.0 RC1)

    假设我运行 Scala 2 8 0 RC1 以下 scala 代码应该打印出文件 c hello txt 的内容 for line lt Source fromPath c hello txt getLines println line 但
  • Spark - 如何在本地运行独立集群

    是否有可能运行Spark独立集群仅在一台机器上进行本地操作 这与仅在本地开发作业基本上不同 即local 到目前为止 我正在运行 2 个不同的虚拟机来构建集群 如果我可以在同一台机器上运行一个独立的集群 该怎么办 例如三个不同的 JVM 正
  • PySpark Yarn 应用程序在 groupBy 上失败

    我正在尝试在 Yarn 模式下运行一个处理大量数据的作业 2TB 从谷歌云存储读取 管道可以总结如下 sc textFile gs path json map lambda row json loads row map toKvPair g
  • 使用多行选项和编码选项读取 CSV

    在 azure Databricks 中 当我使用以下命令读取 CSV 文件时multiline true and encoding SJIS 似乎编码选项被忽略了 如果我使用multiline选项 Spark 使用默认值encoding那
  • Play Framework 2.3 (Scala) 中的自定义 JSON 验证约束

    我设法使用自定义约束实现表单验证 但现在我想对 JSON 数据执行相同的操作 如何将自定义验证规则应用于 JSON 解析器 示例 客户端的 POST 请求包含用户名 username 我不仅要确保该参数是非空文本 而且还要确保该用户确实存在
  • 在 Spark 2.1.0 中启用 _metadata 文件

    Spark 2 1 0 中保存空 Parquet 文件似乎已损坏 因为无法再次读入它们 由于模式推断错误 我发现从 Spark 2 0 开始 写入 parquet 文件时默认禁用写入 metadata 文件 但我找不到重新启用此功能的配置设
  • pyspark 中的 Pandas UDF

    我正在尝试在 Spark 数据帧上填充一系列观察结果 基本上我有一个日期列表 我应该为每个组创建缺失的日期 在熊猫中有reindex函数 这是 pyspark 中不可用的 我尝试实现 pandas UDF pandas udf schema
  • Scala 模式匹配变量绑定

    为什么提取器返回时不能以 样式绑定变量Option
  • 运行具有外部依赖项的 Scala 脚本

    我在 Users joe scala lib 下有以下 jar commons codec 1 4 jar httpclient 4 1 1 jar httpcore 4 1 jar commons logging 1 1 1 jar ht
  • Java 中的“Lambdifying”scala 函数

    使用Java和Apache Spark 已用Scala重写 面对旧的API方法 org apache spark rdd JdbcRDD构造函数 其参数为 AbstractFunction1 abstract class AbstractF
  • Scala 特性:val/def 和 require

    下面的代码抛出IllegalArgumentException trait T val x Long require x gt 0 object T extends App val y new T val x 42L 而以下情况则不然 tr
  • 如何获取 Kafka 偏移量以进行结构化查询以进行手动且可靠的偏移量管理?

    Spark 2 2引入了Kafka的结构化流源 据我了解 它依赖 HDFS 检查点目录来存储偏移量并保证 恰好一次 消息传递 但是旧码头 比如https blog cloudera com blog 2017 06 offset manag
  • 分析 sbt 构建

    我的 sbt 构建需要很长时间 它又大又复杂 很难知道从哪里开始清理 看起来 sbt 保留了很多关于构建结构的元数据 包括相互依赖关系 命名任务 范围界定等 有了所有这些元数据 似乎很容易跳入并测量每个不同任务 及其范围 花费的时间 在代码
  • 在 Scala 中,使用“_”和使用命名标识符有什么区别?

    为什么当我尝试使用时会出现错误 而不是使用命名标识符 scala gt res0 res25 List Int List 1 2 3 4 5 scala gt res0 map gt item toString

随机推荐