TL;DR: I created a dask dataframe from a dask bag. The dask dataframe treats every observation (event) as a column, so instead of having several rows of data per event, I have a column per event. The goal is to transpose the columns into rows, the same way pandas transposes a dataframe with df.T.
Details:
I have sample Twitter data pulled from my timeline. To reach my starting point, here is the code that reads the json from disk into a dask.bag and then converts it into a dask.dataframe:
import dask.bag as db
import dask.dataframe as dd
import json
b = db.read_text('./sampleTwitter.json').map(json.loads)
df = b.to_dataframe()
df.head()
The problem: all of my individual events (i.e. tweets) are recorded as columns rather than rows. In keeping with tidy data principles, I would like to have a row for each event. pandas has a transpose method for dataframes, and dask.array has a transpose method for arrays. My goal is to perform the same transpose operation on a dask dataframe. How would I do that?
- Converting rows to columns
Edit: Solution
This code resolves the original transpose problem, cleans the Twitter json files by defining the columns to keep and dropping the rest, and creates a new column by applying a function to a Series. Then we write the much smaller, cleaned files to disk.
import dask.dataframe as dd
from dask.delayed import delayed
import dask.bag as db
from dask.diagnostics import ProgressBar,Profiler, ResourceProfiler, CacheProfiler
import pandas as pd
import json
import glob
import os

# pull in all files (glob does not expand '~', so expand it explicitly)
filenames = glob.glob(os.path.expanduser('~/sampleTwitter*.json'))
dfs = [delayed(pd.read_json)(fn, orient='records') for fn in filenames]
df = dd.from_delayed(dfs)
# see all the fields of the dataframe
fields = list(df.columns)
# identify the fields we want to keep
keepers = ['coordinates','id','user','created_at','lang']
# remove the fields i don't want from column list
for f in keepers:
    if f in fields:
        fields.remove(f)
# drop the fields i don't want and only keep whats necessary
df = df.drop(fields, axis=1)
clean = df.coordinates.apply(lambda x: (x['coordinates'][0], x['coordinates'][1]),
                             meta=('coords', tuple))
df['coords'] = clean
# making new filenames from old filenames to save cleaned files
import re
newfilenames = []
for l in filenames:
    newfilenames.append(re.search(r'(?<=/).+?(?=\.)', l).group() + 'cleaned.json')
# custom saver function for dataframes using newfilenames
def saver(frame, filename):
    return frame.to_json('./' + filename)
# converting back to a delayed object
dfs = df.to_delayed()
# delayed(saver)(frame, fn) builds a lazy task; delayed(saver(frame, fn)) would call saver eagerly
writes = [delayed(saver)(frame, fn) for frame, fn in zip(dfs, newfilenames)]
# writing the cleaned, MUCH smaller objects back to disk
dd.compute(*writes)