2017-06-08 57 views
1

我想知道是否有一个简洁的方式来替代IDataflowBlock.Completion来取代ReceiveAsync或类似的方法使用取消令牌从BufferBlock或其他IDataflowBlock消耗。C#使用DataflowBlock.Completion取消消费者任务而不是CancellationToken

IDataflowBlock.ReceiveAsync<T>(TimeSpan, CancellationToken) 

如果InputQueueBufferBlock

BufferBlock<String> InputQueue 

for (int i = 0; i < 26; i++)    
{ 
    await InputQueue.SendAsync(((char)(97 + i)).ToString()); 
} 

如果InputQueue.Complete();被调用,那么当队列被清空,IDataflowBlock.Completion将变为状态RanToCompletion, 可与IDataflowBlock.Completion.IsCompleted进行检查。

如果将多个线程从队列中,这可能发生在InputQueue.ReceiveAsync服用,有一个整洁的替代处理InputQueue不是完成:

try 
{ 
    String parcel = await InputQueue.ReceiveAsync(timeSpan); 
} 
catch(InvalidOperationException x) 
{ 

} 
+0

你的样品try-catch不是你处理完成的方式。你应该在调用'.Complete()'后等待block.Completion'。实际上正在做什么?此外,除非您有特殊的理由需要使用'RecieveAsync',否则您应该更喜欢使用'LinkTo'方法创建的块之间的链接。 [Stephen Cleary - 数据流介绍](https://blog.stephencleary.com/2012/09/introduction-to-dataflow-part-1.html) – JSteward

+0

我有许多生产者任务将数据包放在其中的代码一个缓冲区块和许多其他任务正在从缓冲区块读取,读者任务有一个令牌,通知他们何时没有更多的生产者加载队列。鉴于该逻辑类似于isCompleted逻辑,我正在寻求消除对令牌的需求。我会考虑链接到 –

+0

当然,在'TPL-Dataflow'中肯定会看到'LinkTo',你的消费任务将根据你的需要变成'ActionBlock'或'TransformBlock'。一旦他们被链接,你只需传播完成来关闭你的管道。 – JSteward

回答

0

cancel a Dataflow Block最简单的方法是将令牌提供阻止的构造,像这样:

new ExecutionDataflowBlockOptions 
{ 
    CancellationToken = cancellationSource.Token 
}); 

CancellationToken is defined in Dataflow​Block​Options class,所以即使BufferBlock可能被取消。

为什么你自己实现Receive逻辑? PropagateCompletionlinking your blocks有没有限制吗?例如,如果你的代码看起来是这样的:

internal void HandleMessage() 
{ 
    try 
    { 
     var parcel = await InputQueue.ReceiveAsync(timeSpan); 
     // handle parsel 
    } 
    catch(InvalidOperationException x) 
    { 
    } 
} 

然后,你可以简单地使用ActionBlock这样的:

var InputQueue = new BufferBlock<string>(); 
var Handler = new ActionBlock<string>(parcel => 
{ 
    // handle parsel 
}); 
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true }; 
InputQueue.LinkTo(Handler, linkOptions); 

// now after you call Complete method for InputQueue the completion will be propagated to your Handler block: 
for (int i = 0; i < 26; i++)    
{ 
    await InputQueue.SendAsync(((char)(97 + i)).ToString()); 
} 
InputQueue.Complete(); 
await Handler.Completion; 

还要注意的是,如果你需要与UI一些互动,你可以使用你的最后区块为IObservableRx.Net库。

+0

没问题,ActionBlock是我的答案,我让消费者​​直接从缓冲块中取出,将其更改为:生产者反馈缓冲区块,缓冲区块链接到动作块/ s。我将试验maxDegreeOfParallelism和限制消费者的有限容量 –

相关问题