Pyspark 合并数据帧行,一个数组包含在另一个数组中

2023-12-30

我什至不知道表达这些问题的最佳标题是什么。

我有以下数据集

df = spark.createDataFrame([\
            (["1", "2","3","4"], ),\
            (["1","2","3"], ),\
            (["2","1","3"], ),\
            (["2","3","4","1"], ),\
            (["6","7"], )\
], ['cycle', ])
df.show()

+------------+
|       cycle|
+------------+
|[1, 2, 3, 4]|
|   [1, 2, 3]|
|   [2, 1, 3]|
|[2, 3, 4, 1]|
|      [6, 7]|
+------------+

我最后想要的是:

  1. 删除排列
  2. 仅保留包含所有其他集合的最大行的行

我可以用sort_array() and distinct()摆脱排列

df.select(f.sort_array("cycle").alias("cycle")).distinct().show() 
+------------+
|       cycle|
+------------+
|[1, 2, 3, 4]|
|      [6, 7]|
|   [1, 2, 3]|
+------------+

我想用 Pyspark 减少数据集是:

+------------+
|       cycle|
+------------+
|[1, 2, 3, 4]|
|      [6, 7]|
+------------+

所以以某种方式检查一下[1, 2, 3]是其一部分[1, 2, 3, 4]并且只保留 所以Python子集命令A.issubset(B)应用在Pyspark、Spark方式上一列

我目前能想到的唯一方法是对每一行进行可怕的迭代循环,这将杀死所有性能


您可以尝试的一种方法是首先找到所有cycles 至少有一个superset(排除自我)通过使用自加入找到d2.cycle满足以下条件:

  • size(数组除外 http://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.functions.array_except(d2.cycle, d1.cycle))==0: 中没有项目d2.cycle被排除在d1.cycle(空数组将满足)
  • size(d2.cycle) < size(d1.cycle): the size of d2.cycle小于size of d1.cycle:

然后采用 left_anti 连接从原始数据帧中排除上述列表,最后运行 sort_array 和 drop_duplicates(或distinct):

from pyspark.sql.functions import expr

df_sub = df.alias('d1').join(
      df.alias('d2')
    , expr('size(array_except(d2.cycle, d1.cycle))==0 AND size(d2.cycle) < size(d1.cycle)')
).select('d2.cycle').distinct()

df_sub.show()
#+---------+
#|    cycle|
#+---------+
#|[1, 2, 3]|
#|[2, 1, 3]|
#+---------+

df.join(df_sub , on=['cycle'], how='left_anti') \
  .withColumn('cycle', expr('sort_array(cycle)')) \
  .distinct() \
  .show()
#+------------+                                                                  
#|       cycle|
#+------------+
#|[1, 2, 3, 4]|
#|      [6, 7]|
#+------------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Pyspark 合并数据帧行,一个数组包含在另一个数组中 的相关文章

  • Python Nose 导入错误

    我似乎无法理解鼻子测试框架 https nose readthedocs org en latest 识别文件结构中测试脚本下方的模块 我已经设置了演示该问题的最简单的示例 下面我会解释一下 这是包文件结构 init py foo py t
  • DataFrame 在函数内部修改

    我面临一个我以前从未观察到的函数内数据帧修改的问题 有没有一种方法可以处理这个问题 以便初始数据帧不被修改 def test df df tt np nan return df dff pd DataFrame data 现在 当我打印时d
  • Python + PostgreSQL + 奇怪的ascii = UTF8编码错误

    我有包含字符的 ascii 字符串 x80 代表欧元符号 gt gt gt print x80 当将包含该字符的字符串数据插入数据库时 我得到 psycopg2 DataError invalid byte sequence for enc
  • 为什么导入 pdb 时出现此错误? “模块”对象没有属性“ascii_letters”

    尝试调试我的代码 我正在导入库pdb import sys from subprocess import check call import pdb functions if name main Code 我收到此错误 File reg p
  • conda 无法从 yml 创建环境

    我尝试运行下面的代码来从 YAML 文件创建虚拟 Python 环境 我在 Ubuntu 服务器上的命令行中运行代码 虚拟环境名为 py36 当我运行下面的代码时 我收到下面的消息 环境也没有被创建 这个问题是因为我有几个必须使用 pip
  • 使用 Scipy imsave 将 Numpy 数组保存到图像时保留未更改的数据

    使用 Scipy 保存二维 Numpy 数组 单个值 时toimage or imsave像素值与 Numpy 数组中的像素值不完全匹配 相反 在某些区域 主要是边缘 图像算法似乎使用某种插值 是否有一个选项可以停止插值并保留准确的数据 例
  • 在Python中创建一个新表

    我正在尝试从数控机床中提取数据 事件每毫秒发生一次 我需要过滤掉一些用管道 分隔的变量分隔符 PuTTy exe 程序生成的日志文件 我尝试阅读熊猫 但列不在同一位置 df pd read table data log sep 日志文件的一
  • 如何使用循环将十进制转换为二进制?

    我想编写一个程序 将十进制数 0 到 9 转换为二进制数 我可以编写如何使用重复除法将十进制数转换为二进制数的代码 但是 我在创建一个以二进制格式打印十进制数字 0 到 9 的循环时遇到了麻烦 这是我的代码 number 0 remaind
  • 网页抓取 - 前往第 2 页

    如何访问数据集的第二页 无论我做什么 它都只返回第 1 页 import bs4 from urllib request import urlopen as uReq from bs4 import BeautifulSoup as sou
  • 如何使用 Python 多处理避免在分叉进程中加载​​父模块

    当您创建一个Pool使用Python的进程multiprocessing 这些进程将分叉 父进程中的全局变量将显示在子进程中 如下面的问题所述 如何限制多处理进程的范围 https stackoverflow com questions 2
  • 一行Python和SQLite代码,为什么需要加“,”? [复制]

    这个问题在这里已经有答案了 c execute INSERT INTO numbers VALUES random randint 0 100 如果我将上面的代码更改为 c execute INSERT INTO numbers VALUE
  • 杂乱的扭曲连接在不干净的时尚中消失了。没有代理。已经尝试过标题

    我正在尝试抓取这个网站 https www5 apply2jobs com jupitermed ProfExt index cfm fuseaction mExternal searchJobs https www5 apply2jobs
  • 在Python中删除带有重音符号的字符串中的所有非字母字符

    我正在尝试使用 Python 3 7 从包含重音符号的字符串中删除所有非字母字符 空格除外 我尝试了以下方法 import re text 29 1981 4 2008 clean text re sub W d text print cl
  • Python 视频框架

    我正在寻找一个 Python 框架 它将使我能够播放视频并在该视频上绘图 用于标记目的 我尝试过 Pyglet 但这似乎效果不是特别好 在现有视频上绘图时 会出现闪烁 即使使用双缓冲和所有这些好东西 而且似乎没有办法在每帧回调期间获取视频中
  • 如何在 Python 中从 HTML 页面中提取 URL [关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 我必须用Python 编写一个网络爬
  • Django 接受 AM/PM 作为表单输入

    我试图弄清楚如何使用 DateTime 字段在 Django 中接受 am pm 作为时间格式 但我遇到了一些麻烦 我尝试在 forms py 文件中这样设置 pickup date time from DateTimeField inpu
  • 如何正确消除字典中的元素直到只剩下一个字符串

    我真的需要这方面的帮助 def get winner dict winner new dict for winner in dict winner first letter winner 0 value dict winner winner
  • 在游戏中实现功能

    我在完成这部分作业时遇到了麻烦 我必须宣布游戏的获胜者 然后输入到函数中 输入所有 if 语句后 我必须创建一个函数def playGame 这必须包括 showRules user getUserChoice computer getCo
  • Scala:如何获取数据框中的行范围

    我有一个DataFrame通过运行创建sqlContext readParquet 文件的一个 The DataFrame由 300 M 行组成 我需要使用这些行作为另一个函数的输入 但我想以较小的批次进行操作 以防止 OOM 错误 目前
  • 从数据集的给定日期范围中提取属于一天的数据

    我有一个数据集 日期范围为 2018 年 1 月 12 日到 8 月 3 日 其中包含一些值 维数为my df数据框是 my df shape 9752 2 每行包含半小时频率 第一行开始于2018 01 12 my df iloc 0 D

随机推荐