2017-10-10 85 views
0

如果我在GCS上存储了一个.txt文件,其中包含将用作beam.Filter一部分的单词列表,可以在我的apache波束管道中动态访问此列表吗?我知道我可以在流水线中将这个列表定义为一个全局变量,但我不确定如何将整个文件读入列表,以及是否有任何光束技巧来完成此操作。有什么建议么?这是我当前实现,其工作不..Google Cloud Dataflow访问云存储中的.txt文件

def boolean_terms(word, term_list): 
    if word in term_list: 
    return (word, 1) 
    else: 
    return (word, 0) 

# side table 
filter_terms = p | beam.io.ReadFromText(path_to_gcs_txt_file) 

words = ... 

filtered_words = words | beam.FlatMap(lambda x: 
    [boolean_terms(word, filter_terms) for word in x]) 

我得到以下错误“类型错误:类型_InvalidUnpickledPCollection'的说法并不迭代”

回答

3

您可以访问单词列表为side input 。我相信beam.Filter变换支持使用来自过滤功能的侧面输入,其方式与该链接示例中的FlatMapParDo完全相同。

喜欢的东西:

words | beam.Filter(lambda x, filter_terms: word in filter_terms, 
        filter_terms=pvalue.AsList(p | beam.io.ReadFromText(path))) 
+0

谢谢!我认为我更接近,但它似乎仍然不适合我。我错过了什么吗? – reese0106

+0

嗯,我想我想通了 - 我需要添加'pvalue.AsList(filter_terms)'让这个工作正常 – reese0106

相关问题