2017-03-31 97 views
3

我有一串30'000字符串。我如何将这个切片的处理分成10个例程,从切片中取出3000个字符串,从中提取一些数据并推入一个新的切片?如何将数组处理分为goroutines?

所以,最后,我将有10个切片,每个切片有3000个处理结果。处理这个问题的模式是什么?

我看过this article,但不确定这些模式适用于我的情况。

+2

为什么不只是有10个goroutines,并让他们尽快处理字符串? – JimB

回答

2

使用通道,读取切片中的元素,使用Fan out分配负载并传递消息。然后,在goroutine中处理字符串,并在单个goroutine中收集结果(扇入)以避免互斥。

您可能需要设置最大并发并发例程数。

请记住,切片在写入时不是线程安全的。

有用的信息:

https://blog.golang.org/pipelines https://talks.golang.org/2012/concurrency.slide#1 https://blog.golang.org/advanced-go-concurrency-patterns https://talks.golang.org/2013/advconc.slide#1

+1

但是,切片是线程安全的,如果它们只用于只读。他的问题明确指出,他正在阅读这篇文章并写入另一篇文章。只要结果是在最后组合的单独片段,从输入读取就可以并发执行。 – RayfenWindspear

+0

谢谢@RayfenWindspear。我已经澄清, –

+2

雅,使用粉丝来收集结果,因此只有一个goroutine正在写入目标片。避免需要互斥。 – Kaedys

0

我在@JimB同意,为什么要限制外出例程。然而,既然这是你的问题,我可能会这样。如果你真的想让每个gorountine做3000个项目,那么创建一个2D切片可能会更容易。 [[3000项目],[3000项目],..]然后有1个去每个索引的例程进程在这个二维数组中。否则下面只是限制了gorountines至10 方法1 包主

import (
    "crypto/rand" 
    "fmt" 
    "log" 
    "sync" 
    "time" 
) 

var s []string 

// genetate some mock data 
func init() { 
    s = make([]string, 30000) 
    n := 5 
    for i := 0; i < 30000; i++ { 
     b := make([]byte, n) 
     if _, err := rand.Read(b); err != nil { 
      panic(err) 
     } 
     s[i] = fmt.Sprintf("%X", b) 
    } 
} 

func main() { 
    // set the number of workers 
    ch := make(chan string) 
    var mut sync.Mutex 
    counter := 0 

    // limit the number of gorountines to 10 
    for w := 0; w < 10; w++ { 
     go func(ch chan string, mut *sync.Mutex) { 
      for { 
       // get and update counter using mux to stop race condtions 
       mut.Lock() 
       i := counter 
       counter++ 
       mut.Unlock() 
       // break the loop 
       if counter > len(s) { 
        return 
       } 
       // get string 
       myString := s[i] 
       // to some work then pass to channel 
       ch <- myString 

      } 
     }(ch, &mut) 
    } 
    // adding time. If you play wiht the number of gorountines youll see how changing the number above can efficiency 
    t := time.Now() 
    for i := 0; i < len(s); i++ { 
     result := <-ch 
     log.Println(time.Since(t), result, i) 
    } 
} 

方法2 init函数是创建一个二维数组分块伸到10阵列,每个阵列包含3000种元素..如果解析您数据这种方式下面的逻辑需要很少的修改工作

package main 

import (
    "crypto/rand" 
    "fmt" 
    "log" 
    "sync" 
) 

var chunkedSlice [10][3000]string 

// genetate some mock data 
// 2d array, each chunk has 3000 items in it 
// there are 10 chunks, 1 go rountine per chunk 
func init() { 
    n := 5 
    for i := 0; i < 10; i++ { 
     for j := 0; j < 3000; j++ { 
      b := make([]byte, n) 
      if _, err := rand.Read(b); err != nil { 
       panic(err) 
      } 
      chunkedSlice[i][j] = fmt.Sprintf("%X", b) 
     } 
    } 
} 

func main() { 
    // channel to send parsed data to 

    ch := make(chan string) 
    var wg sync.WaitGroup 

    // 10 chunks 
    for _, chunk := range chunkedSlice { 
     wg.Add(1) 
     // if defining the 2d array e.g [10][3000]string, you need to pass it as a pointer to avoid stack error 
     go func(ch chan string, wg *sync.WaitGroup, chunk *[3000]string) { 
      defer wg.Done() 
      for i := 0; i < len(chunk); i++ { 
       str := chunk[i] 
       // fmt.Println(str) 
       // parse the data (emulating) 
       parsedData := str 
       // send parsed data to the channel 
       ch <- parsedData 
      } 
     }(ch, &wg, &chunk) 
    } 
    // wait for all the routines to finish and close the channel 
    go func() { 
     wg.Wait() 
     close(ch) 
    }() 

    var counter int // adding to check that the right number of items was parsed 
    // get the data from the channel 
    for res := range ch { 
     log.Println(res, counter) 
     counter++ 
    } 
} 
+0

似乎只是简单地将输入列表转换为'w'子列表并处理每个列表,而不是使用goroutine。或者除扇入输出通道外还使用扇出输入通道。 – Kaedys

+0

我同意没有想到它 – reticentroot