I can't think of a way to do this without converting it to an RDD.
```python
# convert df to rdd
rdd = df.rdd

def extract(row, key):
    """Takes a Row and a key; returns a tuple of (dict w/o key, dict[key])."""
    _dict = row.asDict()
    _list = _dict[key]
    del _dict[key]
    return (_dict, _list)

def add_to_dict(_dict, key, value):
    _dict[key] = value
    return _dict

# preserve rest of values in key, put list to flatten in value
rdd = rdd.map(lambda x: extract(x, 'sBus'))

# make a row for each item in value
rdd = rdd.flatMapValues(lambda x: x)

# add flattened value back into dictionary
rdd = rdd.map(lambda x: add_to_dict(x[0], 'sBus', x[1]))

# convert back to dataframe
df = sqlContext.createDataFrame(rdd)
df.show()
```
The tricky part is keeping the other columns together with the newly flattened value. I do that by mapping each row to a tuple of `(dict of other columns, list to flatten)`, and then calling [flatMapValues](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.flatMapValues). This splits each element of the value list into a separate row while keeping the key attached, i.e.
```
(key, ['A', 'B', 'C'])
```
becomes
```
(key, 'A')
(key, 'B')
(key, 'C')
```
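To make that behavior concrete without spinning up Spark, here is a pure-Python sketch of what `flatMapValues` does; `local_flat_map_values` is a hypothetical local stand-in, not part of the PySpark API:

```python
def local_flat_map_values(pairs, f):
    """Hypothetical local stand-in for RDD.flatMapValues: for each
    (key, value) pair, apply f to the value and emit one (key, element)
    pair per element, keeping the key attached."""
    return [(k, v) for k, vs in pairs for v in f(vs)]

pairs = [("key", ["A", "B", "C"])]
print(local_flat_map_values(pairs, lambda x: x))
# [('key', 'A'), ('key', 'B'), ('key', 'C')]
```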
Then I move the flattened value back into the dictionary of other columns and convert it back to a DataFrame.
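Putting the three steps together, the whole pipeline can be simulated locally with plain Python: lists stand in for RDDs, dicts stand in for `Row` objects, and the `id`/`sBus` sample data is invented for illustration:

```python
def extract(row, key):
    """Return (dict without key, value at key) -- mirrors the map step."""
    _dict = dict(row)
    _list = _dict.pop(key)
    return (_dict, _list)

def add_to_dict(_dict, key, value):
    _dict[key] = value
    return _dict

rows = [{"id": 1, "sBus": ["A", "B"]}, {"id": 2, "sBus": ["C"]}]

# step 1: split each row into (other columns, list to flatten)
pairs = [extract(r, "sBus") for r in rows]
# step 2: flatMapValues -- one (dict, element) pair per list element
flat = [(d, v) for d, vs in pairs for v in vs]
# step 3: put each flattened value back (copy the dict so rows
# from the same source row don't share state)
result = [add_to_dict(dict(d), "sBus", v) for d, v in flat]

print(result)
# [{'id': 1, 'sBus': 'A'}, {'id': 1, 'sBus': 'B'}, {'id': 2, 'sBus': 'C'}]
```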