2017-02-16 74 views
2

我正在使用 Dataflow 0.5.5(Python SDK)。一段非常简单的代码报出以下错误。Dataflow 0.5.5 中的 “object of type '_UnwindowedValues' has no len()” 是什么意思?

# row_list is the value half of a GroupByKey output. On DataflowRunner it is an
# _UnwindowedValues iterable rather than a list, so len() raises TypeError there.
print(len(row_list)) 

row_list 是一个列表。完全相同的代码、相同的数据和相同的管道在 DirectRunner 上运行得很好,但在 DataflowRunner 上会抛出以下异常。这是什么意思?我该如何解决?

job name: `beamapp-root-0216042234-124125` 

    (f14756f20f567f62): Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 544, in do_work 
    work_executor.execute() 
    File "dataflow_worker/executor.py", line 973, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30547) 
    with op.scoped_metrics_container: 
    File "dataflow_worker/executor.py", line 974, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30495) 
    op.start() 
    File "dataflow_worker/executor.py", line 302, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12149) 
    def start(self): 
    File "dataflow_worker/executor.py", line 303, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12053) 
    with self.scoped_start_state: 
    File "dataflow_worker/executor.py", line 316, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11968) 
    with self.shuffle_source.reader() as reader: 
    File "dataflow_worker/executor.py", line 320, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11912) 
    self.output(windowed_value) 
    File "dataflow_worker/executor.py", line 152, in dataflow_worker.executor.Operation.output (dataflow_worker/executor.c:6317) 
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) 
    File "dataflow_worker/executor.py", line 85, in dataflow_worker.executor.ConsumerSet.receive (dataflow_worker/executor.c:4021) 
    cython.cast(Operation, consumer).process(windowed_value) 
    File "dataflow_worker/executor.py", line 766, in dataflow_worker.executor.BatchGroupAlsoByWindowsOperation.process (dataflow_worker/executor.c:25558) 
    self.output(wvalue.with_value((k, wvalue.value))) 
    File "dataflow_worker/executor.py", line 152, in dataflow_worker.executor.Operation.output (dataflow_worker/executor.c:6317) 
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) 
    File "dataflow_worker/executor.py", line 85, in dataflow_worker.executor.ConsumerSet.receive (dataflow_worker/executor.c:4021) 
    cython.cast(Operation, consumer).process(windowed_value) 
    File "dataflow_worker/executor.py", line 545, in dataflow_worker.executor.DoOperation.process (dataflow_worker/executor.c:18474) 
    with self.scoped_process_state: 
    File "dataflow_worker/executor.py", line 546, in dataflow_worker.executor.DoOperation.process (dataflow_worker/executor.c:18428) 
    self.dofn_receiver.receive(o) 
    File "apache_beam/runners/common.py", line 195, in apache_beam.runners.common.DoFnRunner.receive (apache_beam/runners/common.c:5137) 
    self.process(windowed_value) 
    File "apache_beam/runners/common.py", line 262, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:7078) 
    self.reraise_augmented(exn) 
    File "apache_beam/runners/common.py", line 274, in apache_beam.runners.common.DoFnRunner.reraise_augmented (apache_beam/runners/common.c:7467) 
    raise type(exn), args, sys.exc_info()[2] 
    File "apache_beam/runners/common.py", line 258, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:6967) 
    self._dofn_simple_invoker(element) 
    File "apache_beam/runners/common.py", line 198, in apache_beam.runners.common.DoFnRunner._dofn_simple_invoker (apache_beam/runners/common.c:5283) 
    self._process_outputs(element, self.dofn_process(element.value)) 
    File "apache_beam/runners/common.py", line 286, in apache_beam.runners.common.DoFnRunner._process_outputs (apache_beam/runners/common.c:7678) 
    for result in results: 
    File "trip_augmentation_test.py", line 120, in get_osm_way 
TypeError: object of type '_UnwindowedValues' has no len() [while running 'Pull way info from mapserver'] 

代码在这里:trip_augmentation_test.py

#!/usr/bin/env python 
# coding: utf-8 

from __future__ import absolute_import 

import argparse 
import logging 
import json 

import apache_beam as beam 
from apache_beam.utils.options import PipelineOptions 
from apache_beam.utils.options import SetupOptions 


def get_osm_way(pairs_same_group):
    """Query the map server for way information for every trip of one device.

    Used as the ``beam.FlatMap`` step right after ``beam.GroupByKey()``, so
    ``pairs_same_group`` is a ``(HardwareId, rows)`` pair.  ``rows`` is only
    guaranteed to be *iterable*: DirectRunner passes a ``list``, while
    DataflowRunner passes an ``_UnwindowedValues`` object that supports
    iteration but not ``len()``.  The rows are therefore copied into a list
    before anything needs their length.

    Each row is read through the keys ``'HardwareId'``, ``'TripId'`` and
    ``'LatLonStrArr'``; ``LatLonStrArr`` is split on ``,`` into points and
    each point on ``;`` into latitude and longitude.

    Yields:
        ``(hardwareid, tripid, response_text)`` when the server answers with
        HTTP 200, otherwise ``(hardwareid, tripid, None)``.
    """
    import requests
    from requests.adapters import HTTPAdapter
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    from multiprocessing.pool import ThreadPool

    # Disable InsecureRequestWarning for a cleaner output.
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    print('processing hardwareid={} trips'.format(pairs_same_group[0]))

    # GroupByKey values are not a list on every runner (see docstring);
    # list() makes len() work everywhere.  This holds all rows of one
    # device in memory, which the original loop over them effectively did too.
    row_list = list(pairs_same_group[1])
    print(row_list)
    http_request_num = len(row_list)

    with requests.Session() as s:
        # A host name is needed for this HTTP persistent connection.
        # NOTE(review): the mount prefix must match the host used in `url`
        # below for this adapter to apply; both are redacted in this copy.
        s.mount('https://ip address', HTTPAdapter(pool_maxsize=http_request_num))
        pool = ThreadPool(processes=1)
        try:
            for row in row_list:
                hardwareid = row['HardwareId']
                tripid = row['TripId']
                latlonArr = row['LatLonStrArr'].split(',')
                print('gps points num: {}'.format(len(latlonArr)))
                cor_array = []
                for latlon in latlonArr:
                    parts = latlon.split(';')
                    lat, lon = parts[0], parts[1]
                    cor_array.append('{{"x":"{}","y":"{}"}}'.format(lon, lat))
                url = 'https://<ip address>/functionname?coordinates=[{}]'.format(','.join(cor_array))
                print(url)
                print("Requesting")
                # thread_get is defined elsewhere in the full script (not shown).
                r = pool.apply_async(thread_get, (s, url)).get()
                print("Got response")
                print(r)
                if r.status_code == 200:
                    yield (hardwareid, tripid, r.text)
                else:
                    yield (hardwareid, tripid, None)
        finally:
            # Shut the pool down; otherwise each processed group leaks a worker thread.
            pool.close()
            pool.join()


def run(argv=None):
    """Build and run the trip-augmentation pipeline.

    The pipeline reads rows from BigQuery using the SQL query given by
    ``--input`` and keys them by ``HardwareId``. It then groups them per
    device and fetches way information from the map server
    (``get_osm_way``). Finally it converts the results with
    ``convert_to_dict`` and writes them to the ``--output`` BigQuery table.

    Args:
        argv: Command-line arguments to parse; ``None`` means ``sys.argv[1:]``.
            Arguments not recognised here (``--runner``, ``--project``, ...)
            are forwarded to Beam as pipeline options.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        # The value is passed to BigQuerySource(query=...), so it is a SQL
        # query, and the source cannot be built without it.
        required=True,
        help=('BigQuery SQL query selecting the input rows, e.g. '
              '"SELECT HardwareId, TripId, LatLonStrArr FROM [PROJECT:DATASET.TABLE]".'))
    parser.add_argument(
        '--output',
        required=True,
        help=('Output BigQuery table for results specified as: '
              'PROJECT:DATASET.TABLE or DATASET.TABLE.'))

    known_args, pipeline_args = parser.parse_known_args(argv)
    # Forward only the arguments this parser did not consume to Beam.
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    schema = ('HardwareId:INTEGER,TripId:INTEGER,OrderBy:INTEGER,'
              'IndexRatio:FLOAT,IsEstimate:BOOLEAN,IsOverRide:BOOLEAN,'
              'MaxSpeed:FLOAT,Provider:STRING,RoadName:STRING,WayId:STRING,'
              'LastEdited:TIMESTAMP,WayLatLons:STRING,BigDataComment:STRING')

    (p
     | 'Read trip from BigQuery' >> beam.io.Read(
         beam.io.BigQuerySource(query=known_args.input))
     | 'Convert' >> beam.Map(lambda row: (row['HardwareId'], row))
     | 'Group devices' >> beam.GroupByKey()
     # Grouped values are an iterable, not necessarily a list (on
     # DataflowRunner they do not support len()); get_osm_way must not
     # rely on list semantics.
     | 'Pull way info from mapserver' >> beam.FlatMap(get_osm_way)
     | 'Map way info to dictionary' >> beam.FlatMap(convert_to_dict)
     | 'Save to BQ' >> beam.io.Write(beam.io.BigQuerySink(
         known_args.output,
         schema=schema,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))

    # Run the pipeline (all operations are deferred until run() is called).
    p.run()


if __name__ == '__main__':
    # Script entry point: raise the root logger to INFO, then build and run the pipeline.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()

管道的启动命令如下(我使用的是 Google Cloud Datalab):

# Launch trip_augmentation_test.py from a Google Cloud Datalab cell.
# Per the question, changing --runner to 'DirectRunner' makes the same job succeed.
# A comment cannot follow a line-continuation backslash ("\ ### ..." ends the
# command on that line and drops every option after it), so notes live up here.
# NOTE(review): --output and the table read by --input are redacted to the same
# name; the job writes with WRITE_TRUNCATE, so keep them different in practice.
# --profile_cpu / --profile_memory are on/off flags without a value; profiling
# can slow the job down, so drop them when not needed.
!python trip_augmentation_test.py \
--output 'my-project:my-dataset.mytable' \
--input 'SELECT HardwareId,TripId, LatLonStrArr FROM [my-project:my-dataset.mytable] ' \
--project 'my-project' \
--runner 'DataflowRunner' \
--temp_location 'gs://mybucket/tripway_temp' \
--staging_location 'gs://mybucket/tripway_staging' \
--worker_machine_type 'n1-standard-2' \
--profile_cpu \
--profile_memory

跟进

我在DataflowRunner中记录了row_list的类型,它是<class 'apache_beam.transforms.trigger._UnwindowedValues'>,而在DirectRunner中,它是list。这是预期的不一致吗?

+0

能否贴出包含更多上下文的完整代码? –

+0

已添加代码,删除了无关的函数定义,并隐去了一些敏感细节。 – foxwendy

+0

顺便说一句,您不需要开启所有性能分析(profiling)选项——它们可能会拖慢作业。 – Pablo

回答

5

注意:此问题将在 Beam 2.0.1 中得到修复。


不幸的是,这是 Dataflow runner 特有的怪异行为(quirk)。在 Dataflow 中,GroupByKey 产生的分组值不是列表,不支持 len(),但它是可迭代的。

总之,在执行 http_request_num = len(row_list) 之前,可以先把它转换成支持 len() 的类型,例如:

# GroupByKey values are only guaranteed to be iterable (on DataflowRunner they
# do not support len()), so copy them into a list before taking the length.
row_list = list(pairs_same_group[1]) 
http_request_num = len(row_list) 

补充说明:这是一个已知问题(known issue),希望能尽快修复。

+0

谢谢 @Pablo,我自己也摸索出了和你类似的做法……这是否说明不同的 runner 会以不同的方式解释同一段代码?如果是这样,最好能有一份完善的文档说明各个 runner 之间的差异。摸索出这个问题的过程让人很沮丧。 – foxwendy

+0

是的,很抱歉。我们希望能尽快修复它,或者至少在文档中加以说明。 – Pablo

相关问题