2017-09-13 46 views
0

问题:和批量处理存储元素

我有我的数据存储项目的2M +用户数据列表。我想向所有用户发送每周通讯。邮件API接受每个API调用最多50个电子邮件地址。

先前的解决方案:

二手应用程序引擎的后端和简单的数据存储区查询一气呵成处理的所有记录。但是会发生什么呢,有时候我会得到内存溢出严重错误日志,并且这个过程又重新开始。由于这一些用户,不止一次收到相同的电子邮件。所以我转向了数据流。

目前的解决方案:

我使用FlatMap功能,每封电子邮件ID发送给一个函数,然后逐一发送电子邮件给每个用户。

def process_datastore(project, pipeline_options): 
    p = beam.Pipeline(options=pipeline_options) 
    query = make_query() 
    entities = (p | 'read from datastore' >> ReadFromDatastore(project, query)) 
    entities | beam.FlatMap(lambda entity: sendMail([entity.properties.get('emailID', "")])) 
    return p.run() 

通过云数据流,我确保每个用户只收到一次邮件,而且没有人遗漏。没有内存错误。

但是这个当前过程需要7个小时才能完成运行。我试图用ParDo替换FlatMap,并假定ParDo将并行化该过程。但即便如此,也需要同一时间。

问:

  1. 如何在一堆50组的电子邮件ID,从而有效地使用邮件API调用?

  2. 如何并行化过程,使所需时间少于一个小时?

+0

您的管道可能遭受此处所述的其中一种情况:https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py#L58 。在这种情况下,你需要通过分解融合https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion – jkff

+0

来引入更多的并行性。另外:FlatMap和ParDo是等价的 - 它们都是并行的,但都受到融合(见上文)。一般来说,当调试作业的性能时,请包含一个oncall工程师可以查看的数据流作业ID。 – jkff

回答

0

你可以使用查询光标在50批次的用户分开,做俯卧撑队列或deferred任务内的实际批量处理(电子邮件发送)。这将是一个仅GAE的解决方案,没有云数据流,恕我直言很简单。

您可以在Google appengine: Task queue performance中找到此类处理的示例(同时考虑到该答案)。该解决方案使用deferred库,但使用推送队列任务几乎是微不足道的。

答案涉及并行性方面,您可能想限制它以降低成本。

您也可以将批处理本身分配到任务中,以获得无限可扩展的解决方案(任意数量的接收者,不会超过内存或超时失败的时间),任务重新排队以继续其离开的地方关闭。