如何从 Scala 方法创建 UDF(计算 md5)?

2024-01-07

我想从两个已经工作的函数构建一个 UDF。我正在尝试计算 md5 哈希作为现有 Spark Dataframe 的新列。

def md5(s: String): String = { toHex(MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8")))}
def toHex(bytes: Array[Byte]): String = bytes.map("%02x".format(_)).mkString("")

结构(到目前为止我所拥有的)

val md5_hash: // UDF Implementation
val sqlfunc = udf(md5_hash)
val new_df = load_df.withColumn("New_MD5_Column", sqlfunc(col("Duration")))

不幸的是我不知道如何正确地实现该函数作为 UDF。


为什么不使用内置的md5 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions%24@md5(e:org.apache.spark.sql.Column):org.apache.spark.sql.Column功能?

md5(e: 列): 列计算二进制列的 MD5 摘要并以 32 个字符的十六进制字符串形式返回值。

然后您可以按如下方式使用它:

val new_df = load_df.withColumn("New_MD5_Column", md5($"Duration"))

您必须确保该列是二进制类型,因此如果它是 int,您可能会看到以下错误:

org.apache.spark.sql.AnalysisException:无法解析'md5(Duration)' 由于数据类型不匹配:参数 1 需要二进制类型,但是,'Duration' 是 int 类型。;;

然后您应该将类​​型更改为md5-兼容,即二进制类型,使用bin http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions%24@bin(e:org.apache.spark.sql.Column):org.apache.spark.sql.Column功能。

bin(e: 列): 列返回给定长列的二进制值的字符串表示形式的表达式。例如,bin("12")回报"1100".

那么解决方案可能如下:

val solution = load_df.
  withColumn("bin_duration", bin($"duration")).
  withColumn("md5", md5($"bin_duration"))
scala> solution.show(false)
+--------+------------+--------------------------------+
|Duration|bin_duration|md5                             |
+--------+------------+--------------------------------+
|1       |1           |c4ca4238a0b923820dcc509a6f75849b|
+--------+------------+--------------------------------+

您还可以将函数“链接”在一起,并在一个函数中进行转换和计算 MD5withColumn,但我更喜欢将步骤分开,以防出现需要解决的问题,并且中间步骤通常会有所帮助。

表现

您会考虑使用内置函数的原因bin and md5自定义用户定义函数 (UDF) 的优点是could由于 Spark SQL 处于完全控制状态,因此可以获得更好的性能would不添加额外的步骤来序列化和反序列化内部行表示。

这里的情况并非如此,但仍然需要较少的导入和使用。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何从 Scala 方法创建 UDF(计算 md5)? 的相关文章

  • 了解如何使用 apply 和 unappy

    我试图更好地理解 的正确用法apply and unapply方法 考虑到我们想要序列化和反序列化的对象 这是正确的用法吗 即斯卡拉方式 的使用apply and unapply case class Foo object Foo appl
  • 哪些 ORM 与 Scala 配合得很好? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • Spark RDD默认分区数

    版本 Spark 1 6 2 Scala 2 10 我正在执行以下命令spark shell 我试图查看 Spark 默认创建的分区数量 val rdd1 sc parallelize 1 to 10 println rdd1 getNum
  • AssertionError:断言失败:没有在 Databricks 中进行 DeleteFromTable 的计划

    这个命令运行良好有什么原因吗 sql SELECT FROM Azure Reservations WHERE timestamp gt 2021 04 02 返回 2 行 如下 sql DELETE FROM Azure Reservat
  • Spark 和 Ipython 中将非数字特征编码为数字的问题

    我正在做一些我必须做出预测的事情numeric数据 每月员工支出 使用non numeric特征 我在用Spark MLlibs Random Forests algorthim 我有我的features数据在一个dataframe看起来像
  • scala/spark 代码不允许在 hive 中添加列

    如果源数据有新列 我尝试在 Hive 表中添加一列 所有新列的检测都运行良好 但是 当我尝试将列添加到目标表时 我收到此错误 for f lt df schema fields if f name chk spark sqlContext
  • 将 Scala 库转换为 DLL (.NET)

    我正在尝试从 scala 类创建一个 Dll 我将 IntelliJ 与 SBT 一起使用 我已经找到了一种使用 ikvm converter 将 jar 文件转换为 Dll 的方法 现在的问题是 当我在 SBT 下使用 package 从
  • 逆变方法参数类型

    wiki 逆变方法参数类型 https en wikipedia org wiki Covariance and contravariance 28computer science 29 Contravariant method argum
  • Scala 中值类的隐式 Json 格式化程序

    我有许多值类组成了一个更大的对象案例类 final case class TopLevel foo Foo bar Bar final case class Foo foo String extends AnyVal final case
  • 如何从 SparkSQL DataFrame 中的 MapType 列获取键和值

    我的镶木地板文件中有数据 该文件有 2 个字段 object id String and alpha Map lt gt 它被读入 SparkSQL 中的数据帧 其架构如下所示 scala gt alphaDF printSchema ro
  • 阶乘的 Scala 排列

    我怎样才能找到n Scala 中某些字母的排列 Scala 2 9 RC1 scala gt abc permutations toList res58 List String List abc acb bac bca cab cba
  • 获取:导入 Spark 模块时出错:没有名为“pyspark.streaming.kafka”的模块

    我需要将从 pyspark 脚本创建的日志推送到 kafka 我正在做 POC 所以在 Windows 机器上使用 Kafka 二进制文件 我的版本是 kafka 2 4 0 spark 3 0 和 python 3 8 1 我正在使用 p
  • 对多列应用窗口函数

    我想执行窗口函数 具体为移动平均值 但针对数据帧的所有列 我可以这样做 from pyspark sql import SparkSession functions as func df df select func avg df col
  • 如何在 Scala 中打印任何内容的列表?

    目前我有一个打印整数的方法 def printList args List Int Unit args foreach println 我如何修改它 使其足够灵活 可以打印任何内容的列表 您不需要专用的方法 所需的功能已经在集合类中 pri
  • Java 表达式树 [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 是否有相当于 net的 LINQ 下的表达式树JVM 我想实现一些类似 LINQ 的代码结构Scala
  • Spark 结构化流中具有不同计数的聚合抛出错误

    我正在尝试在 Spark 结构化流中获取 Parentgroup childgroup 和 MountingType 组的唯一 id 代码 下面的代码抛出错误 withWatermark timestamp 1 minutes val ag
  • Play Framework 2.3 (Scala) 中的自定义 JSON 验证约束

    我设法使用自定义约束实现表单验证 但现在我想对 JSON 数据执行相同的操作 如何将自定义验证规则应用于 JSON 解析器 示例 客户端的 POST 请求包含用户名 username 我不仅要确保该参数是非空文本 而且还要确保该用户确实存在
  • 在 Spark 2.1.0 中启用 _metadata 文件

    Spark 2 1 0 中保存空 Parquet 文件似乎已损坏 因为无法再次读入它们 由于模式推断错误 我发现从 Spark 2 0 开始 写入 parquet 文件时默认禁用写入 metadata 文件 但我找不到重新启用此功能的配置设
  • Scala Tuple2Zipped 与 IterableLike zip

    两种实现有什么区别 这个比那个好吗 有一篇博客文章说 Tuple2Zipped 性能更好 但没有提供原因 并且查看源代码我没有看到差异 val l1 List 1 2 3 val l2 List 5 6 7 val v1 l1 zip l2
  • Spark Scala 将列从一个数据帧复制到另一个数据帧

    我有一个原始数据框的修改版本 我在其上进行了聚类 现在我想将预测列恢复为原始 DF 索引没问题 因此匹配 我该怎么做 使用这段代码我得到一个错误 println Predicted dfWithOutput show println Ori

随机推荐