2012-03-09 72 views
0

在我的应用程序中,我实现了由5个不同“处理单元”组成的非常粗糙的工作流程。目前代码的结构如下:Python:分支工作流程/管道的现有解决方案?

def run(self, result_first_step=None, result_second_step=None): 

    config = read_workflow_config("config.ini") 

    if config.first_step: 
     result_first_step = run_process_1() 

    if config.second_step and result_first_step is not None: 
     result_second_step = run_process_2(result_first_step) 
    else: 
     raise Exception("Missing required data") 

    if config.third_step: 
     result_third_step = run_process_3(result_first_step, result_second_step) 
    else: 
     result_third_step = None 

    collect_results(result_first_step, result_second_step, result_third_step) 

等等。代码的工作原理很难维护,而且非常脆弱(处理比这个简单的例子复杂得多)。因此,我一直在考虑采用另一种策略,即制定适当的工作流程:

  • 短路:我可以不给任何数据给启动过程或两种不同类型的数据。在后一种情况下,工作流的短路,并跳过一些处理
  • 普通的对象:东西一样配置提供给所有单元
  • 条件:取决于配置,一些位可以被跳过

是否有一个Python库可用于执行这些类型的工作流程,还是我应该推出自己的?我一直在尝试pyutilib.workflow,但它不能正确地支持一个通用配置对象,而不能将它传递给所有工作者(乏味)。

注意:这是针对库/命令行应用程序的,因此基于Web的工作流程解决方案并不合适。

+0

你有没有试着用搜索引擎这个问题?你发现了什么问题? – Marcin 2012-03-09 17:13:05

+0

你写的方式看起来像'run_process_2',除非你已经有'run_process_1'。真的吗? – katrielalex 2012-03-09 17:13:17

+0

的确,我会调整它以更好地展示我的想法。编辑:更改示例如何可以短路。 – Einar 2012-03-09 17:16:49

回答

0

您可以将运行方法变成一个生成器;

def run(self) 
    result_first_step = run_process_1() 
    yield result_first_step 
    result_second_step = run_process_2(result_first_step) 
    yield result_second_step 
    result_third_step = run_process_3(result_first_step, result_second_step) 
    collect_results(result_first_step, result_second_step, result_third_step) 
0

有相当的范围内半个页面的方法在Python, 管道向...
这里的主要思想: 顶部,放置在一个字典中所有的步骤定义;
然后管道(例如, “C A T”)做的步骤C,A,T。

class Pipelinesimple: 
    """p = Pipelinesimple(funcdict); p.run("C A T") = C(X) | A | T 

    funcdict = dict(A = Afunc, B = Bfunc ... Z = Zfunc) 
    pipeline = Pipelinesimple(funcdict) 
    cat = pipeline.run("C A T", X) # C(X) | A | T, i.e. T(A(C(X))) 
    dog = pipeline.run("D O G", X, **kw) # D(X, **kw) | O(**kw) | G(**kw) 
    """ 

def __init__(self, funcdict): 
    self.funcdict = funcdict # funcs or functors of X 

def run(self, steps, X, **commonargs): 
    """ steps "C A T" or ["C", "A", "T"] 
     all funcs(X, **commonargs) 
    """ 

    if isinstance(steps, basestring): 
     steps = steps.split() # "C A T" -> ["C", "A", "T"] 
    for step in steps: 
     func = self.funcdict(step) 
     # if X is None: ... 
     X = func(X, **commonargs) 
    return X 

接着,有给予不同的参数 到的不同步骤的几种方法。

的一种方法是解析多行字符串如

""" C ca=5 cb=6 ... 
    A aa=1 ... 
    T ... 
""" 

另一种方法是采取的功能/功能名称/ PARAM类型的字典的列表,像

pipeline.run(["C", dict(ca=5, cb=6), lambda ..., "T", dict(ta=3) ]) 

第三种是分裂PARAMS “A__aa B__ba ...”的方式 sklearn.pipeline.Pipeline。 呢。 (这是面向机器学习,但你可以复制管道部件。)

这些都有相当明显的优点和缺点。

一个大型人才社区可以拿出一打原型 很快解决问题[管道]。
但是减少到二三个需要永远。

无论你采用何种方式, 提供记录所有参数运行的方式。

参见:
FilterPype
nipype

+0

有趣的做法,我会分支我的代码,并给这个旋转。 – Einar 2012-04-27 08:39:11

+0

@Einar,你会选择哪种方法? – denis 2012-04-27 11:14:15