
Python Apache Beam Google Storage write error

I am trying to write my pipeline output to Google Cloud Storage, but I am getting an import error for a module that is already installed on the server.

Code:

from __future__ import print_function, absolute_import 
import apache_beam as beam 
from apache_beam.io import ReadFromText 
from apache_beam.io import WriteToText 
from apache_beam.transforms import PTransform, ParDo, DoFn, Create 
from apache_beam.io import iobase, range_trackers 
import logging 
import re 
import datetime 
import argparse 
from apache_beam.options.pipeline_options import PipelineOptions 
from apache_beam.options.pipeline_options import SetupOptions 

logger = logging.getLogger(__name__) 
logging.basicConfig(level=logging.DEBUG) 

def mongo_connection_string(url): 
    import logging 
    logger = logging.getLogger(__name__) 
    if 'gs://' in url: 
        from google.cloud import storage 
        logging.info('Fetching connection string from Cloud Storage {}'.format(url)) 
        _, path = url.split('gs://') 
        path = path.split('/') 
        bucket = path[0] 
        path = '/'.join(path[1:]) 
        client = storage.Client() 
        blob = client.get_bucket(bucket).get_blob(path).download_as_string() 
        connection_string = blob.splitlines()[0] 
        return connection_string 
    logger.info('Using connection string from CLI options') 
    return url 

iso_match = re.compile(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}') 

def clean_query(query): 
    new_query = {} 
    for key, val in query.iteritems(): 
        if isinstance(val, basestring): 
            val = str(val) 
        if isinstance(val, basestring) and iso_match.match(val): 
            val = datetime.datetime.strptime(val[0:19], '%Y-%m-%dT%H:%M:%S') 
        elif isinstance(val, dict): 
            val = clean_query(val) 
        new_query[str(key)] = val 
    return new_query 

class _MongoSource(iobase.BoundedSource): 
    import pymongo 

    def __init__(self, connection_string, db, collection, query=None, fields=None): 
        import logging 
        logger = logging.getLogger(__name__) 
        self._connection_string = connection_string 
        self._db = db 
        self._collection = collection 
        self._fields = fields 
        self._client = None 

        # Prepare query 
        self._query = query 
        if not self._query: 
            self._query = {} 
        logger.info('Raw query: {}'.format(query)) 
        self._query = clean_query(self._query) 
        logger.info('Cleaned query: {}'.format(self._query)) 

    @property 
    def client(self): 
        import logging 
        import pymongo 
        logger = logging.getLogger(__name__) 
        if self._client: 
            logger.info('Reusing existing PyMongo client') 
            return self._client 
        logger.info('Preparing new PyMongo client') 
        self._client = pymongo.MongoClient(self._connection_string) 
        return self._client 

    def estimate_size(self): 
        return self.client[self._db][self._collection].count(self._query) 

    def get_range_tracker(self, start_position, stop_position): 
        from apache_beam.io import iobase, range_trackers 
        if start_position is None: 
            start_position = 0 
        if stop_position is None: 
            stop_position = range_trackers.OffsetRangeTracker.OFFSET_INFINITY 
        range_tracker = range_trackers.OffsetRangeTracker(start_position, stop_position) 
        range_tracker = range_trackers.UnsplittableRangeTracker(range_tracker) 

        return range_tracker 

    def read(self, range_tracker): 
        coll = self.client[self._db][self._collection] 
        for doc in coll.find(self._query, projection=self._fields): 
            yield doc 

    def split(self, desired_bundle_size, start_position=None, stop_position=None): 
        from apache_beam.io import iobase, range_trackers 
        if start_position is None: 
            start_position = 0 
        if stop_position is None: 
            stop_position = range_trackers.OffsetRangeTracker.OFFSET_INFINITY 
        yield iobase.SourceBundle(
            weight=1, 
            source=self, 
            start_position=start_position, 
            stop_position=stop_position) 


class ReadFromMongo(PTransform): 
    def __init__(self, connection_string, db, collection, query=None, fields=None): 
        super(ReadFromMongo, self).__init__() 
        self._connection_string = connection_string 
        self._db = db 
        self._collection = collection 
        self._query = query 
        self._fields = fields 
        self._source = _MongoSource(
            self._connection_string, 
            self._db, 
            self._collection, 
            query=self._query, 
            fields=self._fields) 

    def expand(self, pcoll): 
        import logging 
        logger = logging.getLogger(__name__) 
        logger.info('Starting MongoDB read from {}.{} with query {}' 
                    .format(self._db, self._collection, self._query)) 
        return pcoll | iobase.Read(self._source) 

    def display_data(self): 
        return {'source_dd': self._source} 


def transform_doc(document): 
    data = {str(document['clause type']): int(document['count'])} 
    return data 

def run(): 
    import time 
    parser = argparse.ArgumentParser() 
    parser.add_argument('--output', 
                        dest='output', 
                        default='<output path>', 
                        help='Output file to write results to.') 
    known_args, pipeline_args = parser.parse_known_args() 
    gcs_path = "<gcs URL>" 
    project_name = "<project name>" 
    pipeline_args.extend(['--runner=DataflowRunner', 
                          "--project=civic-eye-181513", 
                          "--staging_location=<staging location>", 
                          "--temp_location=<temp location>" 
                          ]) 
    pipeline_options = PipelineOptions(pipeline_args) 
    pipeline_options.view_as(SetupOptions).save_main_session = True 
    with beam.Pipeline(options=pipeline_options) as pipeline: 
        print("starting pipeline") 
        connection_string = '<mongo URL>' 
        (pipeline 
         | "Load" >> ReadFromMongo(connection_string, 'new', 'Data', query={}, fields=['clause type', 'count']) 
         | "transform" >> beam.Map(transform_doc).with_output_types(str) 
         | "Save" >> WriteToText("{0}/output/wordcount{1}".format(gcs_path, int(time.time())))) 
    print("done")  

if __name__ == '__main__': 
    run() 

Error:

Exception in worker loop: Traceback (most recent call last): 
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 738, in run 
    work, execution_context, env=self.environment) 
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/workitem.py", line 130, in get_work_items 
    work_item_proto.sourceOperationTask.split) 
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/workercustomsources.py", line 142, in __init__ 
    source_spec[names.SERIALIZED_SOURCE_KEY]['value']) 
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads 
    return dill.loads(s) 
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads 
    return load(file) 
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load 
    obj = pik.load() 
  File "/usr/lib/python2.7/pickle.py", line 858, in load 
    dispatch[key](self) 
  File "/usr/lib/python2.7/pickle.py", line 1133, in load_reduce 
    value = func(*args) 
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 766, in _import_module 
    return __import__(import_name) 
ImportError: No module named pymongo 

Note: the pymongo module is already installed, at the latest version:

pip show pymongo  
Name: pymongo  
Version: 3.5.1  
Summary: Python driver for MongoDB <http://www.mongodb.org>  
Home-page: http://github.com/mongodb/mongo-python-driver  
Author: Bernie Hackett  
Author-email: [email protected]  
License: Apache License, Version 2.0  
Location: /usr/local/lib/python2.7/dist-packages 

Thanks.

Answer


If you are using any non-default Python libraries, such as your own utils module or a dependency from PyPI, you need to provide a requirements or setup file. You can see the details about it in this link.

The reason for this is that when you submit a job to Dataflow, your code actually runs on separate Compute Engine workers that the Dataflow service spins up for you. Every dependency you use has to be installed on those workers, and that is achieved by providing a requirements or setup file.

Since you are using a dependency from PyPI, all you need to do is:

  1. Create a requirements file by running pip freeze > requirements.txt (a trimmed sketch of such a file follows this list) 
  2. Provide that requirements file through the pipeline options 
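
For illustration only, a trimmed requirements.txt for this pipeline might contain little more than the drivers the code imports; the pymongo pin comes from the pip show output in the question, everything else is an assumption about your environment:

pymongo==3.5.1           # version reported by pip show pymongo 
google-cloud-storage     # used by mongo_connection_string for gs:// URLs; pin to whatever you have installed 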

To provide the requirements file, pass it through the pipeline options with the following code:

requirements_file = "/path/to/requirements_file" 
pipeline_options.view_as(SetupOptions).requirements_file = requirements_file 
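
Equivalently (this is a sketch, not part of the original answer), the same setting can be expressed as a regular --requirements_file pipeline argument; the path below is a hypothetical placeholder:

from apache_beam.options.pipeline_options import PipelineOptions 

pipeline_args = [ 
    '--runner=DataflowRunner', 
    '--project=<project name>', 
    # hypothetical path; Dataflow stages this file and pip-installs it on every worker 
    '--requirements_file=/path/to/requirements.txt', 
] 
pipeline_options = PipelineOptions(pipeline_args) 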

So your run function should look like this:

def run(): 
    import time 
    parser = argparse.ArgumentParser() 
    parser.add_argument('--output', 
                        dest='output', 
                        default='<output path>', 
                        help='Output file to write results to.') 
    known_args, pipeline_args = parser.parse_known_args() 
    gcs_path = "<gcs URL>" 
    project_name = "<project name>" 
    pipeline_args.extend(['--runner=DataflowRunner', 
                          "--project=civic-eye-181513", 
                          "--staging_location=<staging location>", 
                          "--temp_location=<temp location>" 
                          ]) 
    pipeline_options = PipelineOptions(pipeline_args) 
    pipeline_options.view_as(SetupOptions).save_main_session = True 
    requirements_file = "/path/to/requirements_file" 
    pipeline_options.view_as(SetupOptions).requirements_file = requirements_file 
    with beam.Pipeline(options=pipeline_options) as pipeline: 
        print("starting pipeline") 
        connection_string = '<mongo URL>' 
        (pipeline 
         | "Load" >> ReadFromMongo(connection_string, 'new', 'Data', query={}, fields=['clause type', 'count']) 
         | "transform" >> beam.Map(transform_doc).with_output_types(str) 
         | "Save" >> WriteToText("{0}/output/wordcount{1}".format(gcs_path, int(time.time())))) 
    print("done") 

If you are using a custom-written Python package of your own, such as a utils module that the pipeline needs, then all you have to do is create a setup file with setuptools and provide it in a similar way to the requirements file.
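
As a rough sketch under assumed names (none of this is from the original answer), a minimal setup.py built with setuptools for a local helper package could look like the following; the pipeline is then pointed at it through SetupOptions.setup_file (or the --setup_file flag):

import setuptools 

setuptools.setup( 
    name='pipeline-utils',                  # hypothetical package name 
    version='0.0.1', 
    packages=setuptools.find_packages(),    # picks up local packages such as utils/ 
    install_requires=['pymongo'],           # PyPI dependencies can be declared here as well 
) 

The pipeline then references it much like the requirements file:

pipeline_options.view_as(SetupOptions).setup_file = './setup.py'  # path relative to where the job is launched 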

You can see how to do this in this link.


Thanks Anuj for the pointer to setuptools; let me get this working and update you. –


Now getting this error: "The Dataflow appears to be stuck. Please reach out to the Dataflow team at http://stackoverflow.com/questions/tagged/google-cloud-dataflow." –


Which Apache Beam version are you using? – Anuj