问题:和批量处理存储元素
我有我的数据存储项目的2M +用户数据列表。我想向所有用户发送每周通讯。邮件API接受每个API调用最多50个电子邮件地址。
先前的解决方案:
二手应用程序引擎的后端和简单的数据存储区查询一气呵成处理的所有记录。但是会发生什么呢,有时候我会得到内存溢出严重错误日志,并且这个过程又重新开始。由于这一些用户,不止一次收到相同的电子邮件。所以我转向了数据流。
目前的解决方案:
我使用FlatMap功能,每封电子邮件ID发送给一个函数,然后逐一发送电子邮件给每个用户。
def process_datastore(project, pipeline_options):
p = beam.Pipeline(options=pipeline_options)
query = make_query()
entities = (p | 'read from datastore' >> ReadFromDatastore(project, query))
entities | beam.FlatMap(lambda entity: sendMail([entity.properties.get('emailID', "")]))
return p.run()
通过云数据流,我确保每个用户只收到一次邮件,而且没有人遗漏。没有内存错误。
但是这个当前过程需要7个小时才能完成运行。我试图用ParDo替换FlatMap,并假定ParDo将并行化该过程。但即便如此,也需要同一时间。
问:
如何在一堆50组的电子邮件ID,从而有效地使用邮件API调用?
如何并行化过程,使所需时间少于一个小时?
您的管道可能遭受此处所述的其中一种情况:https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py#L58 。在这种情况下,你需要通过分解融合https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion – jkff
来引入更多的并行性。另外:FlatMap和ParDo是等价的 - 它们都是并行的,但都受到融合(见上文)。一般来说,当调试作业的性能时,请包含一个oncall工程师可以查看的数据流作业ID。 – jkff