计算行的排名

2024-05-07

我想根据一个字段对用户 ID 进行排名。对于相同的字段值,排名应该相同。该数据位于 Hive 表中。

e.g.

user value
a       5
b       10
c       5
d       6

Rank
a - 1
c - 1
d - 3
b - 4

我怎样才能做到这一点?


可以使用rank使用 DataFrame API 的窗口函数:

import org.apache.spark.sql.functions.rank
import org.apache.spark.sql.expressions.Window

val w = Window.orderBy($"value")

val df = sc.parallelize(Seq(
  ("a", 5), ("b", 10), ("c", 5), ("d", 6)
)).toDF("user", "value")

df.select($"user", rank.over(w).alias("rank")).show

// +----+----+
// |user|rank|
// +----+----+
// |   a|   1|
// |   c|   1|
// |   d|   3|
// |   b|   4|
// +----+----+

或原始 SQL:

df.registerTempTable("df")
sqlContext.sql("SELECT user, RANK() OVER (ORDER BY value) AS rank FROM df").show

// +----+----+
// |user|rank|
// +----+----+
// |   a|   1|
// |   c|   1|
// |   d|   3|
// |   b|   4|
// +----+----+

但效率极低。

您还可以尝试使用 RDD API,但它并不那么简单。首先让我们将 DataFrame 转换为 RDD:

import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
import org.apache.spark.RangePartitioner

val rdd: RDD[(Int, String)] = df.select($"value", $"user")
  .map{ case Row(value: Int, user: String) => (value, user) }

val partitioner = new RangePartitioner(rdd.partitions.size,  rdd)
val sorted =  rdd.repartitionAndSortWithinPartitions(partitioner)

接下来我们必须计算每个分区的排名:

def rank(iter: Iterator[(Int,String)]) =  {
  val zero = List((-1L, Integer.MIN_VALUE, "", 1L))

  def f(acc: List[(Long,Int,String,Long)], x: (Int, String)) = 
    (acc.head, x) match {
      case (
        (prevRank: Long, prevValue: Int, _, offset: Long),
        (currValue: Int, label: String)) => {
      val newRank = if (prevValue == currValue) prevRank else prevRank + offset
      val newOffset = if (prevValue == currValue) offset + 1L else 1L
      (newRank, currValue, label, newOffset) :: acc
    }
  }

  iter.foldLeft(zero)(f).reverse.drop(1).map{case (rank, _, label, _) =>
    (rank, label)}.toIterator
}


val partRanks = sorted.mapPartitions(rank)

每个分区的偏移量

def getOffsets(sorted: RDD[(Int, String)]) = sorted
  .mapPartitionsWithIndex((i: Int, iter: Iterator[(Int, String)]) => 
    Iterator((i, iter.size)))
  .collect
  .foldLeft(List((-1, 0)))((acc: List[(Int, Int)], x: (Int, Int)) => 
    (x._1, x._2 + acc.head._2) :: acc)
  .toMap

val offsets = sc.broadcast(getOffsets(sorted))

以及最终排名:

def adjust(i: Int, iter: Iterator[(Long, String)]) = 
  iter.map{case (rank, label) => (rank + offsets.value(i - 1).toLong, label)}

val ranks = partRanks
  .mapPartitionsWithIndex(adjust)
  .map{case (i, label) => (1 + i , label)}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

计算行的排名 的相关文章

  • 使用 tidyverse 在 tibble 中“取消嵌套” data.frame 列

    我正在处理从 www 调用返回的一些数据jsonlite and as tibble以某种方式转换成data frame column This result数据有一个Id整数列和ActionCodedata frame 列有两个内部列 这
  • 闪亮错误:参数暗示行数不同

    我正在尝试开发一个简单的应用程序 从 Kijiji 网站获取本地分类广告 我用几乎相同的脚本制作了一个类似的应用程序 但我没有收到下面描述的错误 所以我不知道这个脚本出了什么问题 我尝试了我能想到的一切 但无法让它发挥作用 的结构df数据框
  • Apache Impala 中是否有相当于 Hive 的“爆炸”功能的函数?

    Hive的函数explode是记录在这里 https cwiki apache org confluence display Hive LanguageManual UDF LanguageManualUDF Built inTable G
  • 如何发现 Scala 远程 Actor 已死亡?

    在 Scala 中 当另一个 远程 actor 终止时 可以通过设置 trapExit 标志并以第二个 actor 作为参数调用 link 方法来通知一个 actor 在这种情况下 当远程参与者通过调用 exit 结束其工作时 第一个参与者
  • Pandas DataFrame:如果列为空,则复制列的内容

    我有以下带有命名列和索引的 DataFrame a a b b 1 5 NaN 9 NaN 2 NaN 3 3 NaN 3 4 NaN 1 NaN 4 NaN 9 NaN 7 数据源导致某些列标题的复制方式略有不同 例如 如上所述 某些列标
  • Scala 如何忽略 Java 的检查异常?

    例如如果调用 JavaThread sleep这会抛出一个已检查的InterruptedException来自 Scala 源文件 然后不需要将调用包含在 Scala 中try catch Scala 如何删除将调用包围在 a 中的规则tr
  • 不支持的身份验证令牌,仅当禁用身份验证时才允许 schema='none':{ schema='none' } - Neo4j 身份验证错误

    我正在尝试使用 neo4j spark connector 从 Spark 连接到 Neo4j 当我尝试连接到 Neo4j 时遇到身份验证问题org neo4j driver v1 exceptions AuthenticationExce
  • 任务和分区之间有什么关系?

    我能说 么 Spark任务的数量等于Spark分区的数量吗 执行器运行一次 执行器内部的批处理 等于一个任务吗 每个任务只产生一个分区 1 的重复 并行度或可以同时运行的任务数量由以下公式设置 Executor实例的数量 配置 每个执行器的
  • 如何在 Scala 中打印任何内容的列表?

    目前我有一个打印整数的方法 def printList args List Int Unit args foreach println 我如何修改它 使其足够灵活 可以打印任何内容的列表 您不需要专用的方法 所需的功能已经在集合类中 pri
  • 使用 Spark DataFrame 获取组后所有组的 TopN

    我有一个 Spark SQL DataFrame user1 item1 rating1 user1 item2 rating2 user1 item3 rating3 user2 item1 rating4 如何按用户分组然后返回TopN
  • 在 Scala 和 SBT 中调试较长的编译时间

    在我的 Scala SBT 项目中 我有一个文件需要 5 分钟才能编译 所有其他的都可以在几秒钟内编译 这使得开发非常痛苦 我确信我滥用了一些 Scala 构造 但我不知道如何调试它 如何在 Scala 中调试较长的编译时间 我正在使用 S
  • 错误:协变类型 A 出现在逆变位置

    我试图写一个不可变的Matrix A 班级 我希望该类是协变的A但是当我把 在 前面A编译器开始抱怨类中的某些操作 以下是我的相关子集Matrix类 实际类比以下子集大 5 倍左右 class Matrix A private val co
  • 如何用 pandas 中两个日期之间计算的值填充列?

    我有这个数据框 Date Position TrainerID Win 2017 09 03 4 1788 0 0 wins 1 race 2017 09 16 5 1788 0 0 wins 2 races 2017 10 14 1 17
  • pyspark 中的 Pandas UDF

    我正在尝试在 Spark 数据帧上填充一系列观察结果 基本上我有一个日期列表 我应该为每个组创建缺失的日期 在熊猫中有reindex函数 这是 pyspark 中不可用的 我尝试实现 pandas UDF pandas udf schema
  • Spark Scala 将列从一个数据帧复制到另一个数据帧

    我有一个原始数据框的修改版本 我在其上进行了聚类 现在我想将预测列恢复为原始 DF 索引没问题 因此匹配 我该怎么做 使用这段代码我得到一个错误 println Predicted dfWithOutput show println Ori
  • 对于多列,将当前行和上一行的差异附加到新列

    对于 df 中的每一列 我想从前一行 row n 1 row n 中减去当前行 但我遇到了困难 我的代码如下 usr bin python3 from pandas datareader import data import pandas
  • fetchsize和batchsize对Spark的影响

    我想通过以下方式控制 RDB 的读写速度Spark直接 但标题已经透露的相关参数似乎不起作用 我可以得出这样的结论吗fetchsize and batchsize我的测试方法不起作用 或者它们确实会影响阅读和写作方面 因为测量结果基于规模是
  • 如何在Spark结构化流中指定批处理间隔?

    我正在使用 Spark 结构化流并遇到问题 在 StreamingContext DStreams 中 我们可以定义批处理间隔 如下所示 from pyspark streaming import StreamingContext ssc
  • pandas 替换多个值

    以下是示例数据框 gt gt gt df pd DataFrame a 1 1 1 2 2 b 11 22 33 44 55 gt gt gt df a b 0 1 11 1 1 22 2 1 33 3 2 44 4 3 55 现在我想根据
  • 使用 scala 集合 - CanBuildFrom 麻烦

    我正在尝试编写一个接受任何类型集合的方法CC 并将其映射到一个新的集合 相同的集合类型但不同的元素类型 我正在挣扎 基本上我正在尝试实施map but 不在集合本身上 问题 我正在尝试实现一个带有签名的方法 它看起来有点像 def map

随机推荐