将元组列表作为参数传递给 scala 中的 Spark udf

2023-12-20

我正在尝试将元组列表传递给 scala 中的 udf。我不确定如何为此准确定义数据类型。我试图将其作为整行传递,但它无法真正解决它。我需要根据元组的第一个元素对列表进行排序,然后发回 n 个元素。我已经尝试过以下 udf 定义

def udfFilterPath = udf((id: Long, idList: Array[structType[Long, String]] )

def udfFilterPath = udf((id: Long, idList: Array[Tuple2[Long, String]] )

def udfFilterPath = udf((id: Long, idList: Row)

idList 如下所示:

[[1234,"Tony"], [2345, "Angela"]]
[[1234,"Tony"], [234545, "Ruby"], [353445, "Ria"]]

这是一个像上面一样有 100 行的数据框。我这样称呼udf:

testSet.select("id", "idList").withColumn("result", udfFilterPath($"id", $"idList")).show

当我打印数据帧的架构时,它会将其读取为结构数组。 idList 本身是通过对按键分组并存储在数据帧中的一列元组进行收集列表来生成的。关于我做错了什么有什么想法吗?谢谢!


定义 UDF 时,您应该使用普通的 Scala 类型(例如元组、基元...)并且notSpark SQL 类型(例如StructType)作为输出类型.

至于input类型 - 这就是它变得棘手的地方(并且没有太多记录) - 元组数组实际上是一个mutable.WrappedArray[Row]。所以 - 你必须“转变”首先将每一行放入一个元组中,然后您可以进行排序并返回结果。

最后,根据你的描述,似乎id根本不使用列,因此我将其从 UDF 定义中删除,但可以轻松将其添加回来。

val udfFilterPath = udf { idList: mutable.WrappedArray[Row] =>
  // converts the array items into tuples, sorts by first item and returns first two tuples:
  idList.map(r => (r.getAs[Long](0), r.getAs[String](1))).sortBy(_._1).take(2)
}

df.withColumn("result", udfFilterPath($"idList")).show(false)

+------+-------------------------------------------+----------------------------+
|id    |idList                                     |result                      |
+------+-------------------------------------------+----------------------------+
|1234  |[[1234,Tony], [2345,Angela]]               |[[1234,Tony], [2345,Angela]]|
|234545|[[1234,Tony], [2345454,Ruby], [353445,Ria]]|[[1234,Tony], [353445,Ria]] |
+------+-------------------------------------------+----------------------------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将元组列表作为参数传递给 scala 中的 Spark udf 的相关文章

  • Scala 组合器解析器 - 区分数字字符串和变量字符串

    我正在做 Cay Horstmann 的组合器解析器练习 我想知道区分代表数字的字符串和代表匹配语句中变量的字符串的最佳方法 def factor Parser ExprTree wholeNumber expr ident case a
  • Spark SQL 广播提示中间表

    我在使用广播提示时遇到问题 可能是缺乏 SQL 知识 我有一个查询 例如 SELECT broadcast a FROM a INNER JOIN b ON INNER JOIN c on 我想要做 SELECT broadcast a F
  • 了解如何使用 apply 和 unappy

    我试图更好地理解 的正确用法apply and unapply方法 考虑到我们想要序列化和反序列化的对象 这是正确的用法吗 即斯卡拉方式 的使用apply and unapply case class Foo object Foo appl
  • Scala:如何编写将类型化为接收者的实现类型的对象返回的方法

    我知道 Scala 中不推荐使用案例类继承 但为了简单起见 我在以下示例中使用了它 scala gt case class Foo val f String def foo g String Foo this copy f g define
  • AssertionError:断言失败:没有在 Databricks 中进行 DeleteFromTable 的计划

    这个命令运行良好有什么原因吗 sql SELECT FROM Azure Reservations WHERE timestamp gt 2021 04 02 返回 2 行 如下 sql DELETE FROM Azure Reservat
  • Spark/Yarn:HDFS 上不存在文件

    我在 AWS 上设置了 Hadoop Yarn 集群 有 1 个主服务器和 3 个从服务器 我已经验证我有 3 个活动节点在端口 50070 和 8088 上运行 我在客户端部署模式下测试了 Spark 作业 一切正常 当我尝试使用 Spa
  • scala/spark 代码不允许在 hive 中添加列

    如果源数据有新列 我尝试在 Hive 表中添加一列 所有新列的检测都运行良好 但是 当我尝试将列添加到目标表时 我收到此错误 for f lt df schema fields if f name chk spark sqlContext
  • 如何使用 apply/unapply 方法重现案例类行为?

    我尝试用普通类和伴生对象替换案例类 但突然出现类型错误 编译良好的代码 综合示例 trait Elem A B def C other Elem C A Elem C B other match case Chain head tail g
  • Java / Scala Future 由回调驱动

    简洁版本 我怎样才能创建一个Promise
  • 将 Scala 库转换为 DLL (.NET)

    我正在尝试从 scala 类创建一个 Dll 我将 IntelliJ 与 SBT 一起使用 我已经找到了一种使用 ikvm converter 将 jar 文件转换为 Dll 的方法 现在的问题是 当我在 SBT 下使用 package 从
  • Scala 相当于 Java 的 Number

    我正在尝试为数值域类型构建类型层次结构 例如AYear is an Int 这是一个Number a Percentage is a Double 这是一个Number等等 我需要层次结构以便我可以调用toInt or toDouble关于
  • Spark - 如何在本地运行独立集群

    是否有可能运行Spark独立集群仅在一台机器上进行本地操作 这与仅在本地开发作业基本上不同 即local 到目前为止 我正在运行 2 个不同的虚拟机来构建集群 如果我可以在同一台机器上运行一个独立的集群 该怎么办 例如三个不同的 JVM 正
  • 如何捕获 Oozie Spark 输出

    有没有办法捕获spark的输出然后将其输入到shell上 我们当前正在使用 scala 创建 jar 文件 并希望我们的 Spark 输出成为 shell 输入 我的想法是使用 wf actionData spark XXXX var 我只
  • 任务和分区之间有什么关系?

    我能说 么 Spark任务的数量等于Spark分区的数量吗 执行器运行一次 执行器内部的批处理 等于一个任务吗 每个任务只产生一个分区 1 的重复 并行度或可以同时运行的任务数量由以下公式设置 Executor实例的数量 配置 每个执行器的
  • 如何在 Scala 中打印任何内容的列表?

    目前我有一个打印整数的方法 def printList args List Int Unit args foreach println 我如何修改它 使其足够灵活 可以打印任何内容的列表 您不需要专用的方法 所需的功能已经在集合类中 pri
  • 使用 Spark DataFrame 获取组后所有组的 TopN

    我有一个 Spark SQL DataFrame user1 item1 rating1 user1 item2 rating2 user1 item3 rating3 user2 item1 rating4 如何按用户分组然后返回TopN
  • 为什么 Spark 比 Hadoop MapReduce 更快

    有人可以使用字数统计示例解释一下为什么 Spark 比 MapReduce 更快吗 bafna的答案提供了故事的记忆方面 但我想补充另外两个重要事实 DAG和生态系统 Spark 使用 惰性求值 来形成连续计算阶段的有向无环图 DAG 通过
  • Play Framework 2.3 (Scala) 中的自定义 JSON 验证约束

    我设法使用自定义约束实现表单验证 但现在我想对 JSON 数据执行相同的操作 如何将自定义验证规则应用于 JSON 解析器 示例 客户端的 POST 请求包含用户名 username 我不仅要确保该参数是非空文本 而且还要确保该用户确实存在
  • Scala 模式匹配变量绑定

    为什么提取器返回时不能以 样式绑定变量Option
  • 使用spark phoenix从表中读取rdd分区号为1

    当我运行我的火花代码时 val sqlContext spark sqlContext val noact table primaryDataProcessor getTableData sqlContext zookeeper table

随机推荐

  • 从 SQL 连接到 Web 服务

    SQL Server 能够使用数据提供程序 例如 JET ACE 和 OPENROWSET 打开 Excel 工作表 xlsx 访问数据库 mdb 和其他数据流 是否有类似的工具可以从远程 Web 服务中提取数据 使用 OPENROWSET
  • MySql IEEE 浮点 NaN、PositiveInfinity、NegativeInfinity

    我已经看过很多关于这个问题的问题 但我还没有找到解决方案 希望这不是一个重复的问题 Problem 如果我执行以下任一操作 INSERT INTO Numbers Number VALUES NaN INSERT INTO Numbers
  • CORS节点js问题

    在浏览了堆栈上的多个帖子后 我仍然找不到正确的答案 检查了文档CORS https github com expressjs cors user content simple usage enable all cors requests扩展
  • WPF。如何通过绑定停止数据触发动画?

    在 WPF 工具包数据网格中 我有一个绑定到单元格元素不透明度的数据触发器 When UpVisibility更改为 1 路径变得可见 并且动画开始将其淡化为 0 这有效 然而我现在的问题 如果我需要提前停止 取消褪色并设置UpVisibi
  • 如何在 Docker 上运行 .exe 文件?

    我目前正在尝试了解和学习Docker 我有一个应用程序 exe 文件 我想通过创建 Docker 在 Linux 或 OSX 上运行它 我在网上搜索过 但找不到任何可以做到这一点的东西 而且我对 Docker 还不够了解 无法尝试即兴创作一
  • 格式化具有多个百分号的字符串

    I know 用于逃避实际 字符串中的符号 所以 ds最终会是 10s在以下格式字符串中 但我不知道为什么我需要 5s在这个字符串中 毕竟 只有两个附加参数 BUFFSIZE 10 define BUFFSIZE 100 char buf
  • sapply 与复合函数的速度比较

    gt system time sapply rnorm 1000000 0 1 function x round x 2 user system elapsed 2 78 0 11 2 89 gt system time round rno
  • cref 不在对象浏览器中创建链接

    我在 C 2010 类库中有以下代码
  • 如何在css中定义多个类的hover事件?

    在CSS中 如何定义多个类的悬停事件以使用相同的属性 这似乎不起作用 my div hover my td hover border 1px solid red Thanks 您应该用逗号分隔 如下所示 my div hover my td
  • 以编程方式更改 ActionBar 选项卡下划线颜色

    我已经创建了操作栏 ActionBar actionbar getActionBar 操作栏的背景更改为 actionbar setBackgroundDrawable actionBarBackgroundImage 现在我需要以编程方式
  • Python,日志记录:使用带有字典配置的自定义处理程序?

    这是关于 Python 3 2 GNU Linux x86 64 上的日志记录模块 是否可以使用字典配置设置自定义处理程序 这是我正在尝试的代码 import logging import logging config class Cust
  • Android:首选项屏幕从右到左

    这是PreferenceScreen的xml文件 PreferenceCategory 根据我的需要显示为 RTL 但其他组件显示为 LTR 如何让他们RTL 我必须遵循这个link http android developers blog
  • 从 pyodbc 调用过程时出错

    这是我的第一个问题 所以 如果重复或格式错误 我很抱歉 我搜索了其他问题 发现该错误很常见 但出现在多种情况下 我有一个非常简单的 python 代码 我想在 MSSQL 中从 pyodbc 执行一个过程 import pyodbc con
  • tkinter 显示当前标签,删除前一个[重复]

    这个问题在这里已经有答案了 我正在尝试一个程序 它将显示在输入框中输入的相应名称的标签 问题 它重叠并显示标签 而不是消失以前的条目标签 我的编码 import Tkinter as tki class App object def ini
  • Backbone.js Underscore.js 过滤集合与数组

    我基本上正在尝试解决这个问题 但是使用数组并使用相应数组的值返回所有对象 而不仅仅是值 按属性值过滤骨干集合 https stackoverflow com questions 11762105 filter backbone collec
  • 推荐的元元素?

    为我的网站项目建立一个 基本框架 我想知道哪些元元素是真正必要 推荐的 我特别想知道如何处理语言属性 在下面的例子中 我认为 不必要地重复
  • gradle 构建因未知主机异常而失败

    我刚刚安装了新版本的 Android Studio 并尝试使用内置模板创建一个项目 我的环境是 Windows 7 SP1 64 位 在安全域环境中 我是我的计算机上的域管理员 这是我所看到的 C Users stuz AndroidStu
  • 术语“更新数据库”不被识别为 cmdlet、函数、脚本文件或可操作程序的名称。检查[重复]

    这个问题在这里已经有答案了 我正在使用 VS 2015 Community Update 3 当我尝试使用命令重新创建 EF 驱动的数据库时update database在包管理器控制台中 显示错误 术语 更新数据库 不被识别为 cmdle
  • sequelize postgres 将 fn 'date' 与 jsonb 值结合起来

    我需要比较格式字符串MM DD YYYY与postgres中的另一个使用sequelize在jsonb列上 在常规专栏上我会做类似的事情 sequelize where sequelize fn date sequelize col cre
  • 将元组列表作为参数传递给 scala 中的 Spark udf

    我正在尝试将元组列表传递给 scala 中的 udf 我不确定如何为此准确定义数据类型 我试图将其作为整行传递 但它无法真正解决它 我需要根据元组的第一个元素对列表进行排序 然后发回 n 个元素 我已经尝试过以下 udf 定义 def ud