2017-08-17 63 views
0

我有以下功能,旋转关闭走程序聚合来自去例程多个结果到一个单一的阵列

func (r *Runner) Execute() { 
    var wg sync.WaitGroup 
    wg.Add(len(r.pipelines)) 
    for _, p := range r.pipelines { 
     go executePipeline(p, &wg) 
    } 

    wg.Wait() 

    errs := ....//contains list of errors reported by any/all go routines 

} 

我想可能有某种方式与渠道一定量,但我不能似乎弄明白了。要做到这一点

+0

创建大小为'LEN(r.pipelines)一片'让每一个工人写信给他们的相应的指标。 – zerkms

+2

有一种模式可将某些通道的结果组合成一个名为Fan In的模式。您可以在该模式中使用该方法(不一定是模式本身)。 –

回答

0

的一种方法是使用互斥的,如果你可以让executePipeline retuen错误:

// ... 
for _, p := range r.pipelines { 
    go func(p pipelineType) { 
     if err := executePipeline(p, &wg); err != nil { 
      mu.Lock() 
      errs = append(errs, err) 
      mu.UnLock() 
     } 
    }(p) 
} 

要使用的频道,你可以有错误单独的goroutine listning:

errCh := make(chan error) 

go func() { 
    for e := range errCh { 
     errs = append(errs, e) 
    } 
} 

和在Execute功能中,进行以下更改:

// ... 
wg.Add(len(r.pipelines)) 
for _, p := range r.pipelines { 
    go func(p pipelineType) { 
     if err := executePipeline(p, &wg); err != nil { 
      errCh <- err 
     } 
    }(p) 
} 

wg.Wait() 
close(errCh) 

您如果goroutines的数量不高,可以总是使用上面列出的@zerkms方法。

而不是从executePipleline返回错误并使用匿名函数包装,您可以始终在函数本身进行上述更改。

0

您可以使用渠道@Kaveh Shahbazian建议:

func (r *Runner) Execute() { 
    pipelineChan := makePipeline(r.pipelines) 

    for cnt := 0; cnt < len(r.pipelines); cnt++{ 
     //recieve from channel 
     p := <- pipelineChan 
     //do something with the result 
    } 
} 

func makePipeline(pipelines []pipelineType) <-chan pipelineType{ 
    pipelineChan := make(chan pipelineType) 

    go func(){ 
     for _, p := range pipelines { 
      go func(p pipelineType){ 
       pipelineChan <- executePipeline(p) 
      }(p) 
     } 
    }() 
    return pipelineChan 
} 

请看下面的例子:https://gist.github.com/steven-ferrer/9b2eeac3eed3f7667e8976f399d0b8ad

相关问题