2017-05-25 81 views
1

使用GOCQL(Golang,Cassandra),我完成128个请求,然后一切都挂起。我相信getTicksForCassandraKey()函数我正确地发布查询,但不知道。 GOCQL只支持多达128个并发查询,所以我一定在做错事。所有查询都是读取。gocql阻止太多的并发读取请求(golang,Cassandra)

主要代码:

inboundChannel := make(chan []bson.M, 30) 
maxGoRoutinesCount := 30 
chunkSize := int(math.Floor(float64(len(cassandraKeys))/float64(maxGoRoutinesCount))) 
log.Println("Chunk size will be: ", chunkSize) 
log.Println("Cassandra Keys length: ", cassandraKeys) 
idx := 0 
for idx < len(cassandraKeys) { 
    log.Println("idx: ", idx) 
    chunkOfKeys := cassandraKeys[idx:(idx + chunkSize)] 
    idx += chunkSize 
    go func(keys []string) { 
     log.Println("Received analysisKey on outbound channel. About to query this many keys: ", len(keys)) 
     //Cassandra session can handle up to 128 concurrent queries 
     for _, c := range keys { 
      processedTicks, err := getTicksForCassandraKey(*session, c, startTime) 
      if err != nil { 
       log.Println("Error returning.") 
       return 
      } 
      log.Println("Finished query. About to post to inboundChannel for key: ", c) 
      inboundChannel <- processedTicks 
     } 
    }(chunkOfKeys) 
} 

processedIndex := 0 
for processedTicks := range inboundChannel { 
    ticks = append(ticks, processedTicks...) 
    log.Println("Got processed ticks from inboundChannel: ", processedIndex) 
    processedIndex += 1 
} 
log.Println("End of function.") 

代码以getTicksForCassandraKey是:

func getTicksForCassandraKey(cassandraSession gocql.Session, cassandraAnalysisKey string, startTime time.Time) (ticks []bson.M, err error) { 
    log.Println("getTicksForCassandraKey: ", cassandraAnalysisKey) 
    cassandraQuery := "select * from analysisdata where analysis_key = '" + cassandraAnalysisKey + "' and time > ? ALLOW FILTERING;" 
    q := cassandraSession.Query(cassandraQuery, startTime) 
    iter := q.Iter() 
    var rawData string 
    var rawAnalysisKey string 
    var rawTime time.Time 
    for iter.Scan(&rawAnalysisKey, &rawTime, &rawData) { 
     processedAlgoTick, processingErr := processAlgoTick(rawAnalysisKey, rawTime, rawData) 
     if processingErr != nil { 
      err = processingErr 
      return 
     } 
     ticks = append(ticks, processedAlgoTick) 
    } 
    iterCloseErr := iter.Close() 
    q.Release() 
    log.Println("Closed iter for analysis key: ", cassandraAnalysisKey) 
    if iterCloseErr != nil { 
     log.Println("Cassandra Iterator.Close() Error:", iterCloseErr) 
    } 
    return 
} 

回答

0

的inboundChannel在主代码被阻断。我把它放到一个去例程中,并使用sync.WaitGroup()来解决它。