Spark SQL - 从 sql 函数生成数组的数组

2024-03-26

我想创建一个数组的数组。这是我的数据表:

// A case class for our sample table
case class Testing(name: String, age: Int, salary: Int)

// Create an RDD with some data
val x = sc.parallelize(Array(
    Testing(null, 21, 905),
    Testing("Noelia", 26, 1130),
    Testing("Pilar", 52,  1890),
    Testing("Roberto", 31, 1450)
 ))

// Convert RDD to a DataFrame 
val df = sqlContext.createDataFrame(x) 

// For SQL usage we need to register the table
df.registerTempTable("df")

我想创建一个整数列“age”的数组。为此,我使用“collect_list”:

sqlContext.sql("SELECT collect_list(age) as age from df").show

但现在我想生成一个包含上面创建的多个数组的数组:

 sqlContext.sql("SELECT collect_list(collect_list(age), collect_list(salary)) as arrayInt from df").show

但这不行,还是用org.apache.spark.sql.functions.array这个函数。有任何想法吗?


好吧,事情再简单不过了。让我们考虑一下您正在处理的相同数据,并从那里逐步进行

// A case class for our sample table
case class Testing(name: String, age: Int, salary: Int)

// Create an RDD with some data
val x = sc.parallelize(Array(
  Testing(null, 21, 905),
  Testing("Noelia", 26, 1130),
  Testing("Pilar", 52, 1890),
  Testing("Roberto", 31, 1450)
))

// Convert RDD to a DataFrame
val df = sqlContext.createDataFrame(x)

// For SQL usage we need to register the table
df.registerTempTable("df")
sqlContext.sql("select collect_list(age) as age from df").show

// +----------------+
// |             age|
// +----------------+
// |[21, 26, 52, 31]|
// +----------------+

sqlContext.sql("select collect_list(collect_list(age),     collect_list(salary)) as arrayInt from df").show

正如错误消息所示:

org.apache.spark.sql.AnalysisException: No handler for Hive udf class
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList because: Exactly one argument is expected..; line 1 pos 52 [...]

collest_list仅需要一个参数。让我们检查一下文档here http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions%24.

它实际上需要一个参数!但让我们进一步了解函数对象的文档。您似乎已经注意到,数组函数允许您从 Column 或重复的 Column 参数创建新的数组列。那么让我们使用它:

sqlContext.sql("select array(collect_list(age), collect_list(salary)) as arrayInt from df").show(false)

数组函数确实从由collect_list预先创建的列列表中创建了一个列,包括年龄和薪水:

// +-------------------------------------------------------------------+
// |arrayInt                                                           |
// +-------------------------------------------------------------------+
// |[WrappedArray(21, 26, 52, 31), WrappedArray(905, 1130, 1890, 1450)]|
// +-------------------------------------------------------------------+

我们接下来该去哪里?

您必须记住,DataFrame 中的 Row 只是由 Row 包装的另一个集合。

我要做的第一件事就是处理该系列。那么我们如何展平WrappedArray[WrappedArray[Int]] ?

Scala 有点神奇,你只需要使用.flatten

import scala.collection.mutable.WrappedArray

val firstRow: mutable.WrappedArray[mutable.WrappedArray[Int]] =
  sqlContext.sql("select array(collect_list(age), collect_list(salary)) as arrayInt from df")
    .first.get(0).asInstanceOf[WrappedArray[WrappedArray[Int]]]
// res26: scala.collection.mutable.WrappedArray[scala.collection.mutable.WrappedArray[Int]] =
// WrappedArray(WrappedArray(21, 26, 52, 31), WrappedArray(905, 1130, 1890, 1450))

firstRow.flatten
// res27: scala.collection.mutable.IndexedSeq[Int] = ArrayBuffer(21, 26, 52, 31, 905, 1130, 1890, 1450)

现在让我们将其包装在 UDF 中,以便我们可以在 DataFrame 上使用它:

def flatten(array: WrappedArray[WrappedArray[Int]]) = array.flatten
sqlContext.udf.register("flatten", flatten(_: WrappedArray[WrappedArray[Int]]))

由于我们注册了 UDF,我们现在可以在 sqlContext 中使用它:

sqlContext.sql("select flatten(array(collect_list(age), collect_list(salary))) as arrayInt from df").show(false)

// +---------------------------------------+
// |arrayInt                               |
// +---------------------------------------+
// |[21, 26, 52, 31, 905, 1130, 1890, 1450]|
// +---------------------------------------+

我希望这有帮助 !

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark SQL - 从 sql 函数生成数组的数组 的相关文章

  • 如何在 Scala 中打印任何内容的列表?

    目前我有一个打印整数的方法 def printList args List Int Unit args foreach println 我如何修改它 使其足够灵活 可以打印任何内容的列表 您不需要专用的方法 所需的功能已经在集合类中 pri
  • SPARK SQL - 当时的情况

    我是 SPARK SQL 的新手 SPARK SQL 中是否有相当于 CASE WHEN CONDITION THEN 0 ELSE 1 END 的内容 select case when 1 1 then 1 else 0 end from
  • 如何在Spark结构化流中指定批处理间隔?

    我正在使用 Spark 结构化流并遇到问题 在 StreamingContext DStreams 中 我们可以定义批处理间隔 如下所示 from pyspark streaming import StreamingContext ssc
  • Spark的distinct()函数是否仅对每个分区中的不同元组进行洗牌

    据我了解 distinct 哈希分区 RDD 来识别唯一键 但它是否针对仅移动每个分区的不同元组进行了优化 想象一个具有以下分区的 RDD 1 2 2 1 4 2 2 1 3 3 5 4 5 5 5 在此 RDD 上的不同键上 所有重复键
  • 使用 scala 在 Flink 中进行实时流预测

    弗林克版本 1 2 0斯卡拉版本 2 11 8 我想使用 DataStream 来使用 scala 中的 flink 模型进行预测 我在使用 scala 的 flink 中有一个 DataStream String 其中包含来自 kafka
  • 玩:将表单字段绑定到双精度型?

    也许我只是忽略了一些明显的事情 但我无法弄清楚如何将表单字段绑定到 Play 控制器中的双精度型 例如 假设这是我的模型 case class SavingsGoal timeframeInMonths Option Int amount
  • Scala 解析器组合器的运算符优先级

    我正在研究需要考虑运算符优先级的解析逻辑 我的需求并不太复杂 首先 我需要乘法和除法比加法和减法具有更高的优先级 例如 1 2 3 应视为 1 2 3 这是一个简单的例子 但你明白了 我需要将更多自定义标记添加到优先级逻辑中 我可以根据此处
  • 在 IntelliJ 中运行 Spark 字数统计

    我花了几个小时浏览 You Tube 视频和教程 试图了解如何在 Scala 中运行 Spark 字数统计程序 并将其转换为 jar 文件 我现在完全糊涂了 我运行了 Hello World 并且了解了如何在 Apache spark sp
  • scala play框架如何对异步控制器进行单元测试

    使用 Scala play 2 5 版并尝试遵循以下文档中的单元测试控制器指南 https www playframework com documentation 2 5 x ScalaTestingWithScalaTest https
  • 行类型 Spark 数据集的编码器

    我想写一个编码器Row https spark apache org docs 2 0 0 api java index html org apache spark sql Row html输入 DataSet 用于我正在执行的地图操作 本
  • Spark 执行器登录 YARN

    我正在 Cloudera 集群上以 YARN 客户端模式启动分布式 Spark 应用程序 一段时间后 我在 Cloudera Manager 上看到一些错误 一些执行者会断开连接 并且这种情况会系统性地发生 我想调试该问题 但 YARN 未
  • 使用 Apache Spark 读取 JSON - `corrupt_record`

    我有一个json file nodes看起来像这样 toid osgb4000000031043205 point 508180 748 195333 973 index 1 toid osgb4000000031043206 point
  • 如何使用 log4j 自定义附加程序在 HDFS 上创建日志?

    Overview 我们希望使用 log4j 记录 Spark 作业活动 并将日志文件写入 HDFS Java 8 Spark 2 4 6 Scala 2 1 2 Hadoop 3 2 1 我们无法找到本地 apache log4j 附加程序
  • 向 Scala Swing Panel 添加标签时出现类型不匹配错误

    我有这个课程扩展FlowPanel我正在尝试向其中添加标签 import java awt Label Color import scala swing import scala util Random class MyPanel exte
  • 在 Scala 中调用 WebSocket 中的方法

    我是 scala Play 框架和 Akka 的新手 我的函数定义为 def socket WebSocket accept String String request gt ActorFlow actorRef out gt MyWebS
  • 如何定义与更高类型类型(类型构造函数)绑定的上下文

    我尝试过以下方法 def test Option T Ordering value1 Option T value2 Option T val e implicitly Ordering Option T compare value1 va
  • Spark中如何获取map任务的ID?

    Spark中有没有办法获取map任务的ID 例如 如果每个映射任务都调用用户定义的函数 我可以从该用户定义的函数中获取该映射任务的 ID 吗 我不确定您所说的地图任务 ID 是什么意思 但您可以使用以下方式访问任务信息TaskContext
  • 使用 Scala 进行网页抓取 [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 如何在SparkR中进行map和reduce

    如何使用 SparkR 进行映射和归约操作 我能找到的只是有关 SQL 查询的内容 有没有办法使用 SQL 进行映射和减少 See 写入从 SparkR map 返回的 R 数据帧 https stackoverflow com quest
  • scala中的协变类型参数需要在java接口中保持不变

    我有一个看起来像这样的特征 一些进一步的信息可以在我自己提出了这个相关问题 https stackoverflow com questions 3695990 inheritance and automatic type conversio

随机推荐

  • mongodb show dbs list数据库失败

    我是 mongodb 的新手 我刚刚在我的MAC上安装了mongoDB 看完这个 YouTube 视频后 在 mongo shell 中 我输入 show dbs 并得到有线输出 请帮助我理解并解决这个问题 gt show dbs 2017
  • 在 JDBC 中插入单引号以进行 SQL 查询不起作用

    我在通过 Oracle JDBC 在 JAVA 的准备好的语句中使用单引号时遇到了处理单引号的问题 假设我们有一张 Restaurant 表 其中一列 Restaurant name 的值为 1 Jack s Deli 我想使用一个简单的准
  • 使用另一个对话框的功能更改 jQuery-UI 对话框的标题

    为什么第二个 jQuery UI 对话框标题在弹出时不改变 第一个对话框我使用以下命令更改框的标题 attr title Confirm 它将第一个框的标题更改为 确认 就像它应该有的那样 现在 当第二个框弹出时 它应该将标题更改为 消息
  • 将 std::hash 专门化为依赖类型

    我已经定义了这个模板类结构 template
  • 尝试获取已安装应用程序列表时出现 TransactionTooLargeException

    作为我的应用程序的一部分 我通过使用 ApplicationPackageManager getInstalledApplications 获取设备上安装的应用程序列表 但对于某些用户 我收到崩溃报告说 android osBinderPr
  • 视图漂浮在所有 ViewController 之上

    在 iOS 上 视图是否可能始终漂浮在所有其他视图之上 我问这个是因为我想要实现的是一个漂浮在 ViewController 之上的视图 然后一个模态视图控制器滑入 同时该特定视图仍然漂浮在该模态视图控制器上 希望你明白我想说的 有 您可以
  • 程序员多久会被要求编写一个 makefile 文件? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 从当前文化中获取货币?

    有没有办法从应用程序文化设置动态获取当前信息 基本上 如果用户将文化设置为美国 我想知道货币是美元 或者如果他们将其设置为英国 我想知道英镑等 等等 这样我就可以在付款时将此信息发送给 PayPal 使用 RegionInfo ISOCur
  • 根据当前值更新 MongoDB 中的值

    我想做这样的事情 但是this关键字似乎没有在更新语句中设置 db items update foo set bar this foo false true 我必须使用eval来完成这个 是的 您不能引用修饰符中的其他字段 您必须使用 db
  • 使 MongoDB 中的表字段可文本搜索

    先决条件 已使用集合创建数据库posts它的架构如下 module exports function mongoose var Schema mongoose Schema var postSchema new Schema postID
  • Java TA-Lib 文档 [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我正在寻找有关的文档TA Lib http www ta lib org index html在爪哇
  • 使用 SWIG 和 Python/C API 包装返回 std::map 的函数

    我想包装一个 C 例程 它返回一个std map整数和指向 C 类实例的指针 我在使用 SWIG 时遇到困难 希望能提供任何帮助 我试图通过一个简单的例子来将这个问题归结为它的本质 标题test h定义如下 File test h incl
  • 计算单元测试运行期间发生的GC数量[关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我目前正在编写一个单元测试来查看给定方法的性能影响 从实践中我们观察到 当前在给定方法的执行过程中发生了很多GC 我想知道是否可以查看从
  • ASIHTTPRequest dealloc 和 EXC_BAD_ACCESS 问题

    我使用一组 ASIHTTPRequest 包装器 AsyncImageLoader 来下载 UITableView 中单元格的图像 我在处理 ASIHTTPRequests 生命周期时遇到问题 如果我释放它们 如果我在它们尝试加载图像时继续
  • 警告:require_once():http:// 包装器在服务器配置中被allow_url_include=0 禁用

    我试图通过以下方式在页面中包含 php 文件 require once http localhost web a php 我收到错误 Warning require once http wrapper is disabled in the
  • Jodatime的LocalDateTime第一次使用时很慢

    我目前正在一个 java 项目中测试一些 webapp 技术 并且想知道为什么页面有时加载速度很快 有时需要近 5 秒才能加载 我终于发现是这条线 LocalDateTime now new LocalDateTime 第一次调用时 需要很
  • 使用 par 时图例框宽度不正确

    我有问题 我的图例太大 我的代码 par mfrow c 1 2 hist alvsloss breaks 100 freq F main Histogramm density curve gaussian kernel n and fit
  • Dart - 试图理解“工厂”构造函数的价值

    如果我理解正确的话 A factory constructor affords an abstract class to be instantiated by another class despite being abstract 例如
  • 仅调用一个 Paint 事件

    我的问题是我有 8 个图片框 但一次只有其中一个调用其绘制方法 我的代码有点太大 所以我尝试尽可能地将其范围缩小到受影响的部分 我最好的猜测是 这并不是我的代码中的错误 而是对绘制事件如何工作的误解 我有一个继承自 PictureBox 的
  • Spark SQL - 从 sql 函数生成数组的数组

    我想创建一个数组的数组 这是我的数据表 A case class for our sample table case class Testing name String age Int salary Int Create an RDD wi