2015-04-23 130 views
3

我知道几周前更新了CDF服务(默认工作人员类型&附加的PD已更改),并明确表示它会使批处理作业变慢。然而,我们的工作表现已经下降,超出了他们实际满足我们业务需求的程度。例如,对于我们的其中一个工作,它从BigQuery中的表中读取约270万行,具有6个侧面输入(BQ表),进行一些简单的字符串转换,最后写入多个输出(3)到BigQuery。这个过去需要5-6分钟,现在需要15-20分钟的时间 - 无论我们有多少虚拟机在使用它。数据流性能问题

有什么我们可以做的,以获得速度回到我们以前看到的?

下面是一些统计:

  1. 从BQ表读取与2744897行(294MB)
  2. 6 BQ侧输入
  3. 3多输出到BQ,其中2个是2744897和其他1500行
  4. 在区亚太east1-b运行下面
  5. 时间包括工作池自旋向上和拆除

10个虚拟机(N1-STANDARD-2) 16分钟5秒 2015-04-22_19_42_20-4740106543213058308

10个虚拟机(N1-STANDARD-4) 17分11秒 2015 - 04-22_20_04_58-948224342106865432

10个虚拟机(N1-STANDARD-1) 18分钟44秒 2015-04-22_19_42_20-4740106543213058308

20倍的VM(N1-STANDARD-2) 22分钟53秒 2015-04-22_21_26_53-18171886778433479315

50倍的VM(N1-STANDARD-2) 17分26秒 2015-04 -22_21_51_37-16026777746175810525

100层的虚拟机(N1-STANDARD-2) 19分钟33秒 2015-04-22_22_32_13-9727928405932256127

+0

我查看了其中一个作业的步骤执行日志,看起来大部分时间(大约9分钟)已用于将步骤已写入的数据导入BQ。我们将研究为什么这个导入过程变得如此缓慢。 – jkff

+0

有没有一种解决方法,我可以使用,直到你找出为什么它变得如此缓慢? –

+0

一位队友认为,缓慢可能是由于新SDK对待边界输入的方式发生了变化 - 请问您可以参考http://stackoverflow.com/questions/29718820/why-did-sideinput-method-从上下文转移到processcontext-in-dataflow-beta并检查它是否与您的工作相关? – jkff

回答

2

我们追查到了这个问题。这是当侧面输入从正在传输数据的BigQuery表中读取而不是批量加载时。当我们复制表格并从副本读取时,一切正常。

但是,这只是一种解决方法。数据流应该能够处理BigQuery中的流式表作为侧面输入。

4

证据似乎表明您的管道如何处理侧面输入存在问题。具体而言,主输入的每个元素都可能重复从BigQuery中重新读取副输入。这与Dataflow工作人员使用的虚拟机类型的更改完全正交,如下所述。

这与Dataflow SDK for Java版本0.3.150326中所做的更改密切相关。在该版本中,我们更改了侧面输入API以适用于每个窗口。现在调用sideInput()现在仅在与主输入元素的窗口相对应的特定窗口中返回值,而不是整个侧面输入PCollectionView。因此,sideInput()不能再从startBundlefinishBundle中调用DoFn,因为该窗口尚不知道。

例如,下面的代码片段有一个问题,会导致每个输入元素重新读取侧面输入。

@Override 
public void processElement(ProcessContext c) throws Exception { 
    Iterable<String> uniqueIds = c.sideInput(iterableView); 

    for (String item : uniqueIds) { 
    [...] 
    } 

    c.output([...]); 
} 

此代码可以通过缓存侧输入到在高速缓存List代替侧输入变换(假定它能够装入内存)在第一次调用processElement期间,和使用一个List成员变量来改善随后的调用。

此替代方法应恢复您之前看到的性能,此时可从startBundle调用副输入。长期来看,我们将致力于更好地缓存副作用。 (如果这不利于全面解决该问题,请通过电子邮件与我们联络,分享相关的代码片段。)


另外,有,的确,更新至约4云数据流服务/ 9/15更改了Dataflow工作人员使用的默认虚拟机类型。具体而言,我们减少了每个工作人员的内核缺省数量,因为我们的基准测试显示它对于典型作业而言具有成本效益这是而不是任何类型的数据流服务都会减速 - 默认情况下,它只是在每个工作人员的资源较少的情况下运行。用户仍然可以选择覆盖工作人员数量以及工作人员使用的虚拟机类型。

+0

这就是我们已经做的事情 - 我们在processElement()中读取一次侧面输入,并将结果缓存在内部变量(在我们的例子中是一个HashMap),并将其用于每次后续调用processElement()。在最新发布之前,这一切都正常工作。 –

+1

不,不会为每次调用processElement()时创建一个'ParDo'的新实例。这将打破捆绑的目的。然而,你的经验证据似乎表明了这方面的一些东西。我将单独与您联系,让我们在回到底部时发表一个答案。 –