Python - Multiprocessing multiple large files with pandas

2023-12-07

I have a y.csv file. It is about 10 MB in size and contains data from Jan 2020 to May 2020.

I also have a separate file for each month, e.g. data-2020-01.csv, which contains the detailed data. Each monthly file is roughly 1 GB in size.

I split y.csv by month and process the data by loading the corresponding month's file. The process takes too long when I cover many months, e.g. 24 months.

I would like to process the data faster. I have access to an AWS m6i.8xlarge instance with 32 vCPUs and 128 GB of memory.

I am new to multiprocessing. Could someone guide me here?

Here is my current code.

import pandas as pd

periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]

y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB


def process(_month_df, _index):
    idx = _month_df.index[_month_df.index.get_loc(_index, method='nearest')]
    for _, value in _month_df.loc[idx:].itertuples():

        up_delta = 200
        down_delta = 200

        up_value = value + up_delta
        down_value = value - down_delta

        if value > up_value:
            y.loc[_index, "result"] = 1
            return

        if value < down_value:
            y.loc[_index, "result"] = 0
            return


for x in periods:
    filename = "data-" + str(x[0]) + "-" + str(x[1]).zfill(2)  # data-2020-01
    filtered_y = y[(y.index.month == x[1]) & (y.index.year == x[0])]  # Only get the current month records
    month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True)  # Filesize: ~1 GB (data-2020-01.csv)

    for index, row in filtered_y.iterrows():
        process(month_df, index)

A multithreading pool would be ideal for sharing the y dataframe among threads (eliminating the need for shared memory), but it is not so good at running CPU-intensive processing in parallel. A multiprocessing pool is great for CPU-intensive processing, but it does not do well sharing data across processes unless you come up with a shared-memory representation of the y dataframe.

Here I have rearranged your code so that a multithreading pool is used to create filtered_y for each period (this is a CPU-intensive operation, but pandas does release the Global Interpreter Lock for certain operations, hopefully this one). Then only a single month's worth of data, rather than the whole y dataframe, is passed to a multiprocessing pool, where it is handled by worker function process_month. Since each pool process has no access to the y dataframe, the worker simply returns the indices that need updating together with the values to set them to.

import pandas as pd
from multiprocessing.pool import Pool, ThreadPool
from multiprocessing import cpu_count

def process_month(period, filtered_y):
    """
    returns a list of tuples consisting of (index, value) pairs
    """
    filename = "data-" + str(period[0]) + "-" + str(period[1]).zfill(2)  # data-2020-01
    month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True)  # Filesize: ~1 GB (data-2020-01.csv)
    results = []
    for index, row in filtered_y.iterrows():   
        idx = month_df.index[month_df.index.get_loc(index, method='nearest')]
        for _, value in month_df.loc[idx:].itertuples():
    
            up_delta = 200
            down_delta = 200
    
            up_value = value + up_delta
            down_value = value - down_delta
    
            if value > up_value:
                results.append((index, 1))
                break
    
            if value < down_value:
                results.append((index, 0))
                break
    return results

def process(period):
    filtered_y = y[(y.index.month == period[1]) & (y.index.year == period[0])]  # Only get the current month records
    for index, value in multiprocessing_pool.apply(process_month, (period, filtered_y)):
        y.loc[index, "result"] = value

def main():
    global y, multiprocessing_pool

    periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB

    MAX_THREAD_POOL_SIZE = 100
    thread_pool_size = min(MAX_THREAD_POOL_SIZE, len(periods))
    multiprocessing_pool_size = min(thread_pool_size, cpu_count())
    with Pool(multiprocessing_pool_size) as multiprocessing_pool, \
         ThreadPool(thread_pool_size) as thread_pool:
        thread_pool.map(process, periods)
        
    # Presumably y gets written out again as a CSV file here?

# Required for Windows:
if __name__ == '__main__':
    main()
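
To answer the comment near the end of main: once both pools have completed, the updated y dataframe can simply be written back out with to_csv. A minimal sketch (the output filename is only an assumption; the original post does not say where the result should go):

# Hypothetical follow-up step: persist the updated y dataframe after processing.
# "y_result.csv" is an assumed output name, not something given in the question.
y.to_csv("y_result.csv")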

A version that uses only a single multiprocessing pool

import pandas as pd
from multiprocessing.pool import Pool, ThreadPool
from multiprocessing import cpu_count

def process_month(period):
    """
    returns a list of tuples consisting of (index, value) pairs
    """
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB
    filtered_y = y[(y.index.month == period[1]) & (y.index.year == period[0])]  # Only get the current month records
    filename = "data-" + str(period[0]) + "-" + str(period[1]).zfill(2)  # data-2020-01
    month_df = pd.read_csv(f'{filename}.csv', index_col=0, parse_dates=True)  # Filesize: ~1 GB (data-2020-01.csv)
    results = []
    for index, row in filtered_y.iterrows():   
        idx = month_df.index[month_df.index.get_loc(index, method='nearest')]
        for _, value in month_df.loc[idx:].itertuples():
    
            up_delta = 200
            down_delta = 200
    
            up_value = value + up_delta
            down_value = value - down_delta
    
            if value > up_value:
                results.append((index, 1))
                break
    
            if value < down_value:
                results.append((index, 0))
                break
    return results

def main():
    periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]

    multiprocessing_pool_size = min(len(periods), cpu_count())
    with Pool(multiprocessing_pool_size) as multiprocessing_pool:
        results_list = multiprocessing_pool.map(process_month, periods)
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB
    for results in results_list:
        for index, value in results:
            y.loc[index, "result"] = value
    # Write out new csv file:
    ...

# Required for Windows:
if __name__ == '__main__':
    main()
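
If you want to run the 24-month case mentioned in the question, the periods list does not have to be written out by hand. A minimal sketch, assuming Jan 2020 through Dec 2021 as an example range:

# Hypothetical helper: build (year, month) tuples for Jan 2020 through Dec 2021.
periods = [(d.year, d.month) for d in pd.date_range("2020-01", "2021-12", freq="MS")]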

And now a variation of this that uses somewhat more memory but allows the main process to overlap its own processing with the multiprocessing pool. This could be useful if the number of indices that need updating is very large:

...
def main():
    periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]

    multiprocessing_pool_size = min(len(periods), cpu_count() - 1) # save a core for the main process
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)  # Filesize: ~10 MB
    with Pool(multiprocessing_pool_size) as multiprocessing_pool:
        # Process values as soon as they are returned:
        for results in multiprocessing_pool.imap_unordered(process_month, periods):
            for index, value in results:
                y.loc[index, "result"] = value
    # Write out new csv file:
    ...

This last version may be preferable because it reads the csv file once before submitting tasks to the pool, and depending on the platform and how it caches I/O operations, the worker functions may not have to perform any physical I/O to read in their own copies of the file. It does, however, mean one more 10 MB file being read into memory.
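
A further variation, not part of the original answer but in the same spirit: since the main process already holds y in memory, it could pass each month's filtered_y slice to the workers directly via starmap, so that no worker has to read y.csv at all, at the cost of pickling those slices. A rough sketch, reusing the two-argument process_month from the first listing:

# Hypothetical variation: ship each month's slice of y to the workers so they
# never read y.csv themselves. process_month here is the (period, filtered_y)
# version defined in the first listing.
def main():
    periods = [(2020, 1), (2020, 2), (2020, 3), (2020, 4), (2020, 5)]
    y = pd.read_csv("y.csv", index_col=0, parse_dates=True).fillna(0)
    args = [(p, y[(y.index.month == p[1]) & (y.index.year == p[0])]) for p in periods]
    with Pool(min(len(periods), cpu_count())) as multiprocessing_pool:
        for results in multiprocessing_pool.starmap(process_month, args):
            for index, value in results:
                y.loc[index, "result"] = value
    # Write out the updated y here, e.g. y.to_csv(...)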
