2017-06-19 36 views
4

我使用TPL数据流来创建它们通过输出到输出bufferBlock是否有一种惯用的方式来路由在TPL数据流图中的TransformBlock中失败的元素?

inputQueue = new BufferBlock<InputPacket>; 
processQueue = new TransformBlock <InputPacket, OutputPacket>; 
outputQueue = new BufferBlock<OutputPacket>; 

inputQueue.LinkTo(processQueue, new DataflowLinkOptions { PropagateCompletion = true }); 
processQueue.LinkTo(outputQueue, new DataflowLinkOptions { PropagateCompletion = true }); 

是否有路径元件,其失败的惯用方式TransformBlock处理的输入元件的bufferBlock?

InputPacket要素齐全的加工,动作与 processQueue关联将返回OutputPacket将被路由到outputQueue

如果有processQueue相关的动作调用一个不可靠webservice, 然后处理一些InputPacket元素将超时, ,我想重试这些元素x次。但我不想立即尝试它们,我想将它们放回到输入队列中。

我希望能够路由InputPacket元件,其 超时回到inputQueue直到他们失败x次,然后以failureQueue

BufferBlock<CallPacket> failureQueue = new BufferBlock<InputPacket>; 

使用LinkTo谓词由于涉及两种不同类型而变得复杂:

InputPacket OutputPacket 

我看起来像我可以处理这种改变:

processQueue = new TransformBlock <InputPacket, ParentPacketType>; 

,然后写谓词基于数据包的类型。

通过在inputElement输出存储作为InputPacket成员,

但既不似乎良好的方式来实现这一点。

+1

这个项目包含LinkSubTypeTo这似乎是为了这个目的:https://github.com/gridsum/DataflowEx –

回答

3

首先,我认为你的术语很混乱,你应该坚持使用TPL Dataflow。数据流图不是由队列组成的,它由组成。并且这些块不处理数据元素处理

现在,当TPL Dataflow不能为您提供您想要的块时,一种解决方案是自己构建该块,并使用所提供的块。这方面的一个简单的版本看起来是这样的:

public static IPropagatorBlock<TInput, TOutput> CreateRetryTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform, int retryCount, 
    ITargetBlock<(TInput, Exception)> failureBlock) 
{ 
    var failedInputs = new Dictionary<TInput, int>(); 

    TransformManyBlock<TInput, TOutput> resultBlock = null; 

    resultBlock = new TransformManyBlock<TInput, TOutput>(
     async input => 
     { 
      try 
      { 
       return new[] { transform(input) }; 
      } 
      catch (Exception exception) 
      { 
       failedInputs.TryGetValue(input, out int count); 

       if (count < retryCount) 
       { 
        failedInputs[input] = count + 1; 
        // ignoring the returned Task, to avoid deadlock 
        _ = resultBlock.SendAsync(input); 
       } 
       else 
       { 
        failedInputs.Remove(input); 
        await failureBlock.SendAsync((input, exception)); 
       } 

       return Array.Empty<TOutput>(); 
      } 
     }); 

    return resultBlock; 
} 

假设我提出:

  • 您可以使用C#7.0。如果不是,我使用的功能很容易更换。
  • 可以忽略除最后一个异常之外的所有内容。否则,Dictionary将不得不存储所有先前的例外,然后将它们发送到failureBlock
  • 将失败的数据元素发送回同一个块可以。如果没有,该方法将不得不采取一个参数,并使用它。
  • 该块不需要支持并行性。如果是这样,你将不得不使代码线程安全(你可能会开始使用ConcurrentDictionary而不是Dictionary)。
  • 输入数据元素可以存储在字典中(请阅读:它们的GetHashCode行为正确)并且不会有重复的输入。否则,你将不得不设计一些其他机制来重新计数。
+0

我要对这个办法的工作,它是否适合我,我会回来带剔,欢呼复合 –

相关问题