2017-08-25 89 views
2

给定一个相对较小的数据源(3,000-10,000)的键/值对,我试图只处理符合组阈值(50-100)的记录。所以最简单的方法是将它们按键,过滤和展开进行分组 - 无论是使用FlatMap还是ParDo。迄今为止,最大的团体只有1,500条记录。但这似乎是Google Cloud Dataflow生产中的一个严重瓶颈。为什么Apache Beam中的GroupByKey之后的FlatMap如此之慢?

随着给定的列表

(1,1) (1,2) (1,3) ... (2,1) (2,2) (2,3) ...

通过键变换集过滤和组的运行:

p | 'Group' >> beam.GroupByKey() 
    | 'Filter' >> beam.Filter(lambda (key, values): len(list(values)) > 50) 
    | 'Unwind' >> beam.FlatMap(lambda (key, values): values) 

任何想法如何使这更好的性能?谢谢你的帮助!

+0

请与调查结果报告!如果答案有用,请选择它。 – Pablo

回答

2

这是一个有趣的管道案例。我相信你的问题在于你读取来自GroupByKey的数据。让我简单介绍一下GBK的工作原理。

什么GroupByKey,以及如何大数据系统实现它

所有的大数据系统实现方式实现在同一个密钥的多个元素的操作。这在MapReduce中被称为减少,在其他大数据系统中被称为Group By Key或Combine。

当您执行GroupByKey转换时,Dataflow需要将单个密钥的所有元素收集到同一台机器中。由于同一个密钥的不同元素可能在不同的机器上处理,所以数据需要以某种方式序列化。

这意味着当您读取来自GroupByKey的数据时,您正在访问工作人员的IO(即不是来自内存),所以您确实要避免多次读取shuffle数据。

这如何转化为您的管道

我相信,在这里你的问题是,FilterUnwind都将分别读取洗牌的数据(所以你会读出每个列表数据两次)。你想要做的只是读你的洗牌数据一次。您可以使用单一的FlatMap来完成此操作,该功能既可以在不洗牌的情况下进行双重读取,也可以对数据进行过滤和展开。就像这样:

def unwind_and_filter((key, values)): 
    # This consumes all the data from shuffle 
    value_list = list(values) 
    if len(value_list) > 50: 
    yield value_list 

p | 'Group' >> beam.GroupByKey() 
    | 'UnwindAndFilter' >> beam.FlatMap(unwind_and_filter) 

让我知道这是否有帮助。

+0

感谢您的回应!尽管尝试了不同的变化,但它没有效果。问题不在于它分组的速度有多快,而是在FlatMap阶段中从组发出的展开元素的速度有多快 - 每秒钟两到三次,而所有其他转换几乎立即进行。而且我知道它不可并行,因为它必须位于同一台机器上。 –

+0

平面图之后你有什么变革?你有工作ID吗? – Pablo

+0

确定它是一个为每个验证点添加随机记录的ParDo,然后是为远程查询批量随机记录的另一个记录。作业ID是:2017-08-26_06_10_38-4021415491644394676。 –

相关问题