我有一个大型(约 1.6 亿行)数据框,我已将其存储到磁盘中,如下所示:
def fillStore(store, tablename):
files = glob.glob('201312*.csv')
names = ["ts", "c_id", "f_id","resp_id","resp_len", "s_id"]
for f in files:
df = pd.read_csv(f, parse_dates=True, index_col=0, names=names)
store.append(tablename, df, format='table', data_columns=['c_id','f_id'])
该表有一个时间索引,我将使用c_id
and f_id
除了时间(通过索引)。
我有另一个包含约 18000 个“事件”的数据框。每个事件都包含一些(少则数百,多则数十万)单独记录。我需要为每个事件收集一些简单的统计数据并存储它们,以便收集一些汇总统计数据。目前我这样做:
def makeQueryString(c, f, start, stop):
return "c_id == {} & f_id == {} & index >= Timestamp('{}') & index < Timestamp('{}')".format(c, f , str(pd.to_datetime(start)),str(pd.to_datetime(stop)))
def getIncidents(inc_times, store, tablename):
incidents = pd.DataFrame(columns = ['c_id','f_id','resp_id','resp_len','s_id','incident_id'])
for ind, row in inc_times.iterrows():
incidents = incidents.append(store.select(tablename,
makeQueryString(row.c_id,
row.f_id,
row.start,
row.stop))).fillna(ind)
return incidents
这一切都工作正常,除了每个store.select()
语句大约需要 5 秒,这意味着处理整个月的数据需要 24-30 小时的处理时间。同时,我实际需要的统计数据也比较简单:
def getIncidentStats(df):
incLen = (df.index[-1]-df.index[0]).total_seconds()
if incLen == 0:
incLen = .1
rqsts = len(df)
rqstRate_s = rqsts/incLen
return pd.Series({'c_id':df.c_id[0],
'f_id':df.fqdn_id[0],
'Length_sec':incLen,
'num_rqsts':rqsts,
'rqst_rate':rqstRate_s,
'avg_resp_size':df.response_len.mean(),
'std_resp_size':df.response_len.std()})
incs = getIncidents(i_times, store, tablename)
inc_groups = incs.groupby('incident_id')
inc_stats = inc_groups.apply(getIncidentStats)
我的问题是:如何提高此工作流程任何部分的性能或效率?(请注意,实际上我对大部分作业进行批处理以一次获取和存储事件,只是因为我想限制在崩溃时丢失已处理数据的风险。为了简单起见,我将这段代码留在这里因为我实际上需要处理整个月的数据。)
有没有办法在我从商店收到数据时处理数据,这有什么好处吗?
使用 store.select_as_index 我会受益吗?如果我收到索引,我仍然需要访问数据才能获得正确的统计信息吗?
其他注释/问题:我比较了在 SSD 和普通硬盘上存储 HDFStore 的性能,没有发现 SSD 有任何改进。这是预期的吗?
我还考虑过创建一个大的查询字符串连接并同时请求它们。当总查询字符串太大(~5-10 个查询)时,这会导致内存错误。
Edit 1如果重要的话,我使用的是表版本 3.1.0 和 pandas 版本 0.13.1
Edit 2以下是更多信息:
ptdump -av store.h5
/ (RootGroup) ''
/._v_attrs (AttributeSet), 4 attributes:
[CLASS := 'GROUP',
PYTABLES_FORMAT_VERSION := '2.0',
TITLE := '',
VERSION := '1.0']
/all_recs (Group) ''
/all_recs._v_attrs (AttributeSet), 14 attributes:
[CLASS := 'GROUP',
TITLE := '',
VERSION := '1.0',
data_columns := ['c_id', 'f_id'],
encoding := None,
index_cols := [(0, 'index')],
info := {1: {'type': 'Index', 'names': [None]}, 'index': {'index_name': 'ts'}},
levels := 1,
nan_rep := 'nan',
non_index_axes := [(1, ['c_id', 'f_id', 'resp_id', 'resp_len', 'dns_server_id'])],
pandas_type := 'frame_table',
pandas_version := '0.10.1',
table_type := 'appendable_frame',
values_cols := ['values_block_0', 'c_id', 'f_id']]
/all_recs/table (Table(161738653,)) ''
description := {
"index": Int64Col(shape=(), dflt=0, pos=0),
"values_block_0": Int64Col(shape=(3,), dflt=0, pos=1),
"c_id": Int64Col(shape=(), dflt=0, pos=2),
"f_id": Int64Col(shape=(), dflt=0, pos=3)}
byteorder := 'little'
chunkshape := (5461,)
autoindex := True
colindexes := {
"index": Index(6, medium, shuffle, zlib(1)).is_csi=False,
"f_id": Index(6, medium, shuffle, zlib(1)).is_csi=False,
"c_id": Index(6, medium, shuffle, zlib(1)).is_csi=False}
/all_recs/table._v_attrs (AttributeSet), 19 attributes:
[CLASS := 'TABLE',
FIELD_0_FILL := 0,
FIELD_0_NAME := 'index',
FIELD_1_FILL := 0,
FIELD_1_NAME := 'values_block_0',
FIELD_2_FILL := 0,
FIELD_2_NAME := 'c_id',
FIELD_3_FILL := 0,
FIELD_3_NAME := 'f_id',
NROWS := 161738653,
TITLE := '',
VERSION := '2.6',
client_id_dtype := 'int64',
client_id_kind := ['c_id'],
fqdn_id_dtype := 'int64',
fqdn_id_kind := ['f_id'],
index_kind := 'datetime64',
values_block_0_dtype := 'int64',
values_block_0_kind := ['s_id', 'resp_len', 'resp_id']]
以下是主表和 inc_times 的示例:
In [12]: df.head()
Out[12]:
c_id f_id resp_id resp_len \
ts
2013-12-04 08:00:00 637092486 5372764353 30 56767543
2013-12-04 08:00:01 637092486 5399580619 23 61605423
2013-12-04 08:00:04 5456242 5385485460 21 46742687
2013-12-04 08:00:04 5456242 5385485460 21 49909681
2013-12-04 08:00:04 624791800 5373236646 14 70461449
s_id
ts
2013-12-04 08:00:00 1829
2013-12-04 08:00:01 1724
2013-12-04 08:00:04 1679
2013-12-04 08:00:04 1874
2013-12-04 08:00:04 1727
[5 rows x 5 columns]
In [13]: inc_times.head()
Out[13]:
c_id f_id start stop
0 7254 196211 1385880945000000000 1385880960000000000
1 9286 196211 1387259840000000000 1387259850000000000
2 16032 196211 1387743730000000000 1387743735000000000
3 19793 196211 1386208175000000000 1386208200000000000
4 19793 196211 1386211800000000000 1386211810000000000
[5 rows x 4 columns]
关于c_id和f_id,与商店中的ID总数相比,我要从全商店中选择的ID集合相对较少。也就是说,inc_times中有一些流行的ID我会重复查询,而完全忽略全表中存在的一些ID。我估计我关心的 ID 大约占 ID 总数的 10%,但这些是最受欢迎的 ID,因此它们的记录在整个集合中占主导地位。
我有 16GB 内存。完整存储为 7.4G,完整数据集(作为 csv 文件)仅为 8.7 GB。最初,我相信我能够将整个内容加载到内存中,并至少对其进行一些有限的操作,但是在加载整个内容时出现内存错误。因此,将其批处理为每日文件(完整文件包含一个月的数据)。