查找 Spark DataFrame 中每组的最大行数

2024-01-14

我尝试使用 Spark 数据帧而不是 RDD,因为它们似乎比 RDD 更高级,并且往往会生成更可读的代码。

在 14 个节点的 Google Dataproc 集群中,我有大约 600 万个名称,这些名称由两个不同的系统转换为 id:sa and sb. Each Row包含name, id_sa and id_sb。我的目标是生成一个映射id_sa to id_sb这样对于每个id_sa, 相应的id_sb是所有附加名称中最常见的 IDid_sa.

让我们尝试用一个例子来阐明。如果我有以下行:

[Row(name='n1', id_sa='a1', id_sb='b1'),
 Row(name='n2', id_sa='a1', id_sb='b2'),
 Row(name='n3', id_sa='a1', id_sb='b2'),
 Row(name='n4', id_sa='a2', id_sb='b2')]

我的目标是生成一个映射a1 to b2。事实上,与以下内容相关的名称a1 are n1, n2 and n3,分别映射到b1, b2 and b2, so b2是关联名称中最常见的映射a1。同样地,a2将被映射到b2。可以假设总会有赢家:无需打破平局。

我希望我能使用groupBy(df.id_sa)在我的数据框上,但我不知道下一步该做什么。我希望有一个聚合最终可以生成以下行:

[Row(id_sa=a1, max_id_sb=b2),
 Row(id_sa=a2, max_id_sb=b2)]

但也许我尝试使用错误的工具,我应该重新使用 RDD。


Using join(如果出现平局,将导致组中出现多行):

import pyspark.sql.functions as F
from pyspark.sql.functions import count, col 

cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")

cnts.join(maxs, 
  (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))

使用窗口函数(将放弃联系):

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

w = Window().partitionBy("id_sa").orderBy(col("cnt").desc())

(cnts
  .withColumn("rn", row_number().over(w))
  .where(col("rn") == 1)
  .select("id_sa", "id_sb"))

Using struct订购:

from pyspark.sql.functions import struct

(cnts
  .groupBy("id_sa")
  .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
  .select(col("id_sa"), col("max.id_sb")))

也可以看看如何选择每组的第一行? https://stackoverflow.com/q/33878370/1560062

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

查找 Spark DataFrame 中每组的最大行数 的相关文章

随机推荐