2017-03-16 62 views
0

将项目发布到TPL DataFlow时,是否有任何机制可以允许延迟发布?延迟发布到DataFlow

public partial class BasicDataFlowService 
{ 
    private readonly ActionBlock<string> workerBlock; 

    public BasicDataFlowService() 
    { 
     workerBlock = new ActionBlock<string>(file => DoWork(file), new ExecutionDataflowBlockOptions() 
     { 
      MaxDegreeOfParallelism = 32 
     }); 
    } 

    partial void DoWork(string fileName); 

    private void AddToDataFlow(string file) 
    { 
     workerBlock.Post(file); 
    } 
} 

AddToDataFlow,我希望能够指定一个延迟的项目被处理之前(例如,如果我们决定我们要推迟30秒的处理)。

我的确考虑使用TransFormBlocknew System.Threading.ManualResetEvent(false).WaitOne(1000);,例如,

var requeueBlock = new TransformBlock<string, string>(file => 
{ 
    new System.Threading.ManualResetEvent(false).WaitOne(1000); 
    return file; 
}); 

requeueBlock.LinkTo(workerBlock); 

但是,这似乎是消耗了一个不必要的线程,可以被链中的其他块使用。

回答

0

首先,您需要将ManualResetEvent作为单例存储,否则所有线程都会得到自己的对象来等待,并且您的方法不起作用。

其次,如果您需要在流水线中的一个AppDomain内执行同步,请考虑ManualResetEventSlim版本而不是重型ManualResetEvent

如果你想重复使用你的机器的核心,而没有无用的等待,你应该看看SpinWait轻量级结构。您可能会发现Joseph Albahari' article有用在这种情况下:

// singleton variable 
bool _proceed; 

var requeueBlock = new TransformBlock<string, string>(file => 
{ 
    var spinWait = new SpinWait(); 
    while (!_proceed) 
    { 
     // ensure we have the latest _proceed value 
     Thread.MemoryBarrier(); 
     // try to spin for a while 
     // after some spins, yield to another thread 
     spinWait.SpinOnce(); 
    } 
    return file; 
}); 

SpinWait内部决定,如何一代产量:与Sleep(0)Sleep(1)Yield方法调用,所以它对于你的情况相当有效。

0

要在将值发布到workerBlock之前添加延迟,只需插入延迟并在发布该值之前等待它。如果您的workerBlock具有有限容量,您可以await SendAsync。几个选项来完成目标:

private async Task AddToDataflow(string file, TimeSpan delay) { 
    await Task.Delay(delay); 
    await workerBlock.SendAsync(file); 
} 

private async Task AddToDataflow(string file) { 
    var delay = TimeSpan.FromSeconds(30); 
    await Task.Delay(delay); 
    await workerBlock.SendAsync(file); 
} 

private async void AddToDataflow(string file) { 
    var delay = TimeSpan.FromSeconds(30); 
    await Task.Delay(delay); 
    workerBlock.Post(file); 
}