在 DataFlow 管道中,按键分组后的简单计数步骤非常慢

2024-03-15

我有一个 DataFlow 管道尝试构建索引(键值对)并计算一些指标(例如每个键的值数量)。输入数据总计约 60 GB,存储在 GCS 上,管道分配了约 126 个工作线程。根据 Stackdriver,所有工作线程的 CPU 利用率约为 6%。

尽管有 126 个工作人员,但管道似乎没有取得任何进展,并且根据挂起时间,瓶颈似乎是分组之后的一个简单计数步骤。虽然所有其他步骤平均花费的时间不到 1 小时,但计数步骤已经花费了 50 天的时间。日志中的所有警告似乎没有有用的信息。

计数步骤是按照 WordCount 示例中的相应步骤实现的:

def count_keywords_per_product(self, key_and_group):
    key, group = key_and_group
    count = 0
    for e in group:
        count += 1

    self.stats.product_counter.inc()
    self.stats.keywords_per_product_dist.update(count)

    return (key, count)

前面的步骤“对关键字进行分组”是一个简单的 beam.GroupByKey() 转换。

请告知可能是什么原因以及如何优化。

Current resource metrics:
Current vCPUs    126
Total vCPU time      1,753.649 vCPU hr
Current memory   472.5 GB
Total memory time    6,576.186 GB hr
Current PD   3.08 TB
Total PD time    43,841.241 GB hr
Current SSD PD   0 B
Total SSD PD time    0 GB hr
Total Shuffle data processed     1.03 TB
Billable Shuffle data processed      529.1 GB

The pipeline steps including the counting one can be seen below: enter image description here


此处对每个键进行求和的最佳方法是使用组合操作。原因是它可以缓解有热键的问题。

尝试更换你的GroupByKey + ParDo with a beam.combiners.Count.PerKey,或适合您的用例的类似组合变换。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 DataFlow 管道中,按键分组后的简单计数步骤非常慢 的相关文章

随机推荐