问题:检测 TPL DataFlow 中的重复项目
我一直在构建一个处理文件的服务,它使用 Queue<string> 对象来管理待处理的项目。
/// <summary>
/// Service that polls an in-memory queue of file paths and hands every existing file
/// to <c>DoWork</c> on a thread-pool task.
/// </summary>
public partial class BasicQueueService : ServiceBase
{
    // How long the polling loop sleeps when no worker completion wakes it earlier.
    private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(15);

    // Signalled whenever a worker task finishes, and by OnStop, to wake the polling loop.
    private readonly EventWaitHandle completeHandle =
        new EventWaitHandle(false, EventResetMode.ManualReset, "ThreadCompleters");

    // Written by OnStop and read by the polling thread, hence volatile.
    private volatile bool stopping;

    public BasicQueueService()
    {
        QueueManager = new Queue<string>();
    }

    /// <summary>True once a stop has been requested; the polling loop exits when it sees this.</summary>
    public bool Stopping
    {
        get { return stopping; }
        set { stopping = value; }
    }

    // Queue<T> is not thread-safe: every access must hold lock (QueueManager).
    private Queue<string> QueueManager { get; }

    /// <summary>
    /// Starts the polling loop on a dedicated background task and returns immediately,
    /// so the service can finish starting instead of blocking inside OnStart.
    /// </summary>
    protected override void OnStart(string[] args)
    {
        Stopping = false;
        Task.Factory.StartNew(ProcessFiles, TaskCreationOptions.LongRunning);
    }

    /// <summary>Requests the polling loop to stop and wakes it if it is currently waiting.</summary>
    protected override void OnStop()
    {
        Stopping = true;
        completeHandle.Set();
    }

    /// <summary>
    /// Polling loop: on each pass, dispatches the files that were queued when the pass began,
    /// then waits until a worker completes, a stop is requested, or <see cref="PollInterval"/> elapses.
    /// </summary>
    private void ProcessFiles()
    {
        while (!Stopping)
        {
            // Reset before dispatching so a completion that happens while we are still
            // dispatching leaves the handle set and is not lost.
            completeHandle.Reset();

            int pending;
            lock (QueueManager)
            {
                pending = QueueManager.Count;
            }

            for (var i = 0; i < pending; i++)
            {
                // Check the stop flag again between items.
                if (Stopping) break;

                string fileName;
                if (!TryDequeue(out fileName)) break;

                if (string.IsNullOrWhiteSpace(fileName) || !File.Exists(fileName))
                    continue;

                Console.WriteLine($"Processing {fileName}");
                Task.Run(() =>
                    {
                        DoWork(fileName);
                    })
                    .ContinueWith(ThreadComplete);
            }

            if (Stopping) continue;

            Console.WriteLine($"Waiting for thread to finish, or {PollInterval.TotalSeconds} seconds.");
            completeHandle.WaitOne(PollInterval);
        }
    }

    // Removes the next queued path under the queue lock; returns false when the queue is empty.
    private bool TryDequeue(out string fileName)
    {
        lock (QueueManager)
        {
            if (QueueManager.Count > 0)
            {
                fileName = QueueManager.Dequeue();
                return true;
            }
        }

        fileName = null;
        return false;
    }

    partial void DoWork(string fileName);

    /// <summary>Continuation run after each worker task: reports a failure and wakes the polling loop.</summary>
    private void ThreadComplete(Task task)
    {
        if (task.IsFaulted)
        {
            // Reading task.Exception marks the failure as observed instead of dropping it silently.
            Console.WriteLine($"Processing failed: {task.Exception.GetBaseException().Message}");
        }

        completeHandle.Set();
    }

    /// <summary>
    /// Queues a file for processing unless the same path is already waiting in the queue.
    /// A path that has already been dequeued (in progress or finished) is not detected as a duplicate.
    /// </summary>
    /// <param name="file">Path of the file to process.</param>
    public void AddToQueue(string file)
    {
        // Called by FileWatcher/Manual classes, not included for brevity.
        lock (QueueManager)
        {
            if (QueueManager.Contains(file)) return;
            QueueManager.Enqueue(file);
        }
    }
}
在研究如何限制这个服务启动的线程数量时(我尝试过自己写一个类,用一个递增的 int 来计数,但在我的代码里它无法正确递减),我发现了 TPL DataFlow,它似乎更适合我想要实现的目标——具体来说,它可以让框架来处理线程、排队等工作。
这是我现在的服务:
/// <summary>
/// Service that forwards file paths to a TPL Dataflow <c>ActionBlock</c>, which invokes
/// <c>DoWork</c> for each posted path.
/// </summary>
public partial class BasicDataFlowService : ServiceBase
{
    // Receives file paths and runs DoWork for each of them.
    private readonly ActionBlock<string> workerBlock;

    public BasicDataFlowService()
    {
        // The block is configured with a maximum degree of parallelism of 32.
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 32
        };

        workerBlock = new ActionBlock<string>(path => { DoWork(path); }, options);
    }

    /// <summary>Set to false by OnStart and to true by OnStop; nothing in this class reads it.</summary>
    public bool Stopping { get; set; }

    partial void DoWork(string fileName);

    /// <summary>Clears the stop flag.</summary>
    protected override void OnStart(string[] args) => Stopping = false;

    /// <summary>Raises the stop flag; the worker block itself is left untouched.</summary>
    protected override void OnStop() => Stopping = true;

    /// <summary>
    /// Posts a file path to the worker block. No duplicate check is performed and the
    /// result of <c>Post</c> is ignored.
    /// </summary>
    private void AddToDataFlow(string file) => workerBlock.Post(file);
}
这样运行良好。但是,我想确保每个文件只会被添加到 TPL DataFlow 一次。
使用 Queue 时,我可以用 .Contains() 来检查。
TPL DataFlow 有没有类似的机制可以使用?
不重复发布文件,应该由读取并提交文件的一方负责。如果您是从目录中读取文件,可以给文件做标记,或者按 @VMAtm 的建议把路径缓存起来。但是,如果文件是由用户或其他客户端提交的,您需要把这个流程当作一项作业来看待:每个文件代表一个只有单一结果的作业。 – JSteward