2012-02-06 94 views
1

我需要处理传入的xml文件(它们将由其他应用程序直接在特定文件夹中创建),并且我需要快速执行它。并行处理传入的xml文件

每天最多可以有200 000个文件,我目前的假设是使用.NET 4tpl

我现在的服务理念是:

在循环中我要检查文件夹中的新文件,如果我发现任何人,我就会把他们排队,其将由另一回路将采取处理来自队列的文件并为它们中的每一个创建新的任务(线程)。同时执行的任务数量应该可配置。 第一部分很简单,但在它们之间创建两个带有队列的主循环对我来说是新事物。

问题: 如何创建两个循环(一个用于检查文件夹和添加文件,另一个用于从队列中取文件并处理它们并行)并添加队列以在它们之间进行通信。

对于第一部分(文件夹检查)建议的解决方案是使用FileSystemWatcher。现在需要讨论第二部分(可能是一些任务计划程序)。

+1

问题是什么? – PHeiberg 2012-02-06 09:19:42

+0

如何创建两个循环(一个用于检查文件夹和添加文件,另一个用于从队列中取文件并将它们并行处理)并添加队列以在它们之间进行通信。 – 2012-02-06 11:01:17

+0

请编辑“问题”以实际包含问题 – PHeiberg 2012-02-06 11:41:07

回答

0

可能不需要循环,不确定并行是必要的。如果你想处理一批新文件,这将很有用。 FileSystemWatcher将显示新文件的文件夹,将为您提供将文件添加到队列的事件。

为添加到队列中的项目添加事件,以触发线程处理单个文件。

如果你敲了一个简单的类,文件,状态,检测时间等

你就会有一个检测线程加入队列,线程池来处理它们并成功从队列中删除它们。

你可能在.net中发现这个问题以前有用threasafe“名单” 4

Thread-safe List<T> property

特别是如果你要处理所有的新文件,因为X.

注意,如果你不要使用FileSystem观察器,只需从文件夹中获取文件,将Processed文件夹移动到文件夹以及Failed文件夹中,这将是一个好主意。读取200,00个文件名以检查是否已经处理它们将会从并行处理中移除任何好处。

即使你这样做,我也会推荐它。只要将它移回到“处理”(或在发生故障时进行编辑后)将触发它进行重新处理。另一个优点是,如果您正在处理数据库,并且所有内容都会变成乳头状态,并且最后一次备份是在X.您可以恢复,然后将所有已处理的文件移回“toprocess”文件夹。

您也可以使用已知输入执行测试运行并检查db的状态前后。

进一步评论。

Task使用的ThreadPool有一个ThreadPool限制,用于yor应用程序中的所有for或后台任务。

评论后。

如果要限制并发任务数...

入门十年,你可以很容易地在,完善调整和提升。

在你的班级管理从文件队列蹬掉的任务,像

private object _canRunLock; 
private int _maxTasks; 
private int _activeTasks; 

public MyTaskManager(int argMaxTasks) 
{ 
    _maxTasks = argMaxTasks; 
    _canRunLock = new object(); 
    _activeTasks = 0; 
} 


public bool CanRunTask(MyTask argTask) 
{ 
    get 
    { 
    lock(_canRunLock) 
    { 
     if (_activeTasks < _maxTasks) 
     { 
     ExecuteTask(argTask); 
     _activeTasks++; 
     return true; 
     } 
    } 
    return false; 
    } 
} 

public void TaskCompleted() 
{ 
    lock(_canRunLock) 
    { 
    if (_activeTasks > 0) 
    { 
     _activeTasks--; 
    } 
    else 
    { 
     throw new WTFException("Okay how did this happen?"); 
    } 
    } 
} 

简单,安全的(我认为)。你可以有其他的属性暂停或禁用以及检查。可能想要使上面的单身人士(:(),或至少要记住,如果你运行多个... ...

我可以给的最好的建议是开始简单,开放和解耦,然后必要时复杂化,很容易在这里过早地开始优化,一个好主意不要让所有等待的线程都说FileSystem或者后端,但是我怀疑处理器的数量是否会成为瓶颈,所以你的。为MaxTasks在空气有点拇指 一些不大不小的下限和上限之间的自我调整的可能,而不是一个固定的数字是好事

+0

FileSystemWatcher将非常​​有用,但问题的第二部分会更加棘手。如何控制文件的多线程处理?FileSystemWatcher会将文件添加到队列中,但是如何控制线程创建(为了不产生太多线程,可以说最多4个线程用于文件处理)? – 2012-02-06 11:13:45

+1

请注意,[FileSystemWatcher有一些限制](http://stackoverflow.com/q/239988/66849)涉及到它可以一次处理多少文件更改。您必须适当地设置缓冲区大小。 – PHeiberg 2012-02-06 11:38:32

+0

等一下。很多方法可以做到这一点。 – 2012-02-06 16:49:05

0

IMO你想要的东西就像cron工作。该算法的版本可以是:现在

for every job (called periodically via cron/scheduler) run 

    // 
    // your program 
    // 
    if job_is_running { 
     // Still busy... 
     // don't process anything and just return back 
     return 
    } 

    // Create your array 
    // 
    Array a = new Array() 
    for each file in folder { 
     a.append(file) 
    } 

    // Process each file 
    // 
    for each item in a { 
    process_item(item); 

    // Move it (or delete) 
    // 
    remove_from_input_folder(item) 
    } 

,你可以处理之前调用remove_from input(),以避免重复处理,如果系统崩溃。

我不得不为一家电话公司做这样的事情一段时间了,这是我们得到了:)

更新最舒适的解决方案:并行位

通过文件来构建循环与实际处理相比,阵列在理论上可以忽略不计。因此,您可以轻松地将第二个循环转换为基于工作人员的并行变体。

HTH

3

听起来像是你的拼图缺少的部分是BlockingCollection

FileSystemWatcher watcher; 
BlockingCollection<string> bc; 
private readonly object _lock = new object(); 
Task[] tasks; 

void PrepareWatcher() 
{ 
    watcher = new FileSystemWatcher(@"c:"); 
    watcher.Created += (s,e) => 
    { 
     lock(_lock) //Prevents race condition when stopping 
     { 
      if (!bc.IsAddingCompleted) 
       bc.Add(e.FullPath); 
     } 
    }; 
} 

void StartProcessing(int taskCount) 
{ 
    tasks = new Task[taskCount]; 
    bc = new BlockingCollection<string>(); 

    for (int i = 0; i< taskCount; i++) 
     tasks[i] = (Task.Factory.StartNew(() => 
     { 
      foreach (var x in bc.GetConsumingEnumerable()) 
       ProcessXml(x); 
     }, TaskCreationOptions.LongRunning)); 

    watcher.EnableRaisingEvents = true; 
} 

void ProcessXml(string path) 
{ 
    //Do your processing here... 
    //Note many events will be called multiple times, see: 
    //http://weblogs.asp.net/ashben/archive/2003/10/14/31773.aspx 
} 

void StopProcessing() 
{ 
    watcher.EnableRaisingEvents = false; 

    lock (_lock) //The above line doesn't guarantee no more events will be called, 
       //And Add() and CompleteAdding() can't be called concurrently 
     bc.CompleteAdding(); 

    Task.WaitAll(tasks); 
    foreach (var task in tasks) 
     task.Dispose(); 
    bc.Dispose(); 
    tasks = null; 
} 
2

我很惊讶没有人问过,但考虑你想要实现的是两种应用程序之间的某种消息传递,你有没有考虑过使用WCF?