2015-04-04 142 views
0

我可能错过了某些东西,或不理解Go如何处理并发(或者我的并发知识),我已经设计了一些代码来理解多个生产者/消费者。并发与多个生产者/多个消费者

这是代码:

package main 

import (
    "fmt" 
    "time" 
    // "math/rand" 
    "sync" 
) 

var seq uint64 = 0 
var generatorChan chan uint64 
var requestChan chan uint64 

func makeTimestamp() int64 { 
    return time.Now().UnixNano()/int64(time.Millisecond) 
} 

func generateStuff(genId int) { 
    var crap uint64 
    for { 
     crap = <-requestChan 
     // <- requestChan 
     seq = seq+1 
     fmt.Println("Gen ", genId, " - From : ", crap, " @", makeTimestamp()) 
     generatorChan <- uint64(seq) 
    } 
} 

func concurrentPrint(id int, work *sync.WaitGroup) { 
    defer work.Done() 

    for i := 0; i < 5; i++ { 
     requestChan<-uint64(id) 
     fmt.Println("Conc", id, ": ", <-generatorChan) 
    } 
} 

func main() { 
    generatorChan = make(chan uint64) 
    requestChan = make(chan uint64) 
    var wg sync.WaitGroup 
    for i := 0; i < 20; i++ { 
     go generateStuff(i) 
    } 
    maximumWorker := 200 
    wg.Add(maximumWorker) 
    for i := 0; i < maximumWorker; i++ { 
     go concurrentPrint(i, &wg) 
    } 
    wg.Wait() 
} 

当从1运行它打印(主要是按顺序)的所有数字至1000(200名消费者得到一个数每5次)。 我本来预计一些消费者会打印完全相同的号码,但看起来请求代码正在阻止这种情况,即使有20个goroutines服务于generateStuff,它们通过增加一个全局变量。

我在Go或Concurrency中遇到了什么问题?

我本来预料到类似于类似的两个去程序的情况,generateStuff会一起醒来,同时增加seq,因此有两个消费者打印相同数字两次。

EDIT代码上playgolang:http://play.golang.org/p/eRzNXjdxtZ

+0

注意,你有获得潜在的数据赛跑到全局'seq'变量(除了未缓冲的'requestChan'可能会使它们分开)。 – 2015-04-04 20:25:20

+0

是的,这是我所期望的戴夫,一些非确定性的行为给我不是所有的数字从1到1000,但有些不同。 在这个例子中,chans被缓冲了http://play.golang.org/p/tq7-6Bc0hL,仍然得到相同的结果。 – Marlon 2015-04-04 20:33:19

+0

你用'GOMAXPROCS'> 1运行了吗? (也许使用'-race') – 2015-04-04 20:34:27

回答

1

您有多个工人,可以在同一时间都运行和所有的尝试,并在同一时间的请求。由于requestChan没有缓冲,所以它们都阻止等待阅读器同步并接受他们的请求。

您有多个生成器,它们将通过requestChan与请求者同步,生成一个结果,然后阻止未缓冲的generatorChan,直到工作人员读取结果为止。注意它可能是一个不同的工作者。

没有额外的同步,所以其他一切都是非确定性的。

  • 一个生成器可以发出所有的请求。
  • 发电机可以抓住一个请求,并在任何其他发电机碰巧有机会运行之前通过递增seq 。只有一个处理器,这可能甚至是可能的。
  • 所有的生成器都可以抓取请求,并且最终都希望在同一时刻增加seq,导致各种问题。
  • 工作人员可以从他们碰巧发送给或来自完全不同的发电机的同一发电机获得响应。

通常,如果不添加同步来强制这些行为之一,则无法确保这些行为中的任何一个发生。

请注意,对于数据竞争来说,这本身就是另一个非确定性事件。有可能获得任意值,程序崩溃等。假设在竞争条件下,这个值可能只是被一个或一些这样的相对无害的结果所关闭。

对于试验来说,最好的办法是振作起来GOMAXPROCS。无论是通过环境变量(例如env GOMAXPROCS=16 go run foo.goenv GOMAXPROCS=16 ./foo之后的go build)或通过从您的程序中调用runtime.GOMAXPROCS(16)。默认值为1,这意味着数据竞赛或其他“奇怪”行为可能被隐藏。

您还可以通过在各个位置添加对runtime.Goschedtime.Sleep的呼叫来影响一些事情。

如果您使用比赛检测器(例如go run -race foo.googo build -race),还可以看到数据竞赛。程序不仅应该在退出时显示“找到1个数据竞赛”,而且应该在首次检测到竞赛时还会用堆栈跟踪转储出大量细节。

这里是你的代码进行实验一种“清理”版本:

package main 

import (
    "log" 
    "sync" 
    "sync/atomic" 
) 

var seq uint64 = 0 
var generatorChan = make(chan uint64) 
var requestChan = make(chan uint64) 

func generator(genID int) { 
    for reqID := range requestChan { 
     // If you want to see a data race: 
     //seq = seq + 1 
     // Else: 
     s := atomic.AddUint64(&seq, 1) 
     log.Printf("Gen: %2d, from %3d", genID, reqID) 
     generatorChan <- s 
    } 
} 

func worker(id int, work *sync.WaitGroup) { 
    defer work.Done() 

    for i := 0; i < 5; i++ { 
     requestChan <- uint64(id) 
     log.Printf("\t\t\tWorker: %3d got %4d", id, <-generatorChan) 
    } 
} 

func main() { 
    log.SetFlags(log.Lmicroseconds) 
    const (
     numGen = 20 
     numWorker = 200 
    ) 
    var wg sync.WaitGroup 
    for i := 0; i < numGen; i++ { 
     go generator(i) 
    } 
    wg.Add(numWorker) 
    for i := 0; i < numWorker; i++ { 
     go worker(i, &wg) 
    } 
    wg.Wait() 
    close(requestChan) 
} 

Playground(但要注意,在操场上的时间戳不会是有用的,并呼吁runtime.MAXPROCS可能不会做任何事情)。进一步注意,操作缓存的结果,以便重新运行完全相同的程序将始终显示相同的输出,您需要做一些小的更改或者只是在自己的机器上运行它。

很大程度上变化小象分流下来的发电机,使用logfmt因为前者使得并发保证,消除数据争,使得输出看起来更好,等

+0

非常感谢!清理代码) – Marlon 2015-04-05 08:07:18

0

Channel types

信道提供了一种机制用于同时执行功能 通过发送和接收一个指定的元素 类型的值进行通信。未初始化通道的值为零。

可以使用内置的 函数make,它接受信道类型和作为参数的可选容量 制成一个新的,初始化的信道值:

make(chan int, 100) 

容量,中元素的个数,在 通道中设置缓冲区的大小。如果容量为零或不存在,则只有当发送方和接收方都准备好时,通道才是无缓冲的,并且通信成功。否则,通道被缓冲,并且如果缓冲区不是满的,则通信成功而不会阻塞 (发送)或不为空(接收)。一个零通道永远不会准备好通讯。

您正在通过使用无缓冲通道来节制通道通信。

例如,

generatorChan = make(chan uint64) 
requestChan = make(chan uint64) 
+0

uhmm所以如果我让我的陈缓冲我应该期望不同的行为,但事实并非如此。 (http://play.golang.org/p/tq7-6Bc0hL) 仍然没有得到它对不起:( – Marlon 2015-04-04 20:31:03

+0

@peterSO除了多个工作人员可能都尝试和发送'requestChan'在同一时间的价值和那么应该有可能多个生成器可以在“几乎同时”接收这些个别请求(也就是说,可能有多个生成器在'seq'上竞争,我相信这是个问题) – 2015-04-04 20:32:13

相关问题