我有一个 DataFlow 管道尝试构建索引(键值对)并计算一些指标(例如每个键的值数量)。输入数据总计约 60 GB,存储在 GCS 上,管道分配了约 126 个工作线程。根据 Stackdriver,所有工作线程的 CPU 利用率约为 6%。
尽管有 126 个工作人员,但管道似乎没有取得任何进展,并且根据挂起时间,瓶颈似乎是分组之后的一个简单计数步骤。虽然所有其他步骤平均花费的时间不到 1 小时,但计数步骤已经花费了 50 天的时间。日志中的所有警告似乎没有有用的信息。
计数步骤是按照 WordCount 示例中的相应步骤实现的:
def count_keywords_per_product(self, key_and_group):
key, group = key_and_group
count = 0
for e in group:
count += 1
self.stats.product_counter.inc()
self.stats.keywords_per_product_dist.update(count)
return (key, count)
前面的步骤“对关键字进行分组”是一个简单的 beam.GroupByKey() 转换。
请告知可能是什么原因以及如何优化。
Current resource metrics:
Current vCPUs 126
Total vCPU time 1,753.649 vCPU hr
Current memory 472.5 GB
Total memory time 6,576.186 GB hr
Current PD 3.08 TB
Total PD time 43,841.241 GB hr
Current SSD PD 0 B
Total SSD PD time 0 GB hr
Total Shuffle data processed 1.03 TB
Billable Shuffle data processed 529.1 GB
The pipeline steps including the counting one can be seen below: