如何在Dataset中存储自定义对象?

2023-11-25

根据Spark 数据集简介:

在我们期待 Spark 2.0 的同时,我们计划对数据集进行一些令人兴奋的改进,具体来说: ... 自定义编码器——虽然我们目前自动生成各种类型的编码器,但我们希望为自定义对象开放 API。

并尝试将自定义类型存储在Dataset导致以下错误:

无法找到数据集中存储类型的编码器。通过导入 sqlContext.implicits 支持基本类型(Int、String 等)和 Product 类型(case 类)。未来版本将添加对其他类型序列化的支持

or:

Java.lang.UnsupportedOperationException:找不到编码器......

是否有任何现有的解决方法?


请注意,此问题仅作为社区 Wiki 答案的入口点存在。请随意更新/改进问题和答案。


Update

这个答案仍然有效且信息丰富,尽管自 2.2/2.3 以来情况现在有所改善,它增加了内置编码器支持Set, Seq, Map, Date, Timestamp, and BigDecimal。如果你坚持只使用 case 类和通常的 Scala 类型来创建类型,那么你应该可以使用隐式的SQLImplicits.


不幸的是,几乎没有添加任何内容来帮助解决这个问题。正在寻找@since 2.0.0 in Encoders.scala or SQLImplicits.scala发现主要与原始类型有关的事情(以及案例类的一些调整)。那么,首先要说的是:目前对自定义类编码器没有真正良好的支持。考虑到这一点,接下来是一些技巧,考虑到我们目前可以使用的东西,这些技巧可以达到我们所希望的效果。作为预先的免责声明:这不会完美地工作,我会尽力预先明确所有限制。

到底是什么问题

当您想要创建数据集时,Spark“需要一个编码器(将 T 类型的 JVM 对象与内部 Spark SQL 表示进行相互转换),该编码器通常通过隐式从SparkSession,或者可以通过调用静态方法显式创建Encoders“(摘自docs on createDataset)。编码器将采用以下形式Encoder[T] where T是您正在编码的类型。第一个建议是添加import spark.implicits._(这给你these隐式编码器),第二个建议是使用显式传递隐式编码器this一组编码器相关函数。

常规课程没有可用的编码器,因此

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

会给你以下隐式相关的编译时错误:

无法找到数据集中存储类型的编码器。通过导入 sqlContext.implicits 支持基本类型(Int、String 等)和 Product 类型(case 类)。未来版本将添加对其他类型序列化的支持

但是,如果您将刚刚使用的任何类型包装在扩展的某个类中,则会出现上述错误Product,错误被令人困惑地延迟到运行时,所以

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

编译得很好,但在运行时失败

java.lang.UnsupportedOperationException:找不到 MyObj 的编码器

原因是 Spark 使用隐式创建的编码器实际上仅在运行时生成(通过 scala relfection)。在这种情况下,Spark 在编译时的所有检查都是最外层的类扩展Product(所有案例类都会这样做),并且仅在运行时才意识到它仍然不知道如何处理MyObj(如果我尝试制作一个,也会出现同样的问题Dataset[(Int,MyObj)]- Spark 等到运行时才呕吐MyObj)。这些是迫切需要解决的核心问题:

  • 一些扩展的类Product编译尽管总是在运行时崩溃并且
  • 无法为嵌套类型传入自定义编码器(我无法为 Spark 提供编码器MyObj这样它就知道如何编码Wrap[MyObj] or (Int,MyObj)).

只需使用kryo

每个人建议的解决方案是使用kryo编码器。

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

但这很快就会变得非常乏味。特别是如果您的代码正在操作各种数据集、连接、分组等。您最终会获得一堆额外的隐式。那么,为什么不直接创建一个隐式函数来自动完成这一切呢?

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

现在,似乎我几乎可以做任何我想做的事情(下面的例子在spark-shell where spark.implicits._自动导入)

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

或者说几乎。问题是使用kryo导致 Spark 仅将数据集中的每一行存储为平面二进制对象。为了map, filter, foreach这已经足够了,但是对于像这样的操作join,Spark确实需要将它们分成列。检查架构d2 or d3,您会看到只有一个二进制列:

d2.printSchema
// root
//  |-- value: binary (nullable = true)

元组的部分解决方案

因此,在 Scala 中使用隐式的魔力(更多内容请参阅6.26.3 重载解析),我可以为自己创建一系列隐式变量,这些隐式变量将尽可能出色地完成工作,至少对于元组而言,并且可以与现有的隐式变量很好地配合使用:

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

然后,有了这些隐式内容,我就可以使上面的示例正常工作,尽管需要进行一些列重命名

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

我还没有弄清楚如何获得预期的元组名称(_1, _2, ...) 默认情况下不重命名它们 - 如果其他人想玩这个,this是名字的地方"value"被介绍和this是通常添加元组名称的地方。然而,关键是我现在有了一个很好的结构化模式:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

所以,总而言之,这个解决方法:

  • 允许我们为元组获取单独的列(这样我们就可以再次加入元组,耶!)
  • 我们可以再次依赖隐式(所以不需要传入kryo到处都是)
  • 几乎完全向后兼容import spark.implicits._(涉及一些重命名)
  • does not让我们加入kyro序列化的二进制列,更不用说那些可能具有的字段
  • 将某些元组列重命名为“value”会产生令人不快的副作用(如果需要,可以通过转换来撤消此操作).toDF,指定新的列名称,并转换回数据集 - 并且模式名称似乎是通过连接保留的,在最需要的地方)。

一般类的部分解决方案

这个就不太令人愉快了,也没有什么好的解决办法。然而,现在我们有了上面的元组解决方案,我有一种预感,另一个答案的隐式转换解决方案也会不那么痛苦,因为您可以将更复杂的类转换为元组。然后,在创建数据集后,您可能会使用数据框方法重命名列。如果一切顺利的话,就是这样really这是一个改进,因为我现在可以在我的类的字段上执行连接。如果我只使用一个平面二进制文件kryo序列化器这是不可能的。

这是一个可以完成所有操作的示例:我有一堂课MyObj其中有类型字段Int, java.util.UUID, and Set[String]。第一个自己会照顾好自己。第二个,虽然我可以使用序列化kryo如果存储为String (since UUID通常是我想要加入反对的事情)。第三个实际上属于二进制列。

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

现在,我可以使用此机制创建一个具有良好模式的数据集:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

该模式向我显示了具有正确名称的列以及我可以加入的前两个内容。

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在Dataset中存储自定义对象? 的相关文章

  • IntelliJ IDEA Scala 插件问题

    我对新的 Intellij IDEA 10 和 Scala 插件有疑问 当我在 Scala 源文件中输入任何内容时 编辑器会永久冻结 在其他文件 java 和其他 编辑器中效果很好 结构视图 scala 检查和显示成员功能已关闭 堆大小增加
  • Scala 性能问题

    In the 丹尼尔 科泽夸 Daniel Korzekwa 撰写的文章 http blog danmachine com 2011 01 moving from java to scala one year html 他说以下代码的性能
  • 了解如何使用 apply 和 unappy

    我试图更好地理解 的正确用法apply and unapply方法 考虑到我们想要序列化和反序列化的对象 这是正确的用法吗 即斯卡拉方式 的使用apply and unapply case class Foo object Foo appl
  • Spark RDD默认分区数

    版本 Spark 1 6 2 Scala 2 10 我正在执行以下命令spark shell 我试图查看 Spark 默认创建的分区数量 val rdd1 sc parallelize 1 to 10 println rdd1 getNum
  • 我想使用 EtherPad(或克隆版本)。我的站点正在运行 Ruby on Rails。 API 还是本地安装?

    我想在我的网站上使用 etherpad 界面 两个问题 1 是否有任何带有 etherpad api 的网站可以让我远程调用 2 如果没有 安装scala并让两者同时运行有多麻烦 Thanks 查看http piratepad net ht
  • Pyspark显示最大值(S)和多重排序

    感谢这里的一些帮助 使用Pyspark 请不能使用SQL 所以我有一个存储为 RDD 对的元组列表 城市1 2020 03 27 X1 44 城市1 2020 03 28 X1 44 City3 2020 03 28 X3 15 City4
  • 新式(“内联”)宏需要 scala.meta

    我刚刚更新到 scala meta 2 0 0 M1 和最新的 scala 2 12 3 现在宏不再编译 我所做的唯一更改是将元版本从 1 8 0 更改为 2 0 0 M1 错误 新式 内联 宏需要 scala meta 有谁知道是否有快速
  • Spark/Yarn:HDFS 上不存在文件

    我在 AWS 上设置了 Hadoop Yarn 集群 有 1 个主服务器和 3 个从服务器 我已经验证我有 3 个活动节点在端口 50070 和 8088 上运行 我在客户端部署模式下测试了 Spark 作业 一切正常 当我尝试使用 Spa
  • 对于空列表,max() 应该返回什么?

    Got java util NoSuchElementException head of empty list所以我试着检查一下 但现在我明白了 info max of a few numbers FAILED info 0 did not
  • 了解 Scala 中的中缀方法调用和缺点运算符(::)

    我对 Scala 编程语言相当陌生 当我遵循以下网站的讲义时 我正在尝试一些萦绕在我脑海中的东西 here http horstmann com sjsu cs152 04 closures1 html 我想我无法真正理解 cons 运算符
  • Source.getLines 中的默认参数错误 (Scala 2.8.0 RC1)

    假设我运行 Scala 2 8 0 RC1 以下 scala 代码应该打印出文件 c hello txt 的内容 for line lt Source fromPath c hello txt getLines println line 但
  • Scala:类似 Option (Some, None) 但具有三种状态:Some、None、Unknown

    我需要返回值 当有人询问值时 告诉他们以下三件事之一 这是值 没有价值 我们没有关于该值的信息 未知 情况 2 与情况 3 略有不同 示例 val radio car radioType 我们知道该值 返回无线电类型 例如 pioneer
  • 任务和分区之间有什么关系?

    我能说 么 Spark任务的数量等于Spark分区的数量吗 执行器运行一次 执行器内部的批处理 等于一个任务吗 每个任务只产生一个分区 1 的重复 并行度或可以同时运行的任务数量由以下公式设置 Executor实例的数量 配置 每个执行器的
  • Java 表达式树 [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 是否有相当于 net的 LINQ 下的表达式树JVM 我想实现一些类似 LINQ 的代码结构Scala
  • PySpark Yarn 应用程序在 groupBy 上失败

    我正在尝试在 Yarn 模式下运行一个处理大量数据的作业 2TB 从谷歌云存储读取 管道可以总结如下 sc textFile gs path json map lambda row json loads row map toKvPair g
  • 在 Spark 2.1.0 中启用 _metadata 文件

    Spark 2 1 0 中保存空 Parquet 文件似乎已损坏 因为无法再次读入它们 由于模式推断错误 我发现从 Spark 2 0 开始 写入 parquet 文件时默认禁用写入 metadata 文件 但我找不到重新启用此功能的配置设
  • 缓存 Slick DBIO 操作

    我正在尝试加快 SELECT FROM WHERE name 的速度Play 中的查询类型 Scala 应用程序 我正在使用 Play 2 4 Scala 2 11 play slick 1 1 1 包 该软件包使用Slick 3 1版本
  • 运行具有外部依赖项的 Scala 脚本

    我在 Users joe scala lib 下有以下 jar commons codec 1 4 jar httpclient 4 1 1 jar httpcore 4 1 jar commons logging 1 1 1 jar ht
  • 使用 scala 集合 - CanBuildFrom 麻烦

    我正在尝试编写一个接受任何类型集合的方法CC 并将其映射到一个新的集合 相同的集合类型但不同的元素类型 我正在挣扎 基本上我正在尝试实施map but 不在集合本身上 问题 我正在尝试实现一个带有签名的方法 它看起来有点像 def map
  • 类型级编程有哪些示例? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我不明白 类型级编程 是什么意思 也无法使用Google找到合适的解释 有人可以提供一个演示类型级编程的示例吗 范式的解释和 或定义将

随机推荐

  • 具有多种内容类型的 ContentChildren

    您好 我目前正在构建一个允许其中包含多种列类型的表 我希望能够像这样使用它
  • 架构arm64的重复符号(Xcode错误)

    我从项目中删除了对 GoogleMobileAdsSDKiOS 7 1 的所有引用 并添加了 7 4 1 当我在模拟器上运行应用程序时 一切正常 但在设备上运行时 我收到 App Mach O 链接器错误 duplicate symbol
  • 从 mysql 字段内的 json 中提取数据

    我有一个包含行的表 其中一行有一个包含如下数据的字段 name Richard lastname null city Olavarria cityId null 我想选择我拥有的所有不同的 城市 值 仅使用mysql服务器 是否可以 我正在
  • 当被属性遮蔽时修改类 __dict__

    我正在尝试修改类中的值 dict 直接使用类似的东西X dict x 1 不可能进行这样的修改 因为一个类 dict 实际上是一个mappingproxy不允许直接修改值的对象 尝试直接修改或等效的原因是我试图将类属性隐藏在元类上定义的同名
  • Facebook 页面自动“赞”URL(用于 QR 码)

    我想知道是否可以构建一个 URL 来自动喜欢 Facebook 页面 然后 这个 URL 可以转换为 QR 码 这样人们就可以通过使用智能手机阅读您的页面来自动 喜欢 您的页面 我已经搜索了很多 但到目前为止我所能找到的只是商业服务 例如S
  • AppCompatActivity.onCreate 只能从同一库组内调用

    升级到appcompat后25 1 0我开始遇到奇怪的错误 在我的代码中 Override protected void onCreate Bundle savedInstanceState super onCreate savedInst
  • SSIS 存储过程调用

    我正在尝试调用一个简单的存储过程 它将返回正常测试格式的名称列表 全部在一行中 我向它传递了两个参数 但无论我如何设置调用 无论是在 OLE DB 源编辑器中 还是在执行 SQL 任务中 我的 SQL 语句中一定缺少一些东西 因为我不断收到
  • HTML5 Canvas 在绘图时闪烁

    我从一个等距游戏开始 当绘制地面的所有部分时 我的画布正在闪烁 不在 IE 中 当我将 fps 设置为 20 或更低时 闪烁停止 我该如何解决这个问题 有任何想法吗 var camerax 300 cameray 100 var fps 6
  • 如何解决 Xcode 上 Storyboard 的警告? [复制]

    这个问题在这里已经有答案了 Trailing Leading constraint is missing which may cause overlapping with other views 我的故事板 只需放置 2 个标签及其约束 避
  • ui:repeat 不适用于 f:selectItem

    我正在使用 Icefaces 选择菜单从用户列表中选择用户 我想为每个用户重复 selectItem 这是我尝试过的
  • 您的实用工具包中有什么? [关闭]

    Closed 这个问题不符合堆栈溢出指南 目前不接受答案 我认识的一些最高效的工程师 开发人员和 IT 专业人员通常都会携带一个由有用程序 插件或实用程序组成的通用 工具包 以帮助他们进行日常调试 开发或设计 问题是 您的实用工具包中有什么
  • 如何在 Visual Studio 2008 中添加 ASP.NET 的页面事件

    这是一个 Visual Studio 问题 我觉得所有有用的智能感知应该有一些帮助 但我似乎找不到它 我在 VS2008 中用 ASP NET C 制作了一个带有代码隐藏的页面 当然它会自动生成 PageLoad 事件方法 那么 如果我想为
  • rjava .jcall 问题

    我目前正在开发一个 R 包以将 java 代码集成到 R 中 但是 我在尝试正确调用 java 类方法时遇到问题 到目前为止我已经独立开发了一个java程序并编译成class文件 然后打包为jar文件 我的代码示例如下 library rJ
  • 在Android中设置每天在特定时间重复闹钟

    我正在使用闹钟管理器在每天的特定时间运行闹钟 下面是代码 Calendar calendar Calendar getInstance calendar setTimeInMillis System currentTimeMillis ca
  • 使用 epoll 进行非阻塞 tcp 连接

    我的 Linux 应用程序正在执行非阻塞 TCP 连接系统调用 然后使用epoll wait检测三向握手完成 有时epoll wait两者都返回POLLOUT POLLERR为同一套接字描述符设置的事件 我想了解 TCP 级别发生了什么 我
  • uisearchDisplayController:更改标签“无结果”

    使用 uisearchDisplayController 时如何更改标签 无结果 Regards 我已经成功删除了标签 因为从来没有空结果集 如果因为从服务器获取而没有结果 请将数据源重置为单行 并让它显示空白的表视图单元格 此外 使用逻辑
  • 为什么我在尝试检查偶数/奇数时会收到“类型错误:字符串格式化期间未转换所有参数”?

    这段代码给出了一个错误 print type a whole number n input if n 2 1 print Odd else print Even 我假设我必须对 if 语句中的变量 n 做一些特殊的事情 我是 Python
  • simple_form的collection_radio_button和自定义标签类

    我正在尝试使用 FontAwesome 制作带有无线电集合的星级评级表格 为此我实际上需要更改 simple form 生成的 collection radio button 输入的标签类 但找不到任何明显的解决方案 到目前为止我使用 fo
  • GLM 如何处理翻译

    OpenGL 数学库 GLM 使用以下算法来计算平移矩阵 taken from source code template
  • 如何在Dataset中存储自定义对象?

    根据Spark 数据集简介 在我们期待 Spark 2 0 的同时 我们计划对数据集进行一些令人兴奋的改进 具体来说 自定义编码器 虽然我们目前自动生成各种类型的编码器 但我们希望为自定义对象开放 API 并尝试将自定义类型存储在Datas