2017-05-29 155 views
2

我正在通过制作1000名工作人员的工作区来播放频道。目前,我收到以下错误:Workerpool上的频道死锁

fatal error: all goroutines are asleep - deadlock! 

这里是我的代码:

package main 

import "fmt" 
import "time" 


func worker(id int, jobs <-chan int, results chan<- int) { 
    for j := range jobs { 
     fmt.Println("worker", id, "started job", j) 
     time.Sleep(time.Second) 
     fmt.Println("worker", id, "finished job", j) 
     results <- j * 2 
    } 
} 

func main() { 
    jobs := make(chan int, 100) 
    results := make(chan int, 100) 

    for w := 1; w <= 1000; w++ { 
     go worker(w, jobs, results) 
    } 

    for j := 1; j < 1000000; j++ { 
     jobs <- j 
    } 
    close(jobs) 
    fmt.Println("==========CLOSED==============") 

    for i:=0;i<len(results);i++ { 
     <-results 
    } 
} 

这究竟是为什么?我还是新来的,我希望能够理解这一点。

回答

1

问题是您的渠道正在填满。在读取任何结果之前,main()例程会尝试将所有作业放入jobs通道。但results频道只有100个结果的空间才能阻止频道的写入,因此所有工作人员最终都会阻止在此频道中等待空间 - 因为main()尚未开始从results开始读取,所以永远不会出现空间。

要快速解决此问题,您可以使jobs大到足以容纳所有作业,以便main()函数可以继续读取阶段;或者您可以让results大到足以保存所有结果,这样工作人员就可以输出结果而不会阻塞。

一个更好的方法是让另一个够程,填补了jobs队列,所以main()可以直接去读取结果:

func main() { 
    jobs := make(chan int, 100) 
    results := make(chan int, 100) 

    for w := 1; w <= 1000; w++ { 
     go worker(w, jobs, results) 
    } 

    go func() { 
     for j := 1; j < 1000000; j++ { 
      jobs <- j 
     } 
     close(jobs) 
     fmt.Println("==========CLOSED==============") 
    } 

    for i := 1; i < 1000000; i++ { 
     <-results 
    } 
} 

注意,我不得不最终for循环更改为固定数量的迭代,否则它可能会在所有结果被读取之前终止。

+0

目前两个通道都在100缓冲为何仍发生,如果我删除缓冲区? – rhillhouse

+0

如果您未指定通道大小,则会得到0的缓冲区大小,这意味着写入通道的通道将阻塞,直到读取器可用。您无法通过设计创建无限大小的频道(例如,这可能会导致服务器内存使用量不受限制)。 – Thomas

1

以下代码:

for j := 1; j < 1000000; j++ { 
     jobs <- j 
    } 

应在一个单独的goroutine运行,因为所有的工人将阻塞等待主gorourine接收结果信道上,而主够程卡在循环。

2

虽然托马斯的回答基本上是正确的,我张贴我的版本,这是IMO最好去也无缓冲通道工程:

func main() { 
    jobs := make(chan int) 
    results := make(chan int) 

    var wg sync.WaitGroup 

    // you could init the WaitGroup's count here with one call but this is error 
    // prone - if you change the loop's size you could forget to change the 
    // WG's count. So call wg.Add in loop 
    //wg.Add(1000) 
    for w := 1; w <= 1000; w++ { 
     wg.Add(1) 
     go func() { 
      worker(w, jobs, results) 
      defer wg.Done() 
     }() 
    } 

    go func() { 
     for j := 1; j < 2000; j++ { 
      jobs <- j 
     } 
     close(jobs) 
     fmt.Println("==========CLOSED==============") 
    }() 

    // in this gorutine we wait until all "producer" routines are done 
    // then close the results channel so that the consumer loop stops 
    go func() { 
     wg.Wait() 
     close(results) 
    }() 

    for i := range results { 
     fmt.Print(i, " ") 
    } 
    fmt.Println("==========DONE==============") 
}