2016-10-26 11 views
3

当我远程运行我的数据管道时引发PicklingError:数据管道已经使用Python的Beam SDK编写,我正在Google Cloud Dataflow之上运行它。当我在本地运行时,管道工作正常。如何解决类apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum上的酸洗错误?

下面的代码生成的PicklingError:这应该重现问题

import apache_beam as beam 
from apache_beam.transforms import pvalue 
from apache_beam.io.fileio import _CompressionType 
from apache_beam.utils.options import PipelineOptions 
from apache_beam.utils.options import GoogleCloudOptions 
from apache_beam.utils.options import SetupOptions 
from apache_beam.utils.options import StandardOptions 

if __name__ == "__main__": 
    pipeline_options = PipelineOptions() 
    pipeline_options.view_as(StandardOptions).runner = 'BlockingDataflowPipelineRunner' 
    pipeline_options.view_as(SetupOptions).save_main_session = True 
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions) 
    google_cloud_options.project = "project-name" 
    google_cloud_options.job_name = "job-name" 
    google_cloud_options.staging_location = 'gs://path/to/bucket/staging' 
    google_cloud_options.temp_location = 'gs://path/to/bucket/temp' 
    p = beam.Pipeline(options=pipeline_options) 
    p.run() 

下面是从一开始就样品和回溯的末尾:

WARNING: Could not acquire lock C:\Users\ghousains\AppData\Roaming\gcloud\credentials.lock in 0 seconds 
WARNING: The credentials file (C:\Users\ghousains\AppData\Roaming\gcloud\credentials) is not writable. Opening in read-only mode. Any refreshed credentials will only be valid for this run. 
Traceback (most recent call last): 
    File "formatter_debug.py", line 133, in <module> 
    p.run() 
    File "C:\Miniconda3\envs\beam\lib\site-packages\apache_beam\pipeline.py", line 159, in run 
    return self.runner.run(self) 
    .... 
    .... 
    .... 
    File "C:\Miniconda3\envs\beam\lib\sitepackages\apache_beam\runners\dataflow_runner.py", line 172, in run 
    self.dataflow_client.create_job(self.job))  
    StockPickler.save_global(pickler, obj) 
    File "C:\Miniconda3\envs\beam\lib\pickle.py", line 754, in save_global (obj, module, name)) 
    pickle.PicklingError: Can't pickle <class 'apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum'>: it's not found as apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum 

回答

1

我通过解决了这个问题在run()方法中封装main的主体并调用run()。

2

我发现你的错误被当包含在被酸洗发送到云上下文中的管道对象提出:

pickle.PicklingError: Can't pickle <class 'apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum'>: it's not found as apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum 

当然,你可能会问:

  1. 什么让Pipeline对象在发送到云端时不可取消,因为通常它是可以被pickleable的?
  2. 如果这确实是问题,那么我不会一直得到这个错误 - 不是通常包含在发送到云的上下文中的Pipeline对象?
  3. 如果Pipeline对象通常不包含在发送到云的上下文中,那么为什么Pipeline对象包含在我的案例中?

(1)

当你调用p.run()在管道与cloud=True,那首先发生的事情之一是,p.runner.job=apiclient.Job(pipeline.options)apache_beam.runners.dataflow_runner.DataflowPipelineRunner.run设置。

没有这个属性设置,管道是pickleable。但是一旦设置了这个,Pipeline就不再被pickleable,因为p.runner.job.proto._Message__tags[17]TypeValueValuesEnum,它被定义为apache_beam.internal.clients.dataflow.dataflow_v1b3_messages中的嵌套类。 AFAIK嵌套类不能腌制(即使是莳萝 - 见How can I pickle a nested class in python?)。

(2) - (3)

直觉相反,一个管道对象通常不包括在发送到云中的上下文。当你调用p.run()在管道与cloud=True,只有下列对象腌(请注意,p.runner.job获取设置后的酸洗发生):

  1. 如果save_main_session=True,然后指定__main__模块中的所有全局对象腌制。 (__main__是从命令行运行的脚本)。
  2. 每个变换在管道定义单独腌

在你的情况,你遇到了#1,这就是为什么你的解决方案工作。我实际上遇到了#2,其中我将beam.Map lambda函数定义为复合PTransform的一种方法。(当应用复合变换时,管道被添加为变换的属性...)我的解决方案是在模块中定义这些lambda函数。

较长期的解决方案是我们在Apache Beam项目中解决这个问题。待定!

+1

这应该被固定在谷歌-数据流0.4.4 SDK释放。 https://github.com/apache/incubator-beam/pull/1485 – Sourabh