2017-07-28 66 views
0

我创建一个程序创建随机bson.M文件,并将它们插入到数据库中。 主goroutine生成文档,并将它们推送到缓冲通道。同时,两个goroutine从通道获取文档并将其插入到数据库中。内存池和缓冲通道与多个goroutines

这个过程需要大量的内存,并把垃圾colelctor压力太大了,所以我想实现一个内存池,以限制分配

这里的数字是多少我到目前为止有:

package main 

import (
    "fmt" 
    "math/rand" 
    "sync" 
    "time" 

    "gopkg.in/mgo.v2/bson" 
) 

type List struct { 
    L []bson.M 
} 

func main() { 
    var rndSrc = rand.NewSource(time.Now().UnixNano()) 

    pool := sync.Pool{ 
     New: func() interface{} { 
      l := make([]bson.M, 1000) 
      for i, _ := range l { 
       m := bson.M{} 
       l[i] = m 
      } 
      return &List{L: l} 
     }, 
    } 
    // buffered channel to store generated bson.M docs 
    var record = make(chan List, 3) 
    // start worker to insert docs in database 
    for i := 0; i < 2; i++ { 
     go func() { 
      for r := range record { 
       fmt.Printf("first: %v\n", r.L[0]) 
       // do the insert ect 
      } 
     }() 
    } 
    // feed the channel 
    for i := 0; i < 100; i++ { 
     // get an object from the pool instead of creating a new one 
     list := pool.Get().(*List) 
     // re generate the documents 
     for j, _ := range list.L { 
      list.L[j]["key1"] = rndSrc.Int63() 
     } 
     // push the docs to the channel, and return them to the pool 
     record <- *list 
     pool.Put(list) 
    } 
} 

但它看起来像一个List被再生之前使用4次:

> go run test.go 
first: map[key1:943279487605002381 key2:4444061964749643436] 
first: map[key1:943279487605002381 key2:4444061964749643436] 
first: map[key1:943279487605002381 key2:4444061964749643436] 
first: map[key1:943279487605002381 key2:4444061964749643436] 
first: map[key1:8767993090152084935 key2:8807650676784718781] 
... 

为什么不在列表中,每次再生?我怎样才能解决这个问题 ?

+2

您的程序存在缺陷。产品应该从游泳池获得物品,但消费者应该把它们放回原处。 –

回答

3

问题是你已经创建了一个缓冲通道var record = make(chan List, 3)。因此,此代码:

record <- *list 
pool.Put(list) 

可能会立即返回,并且条目将在其消耗之前放回池中。因此,在消费者有机会使用它之前,底层切片可能会在另一个循环迭代中被修改。尽管您要将List作为值对象发送,但请记住[]bson.M是指向已分配数组的指针,并且在发送新值List时仍将指向相同的内存。因此你为什么看到重复的输出。

要修正,修改您的通道来发送列表指针make(chan *List, 3),改变你的消费者把项目回一次吃完池,如:

for r := range record { 
    fmt.Printf("first: %v\n", r.L[0]) 
    // do the insert etc 
    pool.Put(r) // Even if error occurs 
} 

你的制作人应该再送到指针与pool.Put删除,即

record <- list