2016-03-02 70 views
3

我想通过Google Cloud Storage的多个输入来运行Dataflow作业,但我不想仅通过* glob操作符指定要传递给作业的路径。如何指定Dataflow作业的多个输入路径

考虑这些路径:

gs://bucket/some/path/20160208/input1 
gs://bucket/some/path/20160208/input2 
gs://bucket/some/path/20160209/input1 
gs://bucket/some/path/20160209/input2 
gs://bucket/some/path/20160210/input1 
gs://bucket/some/path/20160210/input2 
gs://bucket/some/path/20160211/input1 
gs://bucket/some/path/20160211/input2 
gs://bucket/some/path/20160212/input1 
gs://bucket/some/path/20160212/input2 

我想我的工作就是在201602092016021020160211目录上的文件,但不能在20160208(第一批)和20160212(最后)。事实上,还有很多日期,我希望能够为我的工作指定一个任意范围的日期。

The docs for TextIO.Read说:

标准的Java文件系统glob模式( “*”, “[..]” “?”)的支持。

但我无法得到这个工作。有一个到Java Filesystem glob patterns的链接,链接到getPathMatcher(String),列出所有匹配选项。其中之一是{a,b,c},这看起来完全像我所需要的,但是,如果我通过gs://bucket/some/path/201602{09,10,11}/*TextIO.Read#from我得到“无法展开文件模式”。

也许文档意味着只有*?[…]支持,如果是这样的话,我怎么能构建一个水珠是数据流会接受,并且可以匹配任意日期范围像我描述以上?

更新:我已经想通了,我可以写一段代码来,这样我可以在路径前缀作为一个逗号分隔的列表传递,从各创建一个输入并使用 Flatten变换,但这似乎是一种非常低效的方式。它看起来像第一步读取所有输入文件,并立即再次将它们写出到GCS上的临时位置。只有当所有的输入被读取和写入时,实际的处理才开始。这一步在我写作的工作中完全没有必要。我希望作业能够读取第一个文件,开始处理它并阅读下一个文件,依此类推。 这只是造成了很多其他问题,我会努力使其发挥作用,但由于最初的重写,感觉像死胡同。

回答

2

该文档确实意味着仅支持*,?[...]。这意味着以字母或数字顺序的任意子集或范围不能表示为单个glob。

这里有一些可能会为你工作的方法:

  1. 如果文件路径表示的日期也存在于这些文件中的记录,那么最简单的办法是看他们所有和使用Filter转换为选择您感兴趣的日期范围。
  2. 您尝试使用多次读取的方法分隔TextIO.Read变换并展开它们即可适用于小型文件集;我们的tf-idf example这样做。您可以表达任意数值范围与少量水珠的所以这不一定是每个文件一个读(“通过67 23”,例如这两个字符范围是2[3-][3-5][0-9]6[0-7]
  3. 如果文件的子集是更加随意的,那么globs /文件名的数量可能会超过最大图形大小,最后的建议是将文件列表放入PCollection并使用ParDo转换来读取每个文件并发出其内容。

我希望这有助于!

+1

对于选项3 ...你能帮我一个关于如何把文件列表放入PCollection然后ParDo的例子吗? – CCC