2016-10-03

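Google DataFlow - Reading from BigQuery in the Autocomplete example

I'm starting to learn Dataflow, and I'm working from the Autocomplete code example. I'm trying to read from BigQuery, but I get this error: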

ERROR:root:Error while visiting split 
... 
File "/usr/lib/python2.7/re.py", line 181, in findall 
return _compile(pattern, flags).findall(string) 
TypeError: expected string or buffer [while running 'split'] 

Code:

def run(argv=None):

    parser = argparse.ArgumentParser()
    parser.add_argument('--output',
                        required=True,
                        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    (p  # pylint: disable=expression-not-assigned
     | 'read' >> beam.io.Read(beam.io.BigQuerySource(input_table))
     | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
     | 'TopPerPrefix' >> TopPerPrefix(5)
     | 'format' >> beam.Map(
         lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
     | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
    p.run()

I'd appreciate any feedback. Thanks, Tom

Answer


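By default, BigQuerySource returns structured records, i.e. mappings from the columns of your input table to their values (one dictionary per row). This means you cannot run re.findall directly on a record: it expects a string, not a dictionary, which is what raises the TypeError.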
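The failure can be reproduced without Beam at all. This is a minimal sketch that assumes each element is a plain dict as described above; the row and its column names ('my_string_field', 'id') are hypothetical placeholders, and `split_words` simply wraps the same `re.findall` call the 'split' step makes.

```python
import re

# Hypothetical row: one dict per table row, keyed by column name.
# 'my_string_field' and 'id' are placeholder column names.
row = {'my_string_field': "Don't panic", 'id': 42}


def split_words(element):
    # Same regex call the 'split' step applies to every element.
    return re.findall(r"[A-Za-z']+", element)


try:
    split_words(row)  # passing the whole record, as in the question
    failed = False
except TypeError as err:
    failed = True
    print('TypeError: %s' % err)  # the record is a dict, not a string
```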

Instead, extract the specific field you care about, e.g.:

| 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x['my_string_field']))
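One caveat with the one-liner: a NULL column would presumably arrive as None, and re.findall(None) raises the same TypeError. The sketch below moves the extraction into a named function that skips such rows. It assumes NULLs show up as None (or the key is missing), and it keeps 'my_string_field' as a placeholder for your real column name. The list comprehension at the end only emulates what FlatMap does with the returned lists; it is not Beam itself.

```python
import re

WORD_RE = re.compile(r"[A-Za-z']+")


def split_field(row, field='my_string_field'):
    """Return the words in one column of a BigQuery row dict.

    'my_string_field' is a placeholder column name. A missing key or a
    None value (assumed to represent NULL) yields no words instead of
    raising TypeError.
    """
    value = row.get(field)
    if value is None:
        return []
    return WORD_RE.findall(value)


# Emulate FlatMap over a few hypothetical rows: flatten the per-row lists.
rows = [
    {'my_string_field': 'hello world'},
    {'my_string_field': None},
    {'my_string_field': "it's fine"},
]
words = [w for row in rows for w in split_field(row)]
print(words)  # ['hello', 'world', "it's", 'fine']
```

In the pipeline, the function would replace the lambda as `| 'split' >> beam.FlatMap(split_field)`. Defining it at module level also keeps it picklable when `save_main_session` is set.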