Spark:DF.as[Type] 无法编译

2023-12-11

我正在尝试运行 Spark 书中的示例Spark: The Definitive Guide

build.sbt

ThisBuild / scalaVersion := "3.2.1"

libraryDependencies ++= Seq(
  ("org.apache.spark" %% "spark-sql" % "3.2.0" % "provided").cross(CrossVersion.for3Use2_13)
)

Compile / run := Defaults.runTask(Compile / fullClasspath, Compile / run / mainClass, Compile / run / runner).evaluated

lazy val root = (project in file("."))
  .settings(
    name := "scalalearn"
  )

main.scala

// imports
...

object spark1 {
  @main
  def main(args: String*): Unit = {
    ...

    case class Flight(
                       DEST_COUNTRY_NAME: String,
                       ORIGIN_COUNTRY_NAME: String,
                       count: BigInt
                     )

    val flightsDF = spark.read
      .parquet(s"$dataRootPath/data/flight-data/parquet/2010-summary.parquet/")

    //    import spark.implicits._ // FAILS
    //    import spark.sqlContext.implicits._ // FAILS

    val flights = flightsDF.as[Flight]

    // in Scala
    flights
      .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
      .map(flight_row => flight_row)
      .take(5)

    spark.stop()
  }
}

线路出现错误val flights = flightsDF.as[Flight]

Unable to find encoder for type Flight. An implicit Encoder[Flight] is needed to store Flight
 instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes)
 are supported by importing spark.implicits._ Support for serializing other types will be added in
 future releases.

任何帮助表示赞赏。

斯卡拉 - 3.2.1 火花 - 3.2.0

尝试从以下位置导入隐式spark.implicits._ and spark.sqlContext.implicits._该示例适用于 scala 2.x

寻找一种无需任何第三方解决方法即可将 DF 转换为案例类的方法


您需要为 Spark 编解码器添加 Scala-3 依赖项

https://github.com/vincenzobaz/spark-scala3

libraryDependencies += "io.github.vincenzobaz" %% "spark-scala3" % "0.1.3"

并导入 Scala-3

import scala3encoders.given

而不是 Scala-2

import spark.implicits._ // FAILS
import spark.sqlContext.implicits._ // FAILS

Scala Spark Encoders.product[X](其中 X 是案例类)不断给出“No TypeTag available for X”错误


关于BigInt,

Spark 支持 BigInteger 类型吗?

Spark确实支持JavaBigInteger但可能会损失一些精度。如果数值BigInteger适合一个long(即 -2^63 和 2^63-1 之间)那么它将被 Spark 存储为LongType。否则它将被存储为DecimalType,但该类型仅支持 38 位精度。

为相对较小的数据提供正确的编解码器BigInts(适合LongType) are

import scala3encoders.derivation.{Deserializer, Serializer}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
import org.apache.spark.sql.types.{DataType, LongType, ObjectType}

given Deserializer[BigInt] with
  def inputType: DataType = LongType

  def deserialize(path: Expression): Expression =
    StaticInvoke(
      BigInt.getClass,
      ObjectType(classOf[BigInt]),
      "apply",
      path :: Nil,
      returnNullable = false
    )

given Serializer[BigInt] with
  def inputType: DataType = ObjectType(classOf[BigInt])

  def serialize(inputObject: Expression): Expression =
    Invoke(inputObject, "longValue", LongType, returnNullable = false)

import scala3encoders.given

https://github.com/DmytroMitin/spark_stackoverflow/blob/87ef5361dd3553f8cc5ced26fed4c17c0061d6a2/src/main/scala/main.scala

(https://github.com/databricks/Spark-The-Definitive-Guide)

https://github.com/yashwanthreddyg/spark_stackoverflow/pull/1

https://gist.github.com/DmytroMitin/3c0fe6983a254b350ff9feedbb066bef

https://github.com/vincenzobaz/spark-scala3/pull/22

对于大型BigInts(不适合LongType when DecimalType是必要的)编解码器是

import scala3encoders.derivation.{Deserializer, Serializer}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
import org.apache.spark.sql.types.{DataType, DataTypes, Decimal, ObjectType}

val decimalType = DataTypes.createDecimalType(38, 0)

given Deserializer[BigInt] with
  def inputType: DataType = decimalType

  def deserialize(path: Expression): Expression =
    Invoke(path, "toScalaBigInt", ObjectType(classOf[scala.math.BigInt]), returnNullable = false)

given Serializer[BigInt] with
  def inputType: DataType = ObjectType(classOf[BigInt])

  def serialize(inputObject: Expression): Expression =
    StaticInvoke(
      Decimal.getClass,
      decimalType,
      "apply",
      inputObject :: Nil,
      returnNullable = false
    )

import scala3encoders.given

这几乎与

import org.apache.spark.sql.catalyst.DeserializerBuildHelper.createDeserializerForScalaBigInt
import org.apache.spark.sql.catalyst.SerializerBuildHelper.createSerializerForScalaBigInt
import scala3encoders.derivation.{Deserializer, Serializer}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.{DataType, DataTypes, ObjectType}

val decimalType = DataTypes.createDecimalType(38, 0)

given Deserializer[BigInt] with
  def inputType: DataType = decimalType

  def deserialize(path: Expression): Expression =
    createDeserializerForScalaBigInt(path)

given Serializer[BigInt] with
  def inputType: DataType = ObjectType(classOf[BigInt])

  def serialize(inputObject: Expression): Expression =
    createSerializerForScalaBigInt(inputObject)

import scala3encoders.given

https://gist.github.com/DmytroMitin/8124d2a4cd25c8488c00c5a32f244f64

您观察到的运行时异常意味着BigIntparquet 文件中的 s 相对较小(适合LongType)并且您尝试了我的大型编解码器BigInts (DecimalType)

https://gist.github.com/DmytroMitin/ad77677072c1d8d5538c94cb428c8fa4 (ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java': A method named "toScalaBigInt" is not declared in any enclosing class nor any supertype, nor through a static import)

反之亦然,对于大BigInts (DecimalType)和小型编解码器BigInts (LongType): https://gist.github.com/DmytroMitin/3a3a61082fbfc12447f6e926fc45c7cd (ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java': No applicable constructor/method found for actual parameters "org.apache.spark.sql.types.Decimal"; candidates are: ...)

我们不能同时使用这两种编解码器LongType and DecimalType: https://gist.github.com/DmytroMitin/32040a6b702fff5c53c727616b318cb5 (Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: All input types must be the same except nullable, containsNull, valueContainsNull flags. The input types found are LongType DecimalType(38,0))

对于小型和大型的混合BigInt正确的是使用编解码器DecimalType https://gist.github.com/DmytroMitin/626e09a63a387e6ff1d7fe264fc14d6b


手动创建的方法TypeTags 似乎也可以工作(不使用scala3encoders)

// libraryDependencies += scalaOrganization.value % "scala-reflect" % "2.13.10" // in Scala 3
import scala.reflect.api
import scala.reflect.runtime.universe.{NoType, Type, TypeTag, internal}
import scala.reflect.runtime.universe

inline def createTypeTag[T](mirror: api.Mirror[_ <: api.Universe with Singleton], tpe: mirror.universe.Type): mirror.universe.TypeTag[T] = {
  mirror.universe.TypeTag.apply[T](mirror.asInstanceOf[api.Mirror[mirror.universe.type]],
    new api.TypeCreator {
      override def apply[U <: api.Universe with Singleton](m: api.Mirror[U]): m.universe.Type = {
        tpe.asInstanceOf[m.universe.Type]
      }
    }
  )
}

val rm = universe.runtimeMirror(this.getClass.getClassLoader)
// val bigIntTpe = internal.typeRef(internal.typeRef(NoType, rm.staticPackage("scala.math"), Nil), rm.staticClass("scala.math.BigInt"), Nil)
// val strTpe = internal.typeRef(internal.typeRef(NoType, rm.staticPackage("java.lang"), Nil), rm.staticClass("java.lang.String"), Nil)
val flightTpe = internal.typeRef(NoType, rm.staticClass("Flight"), Nil)
// given TypeTag[BigInt] = createTypeTag[BigInt](rm, bigIntTpe)
// given TypeTag[String] = createTypeTag[String](rm, strTpe)
given TypeTag[Flight] = createTypeTag[Flight](rm, flightTpe)

import spark.implicits._

https://gist.github.com/DmytroMitin/bb0ccd5f1c533b2baec1756da52f8824

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark:DF.as[Type] 无法编译 的相关文章

  • 如何使用 apply/unapply 方法重现案例类行为?

    我尝试用普通类和伴生对象替换案例类 但突然出现类型错误 编译良好的代码 综合示例 trait Elem A B def C other Elem C A Elem C B other match case Chain head tail g
  • 逆变方法参数类型

    wiki 逆变方法参数类型 https en wikipedia org wiki Covariance and contravariance 28computer science 29 Contravariant method argum
  • 宏:knownDirectSubclasses 被嵌套类型破坏?

    我有一个宏 它枚举密封特征的直接子类型 import scala reflect macros Context import language experimental macros object Checker def apply A U
  • 最小重复子串

    我正在看 Perl代码高尔夫页面 http www perlmonks org node id 82878 不要问为什么 并遇到了这个 第 3 洞 最小重复图案 编写一个子例程 它接受一个字符串 该字符串可能包含 重复模式 并返回最小的重复
  • 如何发现 Scala 远程 Actor 已死亡?

    在 Scala 中 当另一个 远程 actor 终止时 可以通过设置 trapExit 标志并以第二个 actor 作为参数调用 link 方法来通知一个 actor 在这种情况下 当远程参与者通过调用 exit 结束其工作时 第一个参与者
  • 对多列应用窗口函数

    我想执行窗口函数 具体为移动平均值 但针对数据帧的所有列 我可以这样做 from pyspark sql import SparkSession functions as func df df select func avg df col
  • Spark - 如何在本地运行独立集群

    是否有可能运行Spark独立集群仅在一台机器上进行本地操作 这与仅在本地开发作业基本上不同 即local 到目前为止 我正在运行 2 个不同的虚拟机来构建集群 如果我可以在同一台机器上运行一个独立的集群 该怎么办 例如三个不同的 JVM 正
  • 使用 Spray-json 解析简单数组

    我正在尝试 但失败了 了解 Spray json 如何将 json feed 转换为对象 如果我有一个简单的 key gt value json feed 那么它似乎可以正常工作 但是我想要读取的数据出现在如下列表中 name John a
  • 如何捕获 Oozie Spark 输出

    有没有办法捕获spark的输出然后将其输入到shell上 我们当前正在使用 scala 创建 jar 文件 并希望我们的 Spark 输出成为 shell 输入 我的想法是使用 wf actionData spark XXXX var 我只
  • 任务和分区之间有什么关系?

    我能说 么 Spark任务的数量等于Spark分区的数量吗 执行器运行一次 执行器内部的批处理 等于一个任务吗 每个任务只产生一个分区 1 的重复 并行度或可以同时运行的任务数量由以下公式设置 Executor实例的数量 配置 每个执行器的
  • PySpark Yarn 应用程序在 groupBy 上失败

    我正在尝试在 Yarn 模式下运行一个处理大量数据的作业 2TB 从谷歌云存储读取 管道可以总结如下 sc textFile gs path json map lambda row json loads row map toKvPair g
  • 在 Scala 和 SBT 中调试较长的编译时间

    在我的 Scala SBT 项目中 我有一个文件需要 5 分钟才能编译 所有其他的都可以在几秒钟内编译 这使得开发非常痛苦 我确信我滥用了一些 Scala 构造 但我不知道如何调试它 如何在 Scala 中调试较长的编译时间 我正在使用 S
  • 错误:协变类型 A 出现在逆变位置

    我试图写一个不可变的Matrix A 班级 我希望该类是协变的A但是当我把 在 前面A编译器开始抱怨类中的某些操作 以下是我的相关子集Matrix类 实际类比以下子集大 5 倍左右 class Matrix A private val co
  • Scala 和变量中的模式匹配

    我是 Scala 新手 有点想知道模式匹配是如何工作的 想象一下我有以下内容 case class Cls i Int case b Cls i gt Ok case e Cls gt Ok case f Cls gt Ok case s
  • pyspark 中的 Pandas UDF

    我正在尝试在 Spark 数据帧上填充一系列观察结果 基本上我有一个日期列表 我应该为每个组创建缺失的日期 在熊猫中有reindex函数 这是 pyspark 中不可用的 我尝试实现 pandas UDF pandas udf schema
  • Scala Tuple2Zipped 与 IterableLike zip

    两种实现有什么区别 这个比那个好吗 有一篇博客文章说 Tuple2Zipped 性能更好 但没有提供原因 并且查看源代码我没有看到差异 val l1 List 1 2 3 val l2 List 5 6 7 val v1 l1 zip l2
  • 使用spark phoenix从表中读取rdd分区号为1

    当我运行我的火花代码时 val sqlContext spark sqlContext val noact table primaryDataProcessor getTableData sqlContext zookeeper table
  • 使用 scala 集合 - CanBuildFrom 麻烦

    我正在尝试编写一个接受任何类型集合的方法CC 并将其映射到一个新的集合 相同的集合类型但不同的元素类型 我正在挣扎 基本上我正在尝试实施map but 不在集合本身上 问题 我正在尝试实现一个带有签名的方法 它看起来有点像 def map
  • 玩:将表单字段绑定到双精度型?

    也许我只是忽略了一些明显的事情 但我无法弄清楚如何将表单字段绑定到 Play 控制器中的双精度型 例如 假设这是我的模型 case class SavingsGoal timeframeInMonths Option Int amount
  • 分析 sbt 构建

    我的 sbt 构建需要很长时间 它又大又复杂 很难知道从哪里开始清理 看起来 sbt 保留了很多关于构建结构的元数据 包括相互依赖关系 命名任务 范围界定等 有了所有这些元数据 似乎很容易跳入并测量每个不同任务 及其范围 花费的时间 在代码

随机推荐

  • 当应用程序在后台时重新启动没有用户界面的 iOS 应用程序?

    当应用程序在后台运行时 如何在没有用户界面的情况下重新启动iOS应用程序 我知道通知 但在任何通知中 用户必须单击警报视图按钮才能重新启动或打开应用程序 没有办法做到这一点 用户必须参与执行此操作
  • Javascript 可以检测移动设备是否静音吗?

    当用户回答正确 错误的问题时 我的网站当前会播放声音 但我注意到 在我的平板电脑 iPad 和手机 iPhone 上 即使我将其置于静音模式 它也会播放声音 当用户明确将设备设置为静音模式 铃声 时 我不希望手机 平板电脑播放声音 那么 我
  • Cordova + Crosswalk iOS 仍然使用 UIWebView

    我没有找到很多为 iOS 设置人行横道的说明 看起来应该很简单 cordova plugin add cordova plugin crosswalk webview cordova build ios 因为在执行以下操作之前 我遇到了插件
  • visio 服务器端自动化的替代方案

    微软建议不要对办公工具进行服务器端自动化 服务器端办公自动化的注意事项 但是 我看不到其他选择 我想做的是将 vsd 文件转换为 html 超链接的图像 地图 这在以下情况下尚不可能Aspose 图 在此处对类似问题的回答中引用了该内容 L
  • 如何将两个Excel文件及其工作表名称合并为一个?

    为了合并两个 Excel 工作表 我使用下面的代码 using System using Excel Microsoft Office Interop Excel using System Reflection namespace Merg
  • javascript用户选择突出显示

    我试图找到一种使用 javascript 的方法来突出显示用户在单击某些奇怪的突出显示按钮时选择的文本 如 突出显示的文本 它只需要与 WebKit 或 Firefox 一起工作 但这似乎几乎是不可能的 因为它必须在以下情况下工作 p th
  • 相邻元素相乘

    我有一个整数元组 例如 1 2 3 4 5 我想生成元组 1 2 2 3 3 4 4 5 通过相邻元素相乘 是否可以用单行代码来完成此操作 简短而甜蜜 请记住zip只运行最短的输入 print tuple x y for x y in zi
  • 查找:`./folder':没有这样的文件或目录

    这是我正在使用的目录的内容 misha hp laptop work c 5 ls l total 8 rw rw r 1 misha misha 219 May 20 15 37 demo c drwxrwxr x 2 misha mis
  • 是否可以使用按键侦听器显示以前隐藏的 JFrame

    这是我的代码 我基本上只是为最常见的侦听器做了一个测试器 我稍后可能会在未来的项目中使用它 主要问题是在底部的关键侦听器中 我正在尝试重新显示框架 但我认为它只是不能那样做 请帮忙 ps 不知道为什么导入没有正确显示 package new
  • 如何像 instagram uwp 应用程序一样动画网格背景?

    The background keeps on transforming from one gradient to other very beautifully I have no idea from where to start Belo
  • ASP.NET MVC - 按角色显示表单值

    我正在寻找一种理想的方式 让我的输入表单根据角色显示为文本框 可编辑 标签 只读 或隐藏 无法访问 我知道我可以对每种类型的角色有不同的看法 但我希望有一些新的优点可以让我不必做 80 次观看 实际上 这完全取决于您想要在哪里设置与安全相关
  • 我无法在 Eclipse 2023-09 (4.29.0) 中运行 JUnit 5.9.x 测试

    Hi I ve start with start spring io just like below and I was trying to check if test is okay or not by default 但这还没有开始 我
  • 如何检查字符串是否是有效的 JSON 字符串?

    isJsonString Id 1 Name Coke 应该true and isJsonString foo isJsonString div foo div 应该false 我正在寻找一个不使用的解决方案try catch因为我将调试器
  • 当使用 ATL 宏处理 DWebBrowserEvents2 时处理 HTMLElementEvents2

    我正在使用 VS2008 C 创建浏览器帮助程序对象 我的类派生自 IDispEventImpl 等 class ATL NO VTABLE CHelloWorldBHO public CComObjectRootEx
  • org.lwjgl.system.Library错误

    我在 Eclipse 中设置了 LWJGL 3 当我尝试运行测试代码时 它给了我这个错误https www lwjgl org guide Exception in thread main java lang NoClassDefFound
  • 学习如何转换数字输入

    第一次提问 我正在学习如何转换数字输入 从 edX 课程学习并使用 jupyter 笔记本 这是我的代码 calc 1 input What is the first number use integers calc 2 input Wha
  • 如何将多个dll包装在一个dll中[重复]

    这个问题在这里已经有答案了 可能的重复 合并 net托管的dll 我创建了一个测试项目 在其中引用了其他项目中的 dll 当我构建测试项目时 我看到测试项目和引用的 dll 不同 我想将两个 dll 包装在一个 dll 中 我该怎么做 看看
  • 如何在矩形上创建和分布对角条纹?

    我希望能够使用 JFreeChart 创建类似于下图的条形图 这是一个非常基本的单色条形图 但有一个 花哨 的细节 对角条纹 我想这可以通过在普通栏上叠加另一张图片来实现 该图片的尺寸与条形图相同 具有对角白色条纹和透明背景 我不太确定如何
  • 如何在pandas中进行“(df1而不是df2)”数据框合并?

    我有 2 个 pandas 数据框 df1 和 df2 具有公共列 键 x y 我想在键 x y 上进行 df1 not df2 合并 这意味着我希望我的代码返回一个包含 x y 行的数据帧 仅在 df1 中而不是在 df2 中 SAS 具
  • Spark:DF.as[Type] 无法编译

    我正在尝试运行 Spark 书中的示例Spark The Definitive Guide build sbt ThisBuild scalaVersion 3 2 1 libraryDependencies Seq org apache