如果我缓存 Spark Dataframe,然后覆盖引用,原始数据帧是否仍会被缓存?

2023-12-24

假设我有一个函数来生成 (py)spark 数据帧,并将数据帧作为最后一个操作缓存到内存中。

def gen_func(inputs):
   df = ... do stuff...
   df.cache()
   df.count()
   return df

根据我的理解,Spark的缓存工作原理如下:

  1. When cache/persist加上一个动作(count()) 在数据上调用 帧,它是从它的 DAG 计算出来并缓存到内存中,贴在 指向引用它的对象。
  2. 只要存在对该对象的引用(可能在其他函数/其他范围内),df 将继续被缓存,并且依赖于 df 的所有 DAG 将使用内存中缓存的数据作为起点。
  3. 如果删除了对 df 的所有引用,Spark 会将缓存作为内存进行垃圾收集。它可能不会立即被垃圾收集,从而导致一些短期内存块(特别是,如果您生成缓存数据并太快地丢弃它们,则会导致内存泄漏),但最终它会被清除。

我的问题是,假设我使用gen_func生成数据框,但随后覆盖原始数据框引用(可能使用filter or a withColumn).

df=gen_func(inputs)
df=df.filter("some_col = some_val")

在Spark中,RDD/DF是不可变的,因此过滤器之后重新分配的df和过滤器之前的df引用了两个完全不同的对象。在这种情况下,对原始 df 的引用是cache/counted已被覆盖。这是否意味着缓存的数据帧不再可用并且将被垃圾收集?这是否意味着新的后置过滤器df是否会从头开始计算所有内容,尽管是从先前缓存的数据帧生成的?

我问这个问题是因为我最近正在修复代码中的一些内存不足问题,在我看来,缓存可能是问题所在。然而,我还不太了解使用缓存的安全方法是什么,以及如何意外地使缓存内存失效的全部细节。我的理解中缺少什么?我在执行上述操作时是否偏离了最佳实践?


我做了几个实验,如下所示。显然,数据帧一旦被缓存,仍然被缓存(如图所示getPersistentRDDs和查询计划 -InMemory等),即使所有 Python 引用都被覆盖或完全删除del,并显式调用垃圾收集。

实验一:

def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data

sc._jsc.getPersistentRDDs()

df = func()
sc._jsc.getPersistentRDDs()

df2 = df.filter('col1 != 2')
del df
import gc
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()

df2.select('*').explain()

del df2
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()

Results:

>>> def func():
...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
...     data.cache()
...     data.count()
...     return data
...
>>> sc._jsc.getPersistentRDDs()
{}

>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o234}

>>> df2 = df.filter('col1 != 2')
>>> del df
>>> import gc
>>> gc.collect()
93
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o240}

>>> df2.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#174L) AND NOT (col1#174L = 2))
+- *(1) ColumnarToRow
   +- InMemoryTableScan [col1#174L], [isnotnull(col1#174L), NOT (col1#174L = 2)]
         +- InMemoryRelation [col1#174L], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [_1#172L AS col1#174L]
                  +- *(1) Scan ExistingRDD[_1#172L]

>>> del df2
>>> gc.collect()
85
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o250}

实验2:

def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data

sc._jsc.getPersistentRDDs()

df = func()
sc._jsc.getPersistentRDDs()

df = df.filter('col1 != 2')
import gc
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()

df.select('*').explain()

del df
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()

Results:

>>> def func():
...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
...     data.cache()
...     data.count()
...     return data
...
>>> sc._jsc.getPersistentRDDs()
{}

>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o317}

>>> df = df.filter('col1 != 2')
>>> import gc
>>> gc.collect()
244
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o323}

>>> df.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#220L) AND NOT (col1#220L = 2))
+- *(1) ColumnarToRow
   +- InMemoryTableScan [col1#220L], [isnotnull(col1#220L), NOT (col1#220L = 2)]
         +- InMemoryRelation [col1#220L], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [_1#218L AS col1#220L]
                  +- *(1) Scan ExistingRDD[_1#218L]

>>> del df
>>> gc.collect()
85
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o333}

实验3(对照实验,证明unpersist works)

def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data

sc._jsc.getPersistentRDDs()

df = func()
sc._jsc.getPersistentRDDs()

df2 = df.filter('col1 != 2')
df2.select('*').explain()

df.unpersist()
df2.select('*').explain()

Results:

>>> def func():
...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
...     data.cache()
...     data.count()
...     return data
...
>>> sc._jsc.getPersistentRDDs()
{}

>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{116: JavaObject id=o398}

>>> df2 = df.filter('col1 != 2')
>>> df2.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#312L) AND NOT (col1#312L = 2))
+- *(1) ColumnarToRow
   +- InMemoryTableScan [col1#312L], [isnotnull(col1#312L), NOT (col1#312L = 2)]
         +- InMemoryRelation [col1#312L], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [_1#310L AS col1#312L]
                  +- *(1) Scan ExistingRDD[_1#310L]

>>> df.unpersist()
DataFrame[col1: bigint]
>>> sc._jsc.getPersistentRDDs()
{}

>>> df2.select('*').explain()
== Physical Plan ==
*(1) Project [_1#310L AS col1#312L]
+- *(1) Filter (isnotnull(_1#310L) AND NOT (_1#310L = 2))
   +- *(1) Scan ExistingRDD[_1#310L]

回答OP的问题:

这是否意味着缓存的数据帧不再可用并且将被垃圾收集?这是否意味着新的后置过滤器 df 将从头开始计算所有内容,尽管是从先前缓存的数据帧生成的?

实验表明no对彼此而言。数据帧保持缓存状态,不会被垃圾收集,并且根据查询计划,使用缓存的(不可引用的)数据帧计算新的数据帧。

一些与缓存使用相关的有用功能(如果您不想通过 Spark UI 执行此操作)包括:

sc._jsc.getPersistentRDDs(),它显示了缓存的 RDD/数据帧的列表,以及

spark.catalog.clearCache(),这会清除所有缓存的 RDD/数据帧。

我在执行上述操作时是否偏离了最佳实践?

我无权就此评判你,但正如其中一条评论所建议的,避免重新分配给df因为数据帧是不可变的。尝试想象你正在用 scala 编码并且你定义了df as a val. Doing df = df.filter(...)是不可能的。 Python 本身无法强制执行这一点,但我认为最佳实践是避免覆盖任何数据帧变量,以便您始终可以调用df.unpersist()之后,如果您不再需要缓存的结果。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如果我缓存 Spark Dataframe,然后覆盖引用,原始数据帧是否仍会被缓存? 的相关文章

  • 在 Celery 任务中调用 Google Cloud API 永远不会返回

    我正在尝试拨打外部电话Google Cloud Natural Language API从一个内Celery任务 使用google cloud python包裹 问题是对 API 的调用永远不会返回 挂起 celery task def g
  • matplotlib 图中点的标签

    所以这是一个关于已发布的解决方案的问题 我试图在我拥有的 matplotlib 散点图中的点上放置一些数据标签 我试图在这里模仿解决方案 是否有与 MATLAB 的 datacursormode 等效的 matplotlib https s
  • pandas DataFrame.join 的运行时间是多少(大“O”顺序)?

    这个问题更具概念性 理论性 与非常大的数据集的运行时间有关 所以我很抱歉没有一个最小的例子来展示 我有一堆来自两个不同传感器的数据帧 我需要最终将它们连接成两个very来自两个不同传感器的大数据帧 df snsr1 and df snsr2
  • VSCode Settings.json 丢失

    我正在遵循教程 并尝试将 vscode 指向我为 Scrapy 设置的虚拟工作区 但是当我在 VSCode 中打开设置时 工作区设置 选项卡不在 用户设置 选项卡旁边 我还尝试通过以下方式手动转到文件 APPDATA Code User s
  • 如何从Python中的函数返回多个值? [复制]

    这个问题在这里已经有答案了 如何从Python中的函数返回多个变量 您可以用逗号分隔要返回的值 def get name you code return first name last name 逗号表示它是一个元组 因此您可以用括号将值括
  • 使用主题交换运行多个 Celery 任务

    我正在用 Celery 替换一些自制代码 但很难复制当前的行为 我期望的行为如下 创建新用户时 应向tasks与交换user created路由键 该消息应该触发两个 Celery 任务 即send user activate email
  • 在 Django Admin 中调整字段大小

    在管理上添加或编辑条目时 Django 倾向于填充水平空间 但在某些情况下 当编辑 8 个字符宽的日期字段或 6 或 8 个字符的 CharField 时 这确实是一种空间浪费 字符宽 然后编辑框最多可容纳 15 或 20 个字符 我如何告
  • 矩形函数的数值傅里叶变换

    本文的目的是通过一个众所周知的分析傅里叶变换示例来正确理解 Python 或 Matlab 上的数值傅里叶变换 为此 我选择矩形函数 这里报告了它的解析表达式及其傅立叶变换https en wikipedia org wiki Rectan
  • 如何将特定范围内的标量添加到 numpy 数组?

    有没有一种更简单 更节省内存的方法可以单独在 numpy 中执行以下操作 import numpy as np ar np array a l r ar c a a 0 l ar tolist a r 它可能看起来很原始 但它涉及获取给定数
  • 导入错误:没有名为flask.ext.login的模块

    我的flask login 模块有问题 我已经成功安装了flask login模块 另外 从命令提示符我可以轻松运行此脚本 不会出现错误 Python 2 7 r27 82525 Jul 4 2010 07 43 08 MSC v 1500
  • 嵌套作用域和 Lambda

    def funct x 4 action lambda n x n return action x funct print x 2 prints 16 我不太明白为什么2会自动分配给n n是返回的匿名函数的参数funct 完全等价的定义fu
  • 将 Matlab 的 datenum 格式转换为 Python

    我刚刚开始从 Matlab 迁移到 Python 2 7 在读取 mat 文件时遇到一些问题 时间信息以 Matlab 的日期数字格式存储 对于那些不熟悉它的人 日期序列号将日历日期表示为自固定基准日期以来已经过去的天数 在 MATLAB
  • 在 Google App Engine 中,如何避免创建具有相同属性的重复实体?

    我正在尝试添加一个事务 以避免创建具有相同属性的两个实体 在我的应用程序中 每次看到新的 Google 用户登录时 我都会创建一个新的播放器 当新的 Google 用户在几毫秒内进行多个 json 调用时 我当前的实现偶尔会创建重复的播放器
  • PySpark groupByKey 返回 pyspark.resultiterable.ResultIterable

    我试图找出为什么我的 groupByKey 返回以下内容 0
  • 为什么 csv.DictReader 给我一个无属性错误?

    我的 CSV 文件是 200 Service 我放入解释器的代码是 snav csv DictReader open screennavigation csv delimiter print snav fieldnames 200 for
  • 等待子进程使用 os.system

    我用了很多os system在 for 循环内调用创建后台进程 如何等待所有后台进程结束 os wait告诉我没有子进程 ps 我使用的是Solaris 这是我的代码 usr bin python import subprocess imp
  • 在virtualenv中下载sqlite3

    我正在尝试使用命令创建应用程序python3 manage py startapp webapp但我收到一条错误消息 django core exceptions ImproperlyConfigured 加载时出错 pysqlite2 或
  • Spark Scala 相当于 SKEW 连接提示

    Spark SQL 有一个可用的倾斜提示 请参阅here https docs databricks com spark latest spark sql skew join html relation columns and skew v
  • 如何将Python3设置为Mac上的默认Python版本?

    有没有办法将 Python 3 8 3 设置为 macOS Catalina 版本 10 15 2 上的默认 Python 版本 我已经完成的步骤 看看它安装在哪里 ls l usr local bin python 我得到的输出是这样的
  • 如何在Python脚本中从youtube-dl中提取文件大小?

    我是 python 编程新手 我想在下载之前提取视频 音频大小 任何 YouTube 视频 gt gt gt from youtube dl import YoutubeDL gt gt gt url https www youtube c

随机推荐