2016-12-15 54 views
3

我想读取一个csv文件并使用apache beam数据流将其写入BigQuery。为了做到这一点,我需要以字典的形式将数据呈现给BigQuery。我如何使用apache beam来转换数据以执行此操作?如何将csv转换为apache beam数据流中的字典

我的输入csv文件有两列,我想在BigQuery中创建一个后续的两列表。我知道如何在BigQuery中创建数据,这是直接的,我不知道如何将csv转换为字典。下面的代码是不正确的,但应该告诉我想要做什么。

# Standard imports 
import apache_beam as beam 
# Create a pipeline executing on a direct runner (local, non-cloud). 
p = beam.Pipeline('DirectPipelineRunner') 
# Create a PCollection with names and write it to a file. 
(p 
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv')) 
# How do you do this?? 
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v}) 
| 'save' >> beam.Write(
    beam.io.BigQuerySink(
    output_table, 
    schema='month:INTEGER, tornado_count:INTEGER', 
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, 
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))) 
p.run() 

回答

8

这个想法是有一个返回解析CSV行的源。您可以通过继承FileBasedSource类来包含CSV解析来实现此目的。特别是,read_records功能会是这个样子:

class MyCsvFileSource(apache_beam.io.filebasedsource.FileBasedSource): 
    def read_records(self, file_name, range_tracker): 
    self._file = self.open_file(file_name) 

    reader = csv.reader(self._file) 

    for rec in reader: 
     yield rec 

我最近写了一CsvFileSource为Apache梁。你可以看看the Github repository。您可以使用pip install beam_utilsfrom beam_utils.sources import CsvFileSource来使用它。 CsvFileSource还包括设置自定义分隔符,跳过文件头和/或输出字典而不是列表的选项。

+1

非常感谢巴勃罗,这个作品非常好!这里是一个代码片段,以防人们在寻找完整性(p |'read solar data'>> beam.Read(CsvFileSource('./ sensor1_121116.csv')) |'save'>> beam.Write(beam .io.TextFileSink('./ greetings_solar'))) – user1753640

+1

我正在尝试将结果写入BigQuery,但没有运气,表格被创建但没有数据。你能告诉发生了什么事吗?这里是一个片段(p |'read solar data'>> beam.Read(CsvFileSource('./ sensor1_121116.csv')) |'save'>> beam.Write( beam.io.BigQuerySink( output_table , 模式= 'lumosity:INTEGER,时间:INTEGER', create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE))) – user1753640

+0

@ user1753640:我有同样的问题,不得不在将数据存储到GBQ之前,使用匹配模式的字典。 – vdolez

0

作为Pablo职位的补充,我想分享一下我对自己的样本所做的一些改变。 (+1你!)

更改: reader = csv.reader(self._file)reader = csv.DictReader(self._file)

csv.DictReader使用CSV文件作为字典键的第一行。其他行用于使用它的值填充每行的字典。它会根据列顺序自动将正确的值放到正确的键上。

一个小细节是Dict中的每个值都以字符串形式存储。如果您使用eg,这可能会与您的BigQuery架构发生冲突。 INTEGER对于某些字段。所以你之后需要注意正确的铸造。