2017-09-13 39 views

回答

2

准备自定义数据流的工人,你应该提供setup.py文件,安装所需的软件包命令。 首先,创建setup.py文件类似下面的(这是一个普通的setup.py文件)。 您应该列出REQUIRED_PACKAGES可变你的包,或只是把pip install matplotlib==2.0.2CUSTOM_COMMANDS像我一样。

请注意,matplotlib需要一些额外的软件包/库被安装在系统中,所以你需要安装它们也通过指定他们安装的命令。此外,如果你想使数据流里面工作地块,则需要配置matplotlib后端一个,这是能够写入文件输出(见How can I set the 'backend' in matplotlib in Python?)。

然后,创建setup.py文件后,只需指定阿帕奇梁管道参数:

import apache_beam as beam 
p = beam.Pipeline("DataFlowRunner", argv=[ 
'--setup_file', './setup.py', 
# put other parameters here 
]) 

通用setup.py文件:

import sys 
import os 
import logging 
import subprocess 
import pickle 

import setuptools 
import distutils 

from setuptools.command.install import install as _install 



class install(_install): # pylint: disable=invalid-name 
    def run(self): 
     self.run_command('CustomCommands') 
     _install.run(self) 

CUSTOM_COMMANDS = [ 
    ['pip', 'install', 'matplotlib==2.0.2'], 
] 


class CustomCommands(setuptools.Command): 
    """A setuptools Command class able to run arbitrary commands.""" 

    def initialize_options(self): 
     pass 

    def finalize_options(self): 
     pass 

    def RunCustomCommand(self, command_list): 
     logging.info('Running command: %s' % command_list) 
     p = subprocess.Popen(
      command_list, 
      stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) 
     # Can use communicate(input='y\n'.encode()) if the command run requires 
     # some confirmation. 
     stdout_data, _ = p.communicate() 
     logging.info('Command output: %s' % stdout_data) 
     if p.returncode != 0: 
      raise RuntimeError(
       'Command %s failed: exit code: %s' % (command_list, p.returncode)) 

    def run(self): 
     for command in CUSTOM_COMMANDS: 
      self.RunCustomCommand(command) 


REQUIRED_PACKAGES = [ 

] 


setuptools.setup(
    name='name', 
    version='1.0.0', 
    description='DataFlow worker', 
    install_requires=REQUIRED_PACKAGES, 
    packages=setuptools.find_packages(), 
    cmdclass={ 
     'install': install, 
     'CustomCommands': CustomCommands, 
     } 
    )