我知道使用以下方法将新列添加到 Spark 数据集的方法.withColumn()
and a UDF
,它返回一个 DataFrame。我还知道,我们可以将生成的 DataFrame 转换为 DataSet。
我的问题是:
- 如果我们仍然遵循传统的 DF 方法(即将列名称作为 UDF 输入的字符串传递),那么 DataSet 的类型安全性在这里如何发挥作用
- 是否有一种“面向对象的方式”来访问列(无需将列名作为字符串传递),就像我们以前对 RDD 所做的那样,用于附加新列。
- 如何在地图、过滤器等正常操作中访问新列?
例如:
scala> case class Temp(a : Int, b : String) //creating case class
scala> val df = Seq((1,"1str"),(2,"2str),(3,"3str")).toDS // creating DS
scala> val appendUDF = udf( (b : String) => b + "ing") // sample UDF
scala> df.withColumn("c",df("b")) // adding a new column
res5: org.apache.spark.sql.DataFrame = [a: int, b: string ... 1 more field]
scala> res5.as[Temp] // converting to DS
res6: org.apache.spark.sql.Dataset[Temp] = [a: int, b: string ... 1 more field]
scala> res6.map( x =>x.
// list of autosuggestion :
a canEqual equals productArity productIterator toString
b copy hashCode productElement productPrefix
新专栏c
,我添加了使用.withColumn()
不可访问,因为列c
不在案例类别中Temp
(它只包含a
& b
)在使用转换为 DS 的瞬间res5.as[Temp]
.
如何访问专栏c
?
在类型安全的世界中Dataset
您可以将一个结构映射到另一个结构中。
也就是说,对于每个转换,我们需要数据的模式表示(因为 RDD 需要它)。要访问上面的“c”,我们需要创建一个新的架构来提供对其的访问。
case class A(a:String)
case class BC(b:String, c:String)
val f:A => BC = a=> BC(a.a,"c") // Transforms an A into a BC
val data = (1 to 10).map(i => A(i.toString))
val dsa = spark.createDataset(data)
// dsa: org.apache.spark.sql.Dataset[A] = [a: string]
val dsb = dsa.map(f)
//dsb: org.apache.spark.sql.Dataset[BC] = [b: string, c: string]
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)