如何在 Spark 中向分解结构添加列?

2023-12-09

假设我有以下数据:

{"id":1, "payload":[{"foo":1, "lol":2},{"foo":2, "lol":2}]}

我想分解有效负载并向其中添加一列,如下所示:

df = df.select('id', F.explode('payload').alias('data'))
df = df.withColumn('data.bar', F.col('data.foo') * 2)

然而,这会产生一个包含三列的数据框:

  • id
  • data
  • data.bar

我预计data.bar成为其中的一部分data结构...

如何向分解结构添加一列,而不是添加顶级列?


df = df.withColumn('data', f.struct(
    df['data']['foo'].alias('foo'),
   (df['data']['foo'] * 2).alias('bar')
))

这将导致:

root
 |-- id: long (nullable = true)
 |-- data: struct (nullable = false)
 |    |-- col1: long (nullable = true)
 |    |-- bar: long (nullable = true)

UPDATE:

def func(x):
    tmp = x.asDict()
    tmp['foo'] = tmp.get('foo', 0) * 100
    res = zip(*tmp.items())
    return Row(*res[0])(*res[1])

df = df.withColumn('data', f.UserDefinedFunction(func, StructType(
    [StructField('foo', StringType()), StructField('lol', StringType())]))(df['data']))

P.S.

Spark几乎不支持inplace手术。

所以每次你想做的事inplace,你需要做replace实际上。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 Spark 中向分解结构添加列? 的相关文章

  • Spark-获取RDD中的文件名

    我正在尝试处理每天都在增长的 4 个文本文件目录 我需要做的是 如果有人试图搜索发票号码 我应该给他们包含该发票号码的文件列表 我能够通过将文本文件加载为 RDD 来映射和减少文本文件中的值 但是如何获取文件名和其他文件属性呢 从 Spar
  • AssertionError:断言失败:没有在 Databricks 中进行 DeleteFromTable 的计划

    这个命令运行良好有什么原因吗 sql SELECT FROM Azure Reservations WHERE timestamp gt 2021 04 02 返回 2 行 如下 sql DELETE FROM Azure Reservat
  • pandas 数据框列表的列表列表

    我有一个列表的列表 最外层列表的长度为 20 单独的类别 中间列表的长度可变 时间戳列表 内部列表的长度为 5 分割每个时间戳 例如 sTimestamps 0 5 Tue Feb 7 10 06 30 2017 Tue Feb 7 10
  • scala/spark 代码不允许在 hive 中添加列

    如果源数据有新列 我尝试在 Hive 表中添加一列 所有新列的检测都运行良好 但是 当我尝试将列添加到目标表时 我收到此错误 for f lt df schema fields if f name chk spark sqlContext
  • 使用 pandas 删除停用词

    我想从数据框的列中删除停用词 列内有需要拆分的文本 例如我的数据框如下所示 ID Text 1 eat launch with me 2 go outside have fun 我想应用停用词text column所以应该分开 我试过这个
  • 为什么 Databricks Connect Test 无法在 Mac 上运行?

    我已经阅读了配置文档databricks connect但运行时仍然出现以下错误databricks connect test 来自终端的错误 java lang NoSuchMethodError org apache spark int
  • 如何使用groupby将多个函数应用于Pandas中的多个列?

    我有一个正常的df A pd DataFrame 1 5 2 2 4 4 3 3 1 4 2 2 5 1 4 columns A B C index 1 2 3 4 5 下列的这个食谱 https stackoverflow com que
  • 如何抑制spark输出控制台中的“Stage 2===>”?

    我有数据帧并试图获取不同的计数并且能够成功获取不同的计数 但是每当 scala 程序执行时我都会收到此消息 Stage 2 gt 1 1 2 我如何在控制台中抑制特定的此消息 val countID dataDF select substr
  • 使用名义变量删除 r 中的异常值

    比如说 我有三列 x lt c 10 1 6 50 x1 lt c 20 1 6 60 z lt c 1 2 3 4 5 6 7 8 检查 x 的异常值 bx lt boxplot x bx out 检查 x1 的异常值 bx1 lt bo
  • 闪亮错误:参数暗示行数不同

    我正在尝试开发一个简单的应用程序 从 Kijiji 网站获取本地分类广告 我用几乎相同的脚本制作了一个类似的应用程序 但我没有收到下面描述的错误 所以我不知道这个脚本出了什么问题 我尝试了我能想到的一切 但无法让它发挥作用 的结构df数据框
  • Pandas DataFrame:如果列为空,则复制列的内容

    我有以下带有命名列和索引的 DataFrame a a b b 1 5 NaN 9 NaN 2 NaN 3 3 NaN 3 4 NaN 1 NaN 4 NaN 9 NaN 7 数据源导致某些列标题的复制方式略有不同 例如 如上所述 某些列标
  • 如何向 pandas 数据框中的新列添加值?

    我想在 Pandas 数据框中创建一个新的命名列 将第一个值插入其中 然后将另一个值添加到同一列 就像是 import pandas df pandas DataFrame df New column append a df New col
  • 如果满足 NaN 阈值,Python 将删除 DF 中的所有特征实例

    Using df dropna thresh x inplace True 我可以成功删除至少缺少的行x非纳米值 但因为我的 df 看起来像 2001 2002 2003 2004 bob A 123 31 4 12 bob B 41 1
  • 任务和分区之间有什么关系?

    我能说 么 Spark任务的数量等于Spark分区的数量吗 执行器运行一次 执行器内部的批处理 等于一个任务吗 每个任务只产生一个分区 1 的重复 并行度或可以同时运行的任务数量由以下公式设置 Executor实例的数量 配置 每个执行器的
  • Pandas:向量化局部范围操作([i:i+2] 行的最大值和总和)

    我希望在数据帧中的每一行的局部范围内进行计算 同时避免速度缓慢for环形 例如 对于下面数据中的每一行 我想找到未来 3 天内 包括当天 的最高气温以及未来 3 天内的总降雨量 Day Temperature Rain 0 30 4 1 3
  • 使用 Spark DataFrame 获取组后所有组的 TopN

    我有一个 Spark SQL DataFrame user1 item1 rating1 user1 item2 rating2 user1 item3 rating3 user2 item1 rating4 如何按用户分组然后返回TopN
  • 为什么 Spark 比 Hadoop MapReduce 更快

    有人可以使用字数统计示例解释一下为什么 Spark 比 MapReduce 更快吗 bafna的答案提供了故事的记忆方面 但我想补充另外两个重要事实 DAG和生态系统 Spark 使用 惰性求值 来形成连续计算阶段的有向无环图 DAG 通过
  • 在 Spark 2.1.0 中启用 _metadata 文件

    Spark 2 1 0 中保存空 Parquet 文件似乎已损坏 因为无法再次读入它们 由于模式推断错误 我发现从 Spark 2 0 开始 写入 parquet 文件时默认禁用写入 metadata 文件 但我找不到重新启用此功能的配置设
  • 使用基于正则表达式的部分匹配来选择 Pandas 数据帧的子数据帧

    我有一个 Pandas 数据框 它有两列 一列 进程参数 列 包含字符串 另一列 值 列 包含相应的浮点值 我需要过滤出部分匹配列 过程参数 中的一组键的子数据帧 并提取与这些键匹配的数据帧的两列 df pd DataFrame Proce
  • Pandas 与 Numpy 数据帧

    看这几行代码 df2 df copy df2 1 df 1 df 1 values 1 df2 ix 0 0 我们的教练说我们需要使用 values属性来访问底层的 numpy 数组 否则我们的代码将无法工作 我知道 pandas Data

随机推荐