在 Spark 中将带有 MapType 列的 DataFrame 写入数据库

2024-03-02

我正在尝试使用 clickhouse-native-jdbc 驱动程序将带有 MapType 列的数据帧保存到 Clickhouse(架构中也包含地图类型列),并遇到以下错误:

Caused by: java.lang.IllegalArgumentException: Can't translate non-null value for field 74
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeSetter$16(JdbcUtils.scala:593)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeSetter$16$adapted(JdbcUtils.scala:591)

我在spark源代码中找到了这个地方,它包含以下内容:

private def makeSetter(
      conn: Connection,
      dialect: JdbcDialect,
      dataType: DataType): JDBCValueSetter = dataType match {
    case IntegerType =>
      (stmt: PreparedStatement, row: Row, pos: Int) =>
        stmt.setInt(pos + 1, row.getInt(pos))

    case LongType =>
      (stmt: PreparedStatement, row: Row, pos: Int) =>
        stmt.setLong(pos + 1, row.getLong(pos))

...
    case _ =>
      (_: PreparedStatement, _: Row, pos: Int) =>
        throw new IllegalArgumentException(
          s"Can't translate non-null value for field $pos")

该函数匹配列类型,如果没有合适的类型,则会抛出此错误。正如我所看到的,spark 根本无法处理 MapType 列。

我尝试复制和修改org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils文件以使其能够与 MapType 列一起使用,如下所示:

case MapType(_, _, _) =>
    (stmt: PreparedStatement, row: Row, pos: Int) =>
        val map = row.getMap[AnyRef, AnyRef](pos)
        stmt.setObject(pos + 1, mapAsJavaMap(map))

在本地计算机中,它按预期工作,但在集群模式执行器中使用库存版本,而不是我自己的版本。

有谁知道如何让 Spark 以另一种方式使用 MapType 列,或者使用修改后的源代码来创建执行器?


感谢 Danilo Rodrigues 的启发,最后我这样解决了我的问题: 我没有按原样编写 Map 值,而是将其转换为 json 字符串,Clickhouse 中的表架构现在如下所示:

CREATE TABLE t1 (
    param_str String,
    param MATERIALIZED cast((arrayMap(x->x.1, JSONExtractKeysAndValues(param_str, 'String')), arrayMap(x->x.2, JSONExtractKeysAndValues(param_str, 'String'))), 'Map(String, String)')
) Engine ...

是的,它看起来有点难看,我更愿意选择改变 Spark 源代码的方法,但当前的方法效果很好

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Spark 中将带有 MapType 列的 DataFrame 写入数据库 的相关文章

  • Scala:如何编写将类型化为接收者的实现类型的对象返回的方法

    我知道 Scala 中不推荐使用案例类继承 但为了简单起见 我在以下示例中使用了它 scala gt case class Foo val f String def foo g String Foo this copy f g define
  • 自定义 NIO 文件系统无法通过 SBT 的测试任务加载

    为了进行测试 我使用内存中的 NIOFileSystem执行 memoryfs https github com openCage memoryfs 我以前已经利用过它 并且它似乎运行良好 例如梅文 然而 现在 在SBT项目中 不可能初始化
  • 对于空列表,max() 应该返回什么?

    Got java util NoSuchElementException head of empty list所以我试着检查一下 但现在我明白了 info max of a few numbers FAILED info 0 did not
  • 宏:knownDirectSubclasses 被嵌套类型破坏?

    我有一个宏 它枚举密封特征的直接子类型 import scala reflect macros Context import language experimental macros object Checker def apply A U
  • Scala 中值类的隐式 Json 格式化程序

    我有许多值类组成了一个更大的对象案例类 final case class TopLevel foo Foo bar Bar final case class Foo foo String extends AnyVal final case
  • Scala 相当于 Java 的 Number

    我正在尝试为数值域类型构建类型层次结构 例如AYear is an Int 这是一个Number a Percentage is a Double 这是一个Number等等 我需要层次结构以便我可以调用toInt or toDouble关于
  • 最小重复子串

    我正在看 Perl代码高尔夫页面 http www perlmonks org node id 82878 不要问为什么 并遇到了这个 第 3 洞 最小重复图案 编写一个子例程 它接受一个字符串 该字符串可能包含 重复模式 并返回最小的重复
  • 使用原始类型模拟案例类

    考虑以下类型结构 trait HasId T def id T case class Entity id Long extends HasId Long 比方说 我们想在一些测试中模拟实体类 val entityMock mock Enti
  • 使用 Akka 玩 2.5 - 找不到参数超时的隐式值:akka.util.Timeout

    我正在尝试使用 Play 2 5 测试 Akka 但遇到了一个似乎无法解决的编译错误 我正在关注 Play 文档中的此页面 https playframework com documentation 2 5 x ScalaAkka http
  • Source.getLines 中的默认参数错误 (Scala 2.8.0 RC1)

    假设我运行 Scala 2 8 0 RC1 以下 scala 代码应该打印出文件 c hello txt 的内容 for line lt Source fromPath c hello txt getLines println line 但
  • Spark - 如何在本地运行独立集群

    是否有可能运行Spark独立集群仅在一台机器上进行本地操作 这与仅在本地开发作业基本上不同 即local 到目前为止 我正在运行 2 个不同的虚拟机来构建集群 如果我可以在同一台机器上运行一个独立的集群 该怎么办 例如三个不同的 JVM 正
  • Scala:类似 Option (Some, None) 但具有三种状态:Some、None、Unknown

    我需要返回值 当有人询问值时 告诉他们以下三件事之一 这是值 没有价值 我们没有关于该值的信息 未知 情况 2 与情况 3 略有不同 示例 val radio car radioType 我们知道该值 返回无线电类型 例如 pioneer
  • 任务和分区之间有什么关系?

    我能说 么 Spark任务的数量等于Spark分区的数量吗 执行器运行一次 执行器内部的批处理 等于一个任务吗 每个任务只产生一个分区 1 的重复 并行度或可以同时运行的任务数量由以下公式设置 Executor实例的数量 配置 每个执行器的
  • 使用 Spark DataFrame 获取组后所有组的 TopN

    我有一个 Spark SQL DataFrame user1 item1 rating1 user1 item2 rating2 user1 item3 rating3 user2 item1 rating4 如何按用户分组然后返回TopN
  • 在 Scala 和 SBT 中调试较长的编译时间

    在我的 Scala SBT 项目中 我有一个文件需要 5 分钟才能编译 所有其他的都可以在几秒钟内编译 这使得开发非常痛苦 我确信我滥用了一些 Scala 构造 但我不知道如何调试它 如何在 Scala 中调试较长的编译时间 我正在使用 S
  • 将 SQL 数据中的一行映射到 Java 对象

    我有一个 Java 类 其实例字段 以及匹配的 setter 方法 与 SQL 数据库表的列名相匹配 我想优雅地从表中获取一行 到 ResultSet 中 并将其映射到此类的实例 例如 我有一个 Student 类 其中包含实例字段 FNA
  • Scala 和变量中的模式匹配

    我是 Scala 新手 有点想知道模式匹配是如何工作的 想象一下我有以下内容 case class Cls i Int case b Cls i gt Ok case e Cls gt Ok case f Cls gt Ok case s
  • pyspark 中的 Pandas UDF

    我正在尝试在 Spark 数据帧上填充一系列观察结果 基本上我有一个日期列表 我应该为每个组创建缺失的日期 在熊猫中有reindex函数 这是 pyspark 中不可用的 我尝试实现 pandas UDF pandas udf schema
  • 运行具有外部依赖项的 Scala 脚本

    我在 Users joe scala lib 下有以下 jar commons codec 1 4 jar httpclient 4 1 1 jar httpcore 4 1 jar commons logging 1 1 1 jar ht
  • 对 Scala Not Null 特征的库支持

    Notice 从 Scala 2 11 开始 NotNull已弃用 据我了解 如果您希望引用类型不可为空 则必须混合魔法NotNull特征 编译器会自动阻止你输入null 可以值在里面 看到这个邮件列表线程 http www nabble

随机推荐