Update
这个答案仍然有效且信息丰富,尽管自 2.2/2.3 以来情况现在有所改善,它增加了内置编码器支持Set
, Seq
, Map
, Date
, Timestamp
, and BigDecimal
。如果你坚持只使用 case 类和通常的 Scala 类型来创建类型,那么你应该可以使用隐式的SQLImplicits
.
不幸的是,几乎没有添加任何内容来帮助解决这个问题。正在寻找@since 2.0.0
in Encoders.scala or SQLImplicits.scala发现主要与原始类型有关的事情(以及案例类的一些调整)。那么,首先要说的是:目前对自定义类编码器没有真正良好的支持。考虑到这一点,接下来是一些技巧,考虑到我们目前可以使用的东西,这些技巧可以达到我们所希望的效果。作为预先的免责声明:这不会完美地工作,我会尽力预先明确所有限制。
到底是什么问题
当您想要创建数据集时,Spark“需要一个编码器(将 T 类型的 JVM 对象与内部 Spark SQL 表示进行相互转换),该编码器通常通过隐式从SparkSession
,或者可以通过调用静态方法显式创建Encoders
“(摘自docs on createDataset)。编码器将采用以下形式Encoder[T]
where T
是您正在编码的类型。第一个建议是添加import spark.implicits._
(这给你these隐式编码器),第二个建议是使用显式传递隐式编码器this一组编码器相关函数。
常规课程没有可用的编码器,因此
import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
会给你以下隐式相关的编译时错误:
无法找到数据集中存储类型的编码器。通过导入 sqlContext.implicits 支持基本类型(Int、String 等)和 Product 类型(case 类)。未来版本将添加对其他类型序列化的支持
但是,如果您将刚刚使用的任何类型包装在扩展的某个类中,则会出现上述错误Product
,错误被令人困惑地延迟到运行时,所以
import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
编译得很好,但在运行时失败
java.lang.UnsupportedOperationException:找不到 MyObj 的编码器
原因是 Spark 使用隐式创建的编码器实际上仅在运行时生成(通过 scala relfection)。在这种情况下,Spark 在编译时的所有检查都是最外层的类扩展Product
(所有案例类都会这样做),并且仅在运行时才意识到它仍然不知道如何处理MyObj
(如果我尝试制作一个,也会出现同样的问题Dataset[(Int,MyObj)]
- Spark 等到运行时才呕吐MyObj
)。这些是迫切需要解决的核心问题:
- 一些扩展的类
Product
编译尽管总是在运行时崩溃并且
- 无法为嵌套类型传入自定义编码器(我无法为 Spark 提供编码器
MyObj
这样它就知道如何编码Wrap[MyObj]
or (Int,MyObj)
).
只需使用kryo
每个人建议的解决方案是使用kryo编码器。
import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
但这很快就会变得非常乏味。特别是如果您的代码正在操作各种数据集、连接、分组等。您最终会获得一堆额外的隐式。那么,为什么不直接创建一个隐式函数来自动完成这一切呢?
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
org.apache.spark.sql.Encoders.kryo[A](ct)
现在,似乎我几乎可以做任何我想做的事情(下面的例子在spark-shell
where spark.implicits._
自动导入)
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i, d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
或者说几乎。问题是使用kryo
导致 Spark 仅将数据集中的每一行存储为平面二进制对象。为了map
, filter
, foreach
这已经足够了,但是对于像这样的操作join
,Spark确实需要将它们分成列。检查架构d2
or d3
,您会看到只有一个二进制列:
d2.printSchema
// root
// |-- value: binary (nullable = true)
元组的部分解决方案
因此,在 Scala 中使用隐式的魔力(更多内容请参阅6.26.3 重载解析),我可以为自己创建一系列隐式变量,这些隐式变量将尽可能出色地完成工作,至少对于元组而言,并且可以与现有的隐式变量很好地配合使用:
import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._ // we can still take advantage of all the old implicits
implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
implicit def tuple2[A1, A2](
implicit e1: Encoder[A1],
e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
implicit def tuple3[A1, A2, A3](
implicit e1: Encoder[A1],
e2: Encoder[A2],
e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)
// ... you can keep making these
然后,有了这些隐式内容,我就可以使上面的示例正常工作,尽管需要进行一些列重命名
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
我还没有弄清楚如何获得预期的元组名称(_1
, _2
, ...) 默认情况下不重命名它们 - 如果其他人想玩这个,this是名字的地方"value"
被介绍和this是通常添加元组名称的地方。然而,关键是我现在有了一个很好的结构化模式:
d4.printSchema
// root
// |-- _1: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
// |-- _2: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
所以,总而言之,这个解决方法:
- 允许我们为元组获取单独的列(这样我们就可以再次加入元组,耶!)
- 我们可以再次依赖隐式(所以不需要传入
kryo
到处都是)
- 几乎完全向后兼容
import spark.implicits._
(涉及一些重命名)
- does not让我们加入
kyro
序列化的二进制列,更不用说那些可能具有的字段
- 将某些元组列重命名为“value”会产生令人不快的副作用(如果需要,可以通过转换来撤消此操作)
.toDF
,指定新的列名称,并转换回数据集 - 并且模式名称似乎是通过连接保留的,在最需要的地方)。
一般类的部分解决方案
这个就不太令人愉快了,也没有什么好的解决办法。然而,现在我们有了上面的元组解决方案,我有一种预感,另一个答案的隐式转换解决方案也会不那么痛苦,因为您可以将更复杂的类转换为元组。然后,在创建数据集后,您可能会使用数据框方法重命名列。如果一切顺利的话,就是这样really这是一个改进,因为我现在可以在我的类的字段上执行连接。如果我只使用一个平面二进制文件kryo
序列化器这是不可能的。
这是一个可以完成所有操作的示例:我有一堂课MyObj
其中有类型字段Int
, java.util.UUID
, and Set[String]
。第一个自己会照顾好自己。第二个,虽然我可以使用序列化kryo
如果存储为String
(since UUID
通常是我想要加入反对的事情)。第三个实际上属于二进制列。
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])
// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
现在,我可以使用此机制创建一个具有良好模式的数据集:
val d = spark.createDataset(Seq[MyObjEncoded](
new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]
该模式向我显示了具有正确名称的列以及我可以加入的前两个内容。
d.printSchema
// root
// |-- i: integer (nullable = false)
// |-- u: string (nullable = true)
// |-- s: binary (nullable = true)