在pyspark中不使用pivot进行分组的有效方法

2023-12-03

我有一个查询,需要使用 pyspark 计算内存利用率。我已经使用pivot 使用 python pandas 实现了这一点,但现在我需要在 pyspark 中执行此操作,而旋转将是一个昂贵的功能,所以我想知道 pyspark 中是否有任何替代方案来解决此解决方案

time_stamp          Hostname    kpi kpi_subtype value_current
2019/08/17 10:01:05 Server1     memory  Total       100
2019/08/17 10:01:06 Server1     memory  used        35
2019/08/17 10:01:09 Server1     memory  buffer      8
2019/08/17 10:02:04 Server1     memory  cached      10
2019/08/17 10:01:05 Server2     memory  Total       100
2019/08/17 10:01:06 Server2     memory  used        42
2019/08/17 10:01:09 Server2     memory  buffer      7
2019/08/17 10:02:04 Server2     memory  cached      9
2019/08/17 10:07:05 Server1     memory  Total       100
2019/08/17 10:07:06 Server1     memory  used        35
2019/08/17 10:07:09 Server1     memory  buffer      8
2019/08/17 10:07:04 Server1     memory  cached      10
2019/08/17 10:08:05 Server2     memory  Total       100
2019/08/17 10:08:06 Server2     memory  used        35
2019/08/17 10:08:09 Server2     memory  buffer      8
2019/08/17 10:08:04 Server2     memory  cached      10

需要将其转化为

time_stamp      Hostname    kpi Percentage
2019-08-17 10:05:00 Server1     memory  17
2019-08-17 10:05:00 Server2     memory  26
2019-08-17 10:10:00 Server1     memory  17
2019-08-17 10:10:00 Server2     memory  17

我使用的Python代码

df3 = pd.read_csv('/home/yasin/Documents/IMI/Data/memorry sample.csv')
df3['time_stamp'] = pd.to_datetime(df3['time_stamp'])
ns5min=5*60*1000000000 
df3['time_stamp'] = pd.to_datetime(((df3['time_stamp'].astype(np.int64) // ns5min + 1 ) * ns5min))
df4 = df3.pivot_table('value_current' , ['time_stamp' , 'Hostname ' , 'kpi' ], 'kpi_subtype')
df4 = df4.reset_index()
df4['Percentage'] = ((df4['Total'] - (df4['Total'] - df4['used'] + df4['buffer'] + df4['cached'])) / df4['Total']) * 100

寻找在 pyspark 中复制它并在 python 中寻找更有效的方法,因为数据透视是一项昂贵的操作,我需要在一个非常大的数据集上每 5 分钟执行一次


当转换为列的值列表未知时,旋转的成本很高。 Spark 已超载pivot将它们作为参数的方法。

def pivot(pivotColumn: String, values: Seq[Any])

如果它们未知,Spark 必须对数据集中的不同值进行排序和收集。否则,逻辑是非常简单和描述的here.

该实现添加了一个新的逻辑运算符 (o.a.s.sql.catalyst.plans.logic.Pivot)。该逻辑运算符由新的分析器规则 (o.a.s.sql.catalyst.analysis.Analyzer.ResolvePivot) 转换,该规则当前将其转换为包含大量 if 语句的聚合,每个主值一个表达式。

例如, df.groupBy("A", "B").pivot("C", Seq("small", "large")).sum("D") 将被转换为 df.groupBy 的等效项("A", "B").agg(expr(“sum(if(C = '小', D, null))”), expr(“sum(if(C = '大', D, null)) )”))。您可以自己完成此操作,但它会变得很长并且可能很快就会出错。

如果没有旋转,我会做类似的事情:

val in = spark.read.csv("input.csv")
      //cast to the unix timestamp
      .withColumn("timestamp", unix_timestamp($"time_stamp", "yyyy/MM/dd HH:mm:ss").cast(TimestampType))
      .drop($"time_stamp")

现在,我们可以按时间窗口和主机名对数据集进行分组,并将 KPI 指标收集到地图中。
有一个优秀的answer正是描述了这一点。

val joinMap = udf { values: Seq[Map[String, Double]] => values.flatten.toMap }

val grouped = in.groupBy(window($"timestamp", "5 minutes"), $"Hostname")
  .agg(joinMap(collect_list(map($"kpi_subtype", $"value_current".cast(DoubleType)))).as("metrics"))

Output

+------------------------------------------+--------+-------------------------------------------------------------+
|window                                    |Hostname|metrics                                                      |
+------------------------------------------+--------+-------------------------------------------------------------+
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server1 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server2 |[Total -> 100.0, used -> 42.0, buffer -> 7.0, cached -> 9.0] |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server1 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server2 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
+------------------------------------------+--------+-------------------------------------------------------------+

现在我们定义一些别名和一个简单的 select 语句:

val total = col("metrics")("Total")
val used = col("metrics")("used")
val buffer = col("metrics")("buffer")
val cached = col("metrics")("cached")

val result = grouped.select($"window", $"Hostname",
          (total - ((total - used + buffer + cached) / total) * 100).as("percentage"))

现在我们开始:

+------------------------------------------+--------+----------+
|window                                    |Hostname|percentage|
+------------------------------------------+--------+----------+
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server1 |17.0      |
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server2 |26.0      |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server1 |17.0      |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server2 |17.0      |
+------------------------------------------+--------+----------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在pyspark中不使用pivot进行分组的有效方法 的相关文章

  • DataFrame 在函数内部修改

    我面临一个我以前从未观察到的函数内数据帧修改的问题 有没有一种方法可以处理这个问题 以便初始数据帧不被修改 def test df df tt np nan return df dff pd DataFrame data 现在 当我打印时d
  • Python 在 chroot 中运行时出现错误

    我尝试在 chroot 中运行一些 Python 程序 但出现以下错误 Could not find platform independent libraries
  • 为什么 tkinter / window.update 在我的程序中随着时间的推移变得更慢?

    我发现当我调用 window update 时 当向窗口写入的内容较少时 它的运行速度会更快 但后来 当我向窗口写入更多元素时 window update 需要更长的时间 请参阅下面的我的代码 您可以看到它在更新窗口之前一次向屏幕 100
  • Python,将迭代函数变成递归函数

    我创建了一个输出 4 3 2 1 0 1 2 3 4 的迭代函数 def bounce2 n s n for i in range n print n n n 1 if n lt 0 for i in range s 1 print n n
  • 将 API 数据存储到 DataFrame 中

    我正在运行 Python 脚本来从 Interactive Brokers API 收集金融市场数据 连接到API后 终端打印出请求的历史数据 如何将数据保存到数据帧中而不是在终端中流式传输 from ibapi wrapper impor
  • 为什么导入 pdb 时出现此错误? “模块”对象没有属性“ascii_letters”

    尝试调试我的代码 我正在导入库pdb import sys from subprocess import check call import pdb functions if name main Code 我收到此错误 File reg p
  • 如何在 ReportLab 段落中插入回车符?

    有没有办法在 ReportLab 的段落中插入回车符 我试图将 n 连接到我的段落字符串 但这不起作用 Title Paragraph Title n Page myStyle 我想要这样做 因为我将名称放入单元格中 并且想要控制单元格中的
  • 字典中的列表,Python 中的循环

    我有以下代码 TYPES hotmail type hotmail lookup mixed dkim no signatures S Return Path email protected cdn cgi l email protecti
  • 使用 Scipy imsave 将 Numpy 数组保存到图像时保留未更改的数据

    使用 Scipy 保存二维 Numpy 数组 单个值 时toimage or imsave像素值与 Numpy 数组中的像素值不完全匹配 相反 在某些区域 主要是边缘 图像算法似乎使用某种插值 是否有一个选项可以停止插值并保留准确的数据 例
  • Python 正则表达式部分匹配或“hitEnd”

    我正在编写一个扫描器 因此我将任意字符串与正则表达式规则列表进行匹配 如果我可以模拟 Java hitEnd 功能 不仅知道正则表达式何时不匹配 还知道何时匹配 这将非常有用 can t匹配 当正则表达式匹配器在决定拒绝输入之前到达输入末尾
  • 如何使用 Pandas 将巨大的 CSV 转换为 SQLite?

    我有一个巨大的表 大约 60 GB 采用存档的 CSV 文件形式 我想将其转换为 SQLite 文件 我现在所做的事情如下 import pandas import sqlite3 cnx sqlite3 connect db sqlite
  • 如何修复错误“AttributeError:‘模块’对象在 python3 中没有属性‘客户端’?

    以下是我的代码 import http h1 http client HTTPConnection www bing com 我认为没问题 但是 python 给了我以下错误 AttributeError 模块 对象没有属性 客户端 我想知
  • pip 安装软件包两次

    不幸的是我无法重现它 但我们已经见过几次了 pip 将一个软件包安装两次 如果卸载第一个 第二个就会可见并且也可以被卸载 我的问题 如果一个包安装了两次 如何用 python 检查 背景 我想编写一个测试来检查这一点 devOp Updat
  • 参数验证,Python 中的最佳实践[关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 让我们举一个 API 的例子 def get abs directory self path if os path isdir path ret
  • 从 IMDbPy 结果中的片目中获取电影 ID

    我正在尝试创建一个数据集 允许我根据 Python IMDb API 中的演员 ID 和电影 ID 加入演员和电影 现在 我正在尝试从演员的电影作品中提取电影 ID 列表 但无法做到 例如 我知道 Rodney Dangerfield 在
  • 为什么 bot.get_channel() 会产生 NoneType?

    我正在制作一个 Discord 机器人来处理公告命令 当使用该命令时 我希望机器人在特定通道中发送一条消息 并向用户发送一条消息以表明该命令已发送 但是 我无法将消息发送到频道 我尝试了这段代码 import discord import
  • 最小硬币找零问题——回溯

    我正在尝试用最少数量的硬币解决硬币找零问题 采用回溯法 我实际上已经完成了它 但我想添加一些选项 按其单位打印硬币数量 而不仅仅是总数 这是我下面的Python代码 def minimum coins coin list change mi
  • 获取调用者文件的绝对路径

    假设我在不同的目录中有两个文件 1 py 比如说 在C FIRST FOLDER 1 py and 2 py 比如说 在C SECOND FOLDER 2 py 文件1 py进口2 py using sys path insert 0 pa
  • issubclass() 对从不同路径导入的同一类返回 False

    目的是实现某种插件框架 其中插件是同一基类 即 A 的子类 即 B 基类使用标准导入加载 而子类使用 imp load module 从众所周知的包 即 pkg 的路径加载 pkg init py mod1 py class A mod2
  • 从 pandas 数据框中绘制堆积条形图

    我有数据框 payout df head 10 复制以下 Excel 绘图的最简单 最智能和最快的方法是什么 我尝试过不同的方法 但无法让一切都到位 Thanks 如果您只想要一个堆积条形图 那么一种方法是使用循环来绘制数据框中的每一列 并

随机推荐