2017-07-28 64 views
4

这个代码是基于Coroutines guide example: Fan-out桥通道序列

val inputProducer = produce<String>(CommonPool) { 
    (0..inputArray.size).forEach { 
     send(inputArray[it]) 
    } 
} 

val resultChannel = Channel<Result>(10) 

repeat(threadCount) { 
    launch(CommonPool) { 
     inputProducer.consumeEach { 
      resultChannel.send(getResultFromData(it)) 
     } 
    } 
} 

什么是创建一个Sequence<Result>,将提供结果的正确方法?

回答

3

您可以从ReceiveChannel获得信道.iterator(),然后换行通道迭代器为Sequence<T>,实现其正常Iterator<T>是等待对每个请求的结果块:

fun <T> ReceiveChannel<T>.asSequence(context: CoroutineContext) = 
    Sequence { 
     val iterator = iterator() 
     object : AbstractIterator<T>() { 
      override fun computeNext() = runBlocking(context) { 
       if (!iterator.hasNext()) 
        done() else 
        setNext(iterator.next()) 
      } 
     } 
    } 

val resultSequence = resultChannel.asSequence(CommonPool) 
+0

ChannelIterator不延长迭代 – atok

+0

@阿托克啊,对。修复了答案。 – hotkey