2017-03-16 80 views
1

我一直在构建一个服务,使用Queue<string>对象处理文件来管理项目。检测DataFlow中的重复项目

public partial class BasicQueueService : ServiceBase 
{ 
    private readonly EventWaitHandle completeHandle = 
     new EventWaitHandle(false, EventResetMode.ManualReset, "ThreadCompleters"); 

    public BasicQueueService() 
    { 
     QueueManager = new Queue<string>(); 
    } 

    public bool Stopping { get; set; } 

    private Queue<string> QueueManager { get; } 

    protected override void OnStart(string[] args) 
    { 
     Stopping = false; 

     ProcessFiles(); 
    } 

    protected override void OnStop() 
    { 
     Stopping = true; 
    } 

    private void ProcessFiles() 
    { 
     while (!Stopping) 
     { 
      var count = QueueManager.Count; 
      for (var i = 0; i < count; i++) 
      { 
       //Check the Stopping Variable again. 
       if (Stopping) break; 

       var fileName = QueueManager.Dequeue(); 
       if (string.IsNullOrWhiteSpace(fileName) || !File.Exists(fileName)) 
         continue; 

       Console.WriteLine($"Processing {fileName}"); 

       Task.Run(() => 
        { 
         DoWork(fileName); 
        }) 
        .ContinueWith(ThreadComplete); 
      } 
      if (Stopping) continue; 

      Console.WriteLine("Waiting for thread to finish, or 1 minute."); 
      completeHandle.WaitOne(new TimeSpan(0, 0, 15)); 
      completeHandle.Reset(); 
     } 
    } 

    partial void DoWork(string fileName); 

    private void ThreadComplete(Task task) 
    { 
     completeHandle.Set(); 
    } 

    public void AddToQueue(string file) 
    { 
     //Called by FileWatcher/Manual classes, not included for brevity. 
     lock (QueueManager) 
     { 
      if (QueueManager.Contains(file)) return; 

      QueueManager.Enqueue(file); 
     } 
    } 
} 

虽然研究如何限制这个线程数(我已经尝试了手工课和递增int,但有一个问题,即它不会在我的代码适当减量),我碰到TPL DataFlow,这似乎是它更适合我想要实现 - 具体而言,它可以让我,让框架处理线程/排队等

这是我现在的服务:

public partial class BasicDataFlowService : ServiceBase 
{ 
    private readonly ActionBlock<string> workerBlock; 

    public BasicDataFlowService() 
    { 
     workerBlock = new ActionBlock<string>(file => DoWork(file), new ExecutionDataflowBlockOptions() 
     { 
      MaxDegreeOfParallelism = 32 
     }); 
    } 

    public bool Stopping { get; set; } 

    protected override void OnStart(string[] args) 
    { 
     Stopping = false; 
    } 

    protected override void OnStop() 
    { 
     Stopping = true; 
    } 

    partial void DoWork(string fileName); 

    private void AddToDataFlow(string file) 
    { 
     workerBlock.Post(file); 
    } 
} 

这很好。但是,我想确保只有一次将文件添加到TPL DataFlow。随着Queue,我可以检查使用.Contains()。有没有一种机制可以用于TPL DataFlow

+0

无论是消费和提交文件都有责任不要发布两次。如果您从目录中读取文件,您可以标记它们,或者将路径缓存为@VMAtm建议的值。但是,如果用户或其他客户提交他们,您需要将流程视为一项工作。每个文件代表具有单一结果的单个作业。 – JSteward

回答

0

你可以在DoWork之内检查它。

你必须保存在Hash已经工作的项目和检查当前文件名不存在散列。

1

只有当文件在短时间内进入您的服务两次,您的解决方案Queue才有效。如果它再次出现,比如几个小时,队列就不会包含它,因为它从那里开始。

如果此解决方案的预期,那么您可以使用MemoryCache来存储文件路径被已经处理,像这样:

using System.Runtime.Caching; 

private static object _lock = new object(); 

private void AddToDataFlow(string file) 
{ 
    lock (_lock) 
    { 
     if (MemoryCache.Default.Contains(file)) 
     { 
      return; 
     } 

     // no matter what to put into the cache 
     MemoryCache.Default[file] = true; 
    // we can now exit the lock 
    } 

    workerBlock.Post(file); 
} 

但是,如果你的应用程序必须运行很长时间(其服务的目的要做),你最终会耗尽内存。在这种情况下,您可能需要将文件路径存储在数据库或其他东西中,所以即使在重新启动服务之后,代码也会恢复状态。