假设我有以下数据:
{"id":1, "payload":[{"foo":1, "lol":2},{"foo":2, "lol":2}]}
我想分解有效负载并向其中添加一列,如下所示:
df = df.select('id', F.explode('payload').alias('data'))
df = df.withColumn('data.bar', F.col('data.foo') * 2)
然而,这会产生一个包含三列的数据框:
我预计data.bar
成为其中的一部分data
结构...
如何向分解结构添加一列,而不是添加顶级列?
df = df.withColumn('data', f.struct(
df['data']['foo'].alias('foo'),
(df['data']['foo'] * 2).alias('bar')
))
这将导致:
root
|-- id: long (nullable = true)
|-- data: struct (nullable = false)
| |-- col1: long (nullable = true)
| |-- bar: long (nullable = true)
UPDATE:
def func(x):
tmp = x.asDict()
tmp['foo'] = tmp.get('foo', 0) * 100
res = zip(*tmp.items())
return Row(*res[0])(*res[1])
df = df.withColumn('data', f.UserDefinedFunction(func, StructType(
[StructField('foo', StringType()), StructField('lol', StringType())]))(df['data']))
P.S.
Spark几乎不支持inplace手术。
所以每次你想做的事inplace,你需要做replace实际上。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)