
I am currently trying to run a Dataflow job (Apache Beam, Python SDK) that imports a >100GB Twitter archive into BigQuery, but I keep running into Error: Message: Too many sources provided: 15285. Limit is 10000.

The job takes the tweets (JSON), extracts five relevant fields, transforms/sanitizes them with a couple of transforms, and writes the resulting values into BigQuery, where they are used for further processing.

The closest existing question seems to be Cloud Dataflow to BigQuery - too many sources, but there the error is caused by many different input files, whereas I have a single input file, so it does not seem to apply. Also, the solutions mentioned there are rather cryptic, and I am not sure whether/how I could apply them to my problem.

My guess is that BigQuery writes temporary files for every row (or something similar) before persisting them, and that this is what is meant by "too many sources"?

How can I work around this?

[Edit]

Code:

import argparse 
import json 
import logging 

import apache_beam as beam 

class JsonCoder(object):
    """A JSON coder interpreting each line as a JSON string."""

    def encode(self, x):
        return json.dumps(x)

    def decode(self, x):
        return json.loads(x)


def filter_by_nonempty_county(record):
    if 'county_fips' in record and record['county_fips'] is not None:
        yield record

def run(argv=None): 

    parser = argparse.ArgumentParser() 
    parser.add_argument('--input', 
         default='...', 
         help=('Input twitter json file specified as: ' 
          'gs://path/to/tweets.json')) 
    parser.add_argument(
     '--output', 
     required=True, 
     help= 
     ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE ' 
     'or DATASET.TABLE.')) 

    known_args, pipeline_args = parser.parse_known_args(argv) 



    p = beam.Pipeline(argv=pipeline_args) 

    # read text file 

    #Read all tweets from given source file 
    read_tweets = "Read Tweet File" >> beam.io.ReadFromText(known_args.input, coder=JsonCoder()) 

    #Extract the relevant fields of the source file 
    extract_fields = "Project relevant fields" >> beam.Map(lambda row: {'text': row['text'], 
                    'user_id': row['user']['id'], 
                    'location': row['user']['location'] if 'location' in row['user'] else None, 
                    'geo':row['geo'] if 'geo' in row else None, 
                    'tweet_id': row['id'], 
                    'time': row['created_at']}) 


    #check what type of geo-location the user has 
    has_geo_location_or_not = "partition by has geo or not" >> beam.Partition(lambda element, partitions: 0 if element['geo'] is None else 1, 2) 


    check_county_not_empty = lambda element, partitions: 1 if 'county_fips' in element and element['county_fips'] is not None else 0 

    #tweet has coordinates partition or not 
    coordinate_partition = (p 
      | read_tweets 
      | extract_fields 
      | beam.ParDo(TimeConversion()) 
      | has_geo_location_or_not) 


    #lookup by coordinates 
    geo_lookup = (coordinate_partition[1] | "geo coordinates mapping" >> beam.ParDo(BeamGeoLocator()) 
          | "filter successful geo coords" >> beam.Partition(check_county_not_empty, 2)) 

    #lookup by profile 
    profile_lookup = ((coordinate_partition[0], geo_lookup[0]) 
         | "join streams" >> beam.Flatten() 
         | "Lookup from profile location" >> beam.ParDo(ComputeLocationFromProfile()) 
        ) 


    bigquery_output = "write output to BigQuery" >> beam.io.Write(
     beam.io.BigQuerySink(known_args.output, 
        schema='text:STRING, user_id:INTEGER, county_fips:STRING, tweet_id:INTEGER, time:TIMESTAMP, county_source:STRING', 
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, 
       write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) 

    #file_output = "write output" >> beam.io.WriteToText(known_args.output, coder=JsonCoder()) 


    output = ((profile_lookup, geo_lookup[1]) | "merge streams" >> beam.Flatten() 
       | "Filter entries without location" >> beam.FlatMap(filter_by_nonempty_county) 
       | "project relevant fields" >> beam.Map(lambda row: {'text': row['text'], 
                    'user_id': row['user_id'], 
                    'county_fips': row['county_fips'], 
                    'tweet_id': row['tweet_id'], 
                    'time': row['time'], 
                    'county_source': row['county_source']}) 
       | bigquery_output) 

    result = p.run() 
    result.wait_until_finish() 

if __name__ == '__main__': 
    logging.getLogger().setLevel(logging.DEBUG) 
    run() 

It is a bit involved, so doing it directly in BigQuery would probably take too much time. The code reads the tweet JSON, splits the PCollection by whether a tweet is geo-tagged or not, tries to look up the location via the profile location if it is not, maps the result to the locations relevant for our GIS analysis, and then writes it to BigQuery.


Can you share your code? Also, do you have to do the transformation in Beam, i.e. could you do it in BigQuery instead: load the file into GCS, point BigQuery at it, and then transform it there (if you prefer the latter)? –
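(For reference, a minimal untested sketch of the load-then-transform route suggested here, using the google-cloud-bigquery client; the GCS path and the staging table name below are placeholders, and the five-field projection would then be done with a SQL query inside BigQuery:)

from google.cloud import bigquery

client = bigquery.Client()

# Load the newline-delimited tweet JSON from GCS into a staging table.
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
job_config.autodetect = True  # let BigQuery infer the raw tweet schema

load_job = client.load_table_from_uri(
    'gs://path/to/tweets.json',                        # placeholder source path
    client.dataset('mydataset').table('raw_tweets'),   # placeholder staging table
    job_config=job_config)
load_job.result()  # wait for the load job to finish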


I added the code in the edit – Zenon


Which version of the SDK are you using? –

Answer


The number of files corresponds to the number of shards the elements were processed in.

One trick to reduce this is to generate some random keys and group the elements by them before writing them out; the GroupByKey collapses the data into at most that many keyed groups, so the subsequent BigQuery write produces at most that many output shards.

For example, you could use the following DoFn and PTransform in your pipeline:

import random

import apache_beam as beam


class _RoundRobinKeyFn(beam.DoFn):
    """Assigns each element a key in [0, count) in round-robin fashion."""

    def __init__(self, count):
        self.count = count

    def start_bundle(self):
        # Start each bundle at a random offset so keys are spread evenly.
        self.counter = random.randint(0, self.count - 1)

    def process(self, element):
        self.counter += 1
        if self.counter >= self.count:
            self.counter -= self.count
        yield self.counter, element


class LimitBundles(beam.PTransform):
    """Groups elements under at most `count` keys to limit output shards."""

    def __init__(self, count):
        self.count = count

    def expand(self, input):
        return (input
                | beam.ParDo(_RoundRobinKeyFn(self.count))
                | beam.GroupByKey()
                | beam.FlatMap(lambda kv: kv[1]))

You would then use this just before the bigquery_output:

output = (# ... 
     | LimitBundles(10000) 
     | bigquery_output) 

(Note that I just typed this in without testing it, so there may be some Python typos.)
