2012-01-13 97 views
21

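Limit the number of parallel threads in C#

I am writing a C# program that generates half a million files and uploads them via FTP. I want to process 4 files in parallel, since the machine has 4 cores and generating the files takes much longer. Is it possible to convert the following PowerShell example to C#? Or is there a better framework for this, such as an actor framework in C# (like the F# MailboxProcessor)?

PowerShell example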

$maxConcurrentJobs = 3; 

# Read the input and queue it up 
$jobInput = get-content .\input.txt 
$queue = [System.Collections.Queue]::Synchronized((New-Object System.Collections.Queue)) 
foreach($item in $jobInput) 
{ 
    $queue.Enqueue($item) 
} 

# Function that pops input off the queue and starts a job with it 
function RunJobFromQueue 
{ 
    if($queue.Count -gt 0) 
    { 
     $j = Start-Job -ScriptBlock {param($x); Get-WinEvent -LogName $x} -ArgumentList $queue.Dequeue() 
     Register-ObjectEvent -InputObject $j -EventName StateChanged -Action { RunJobFromQueue; Unregister-Event $eventsubscriber.SourceIdentifier; Remove-Job $eventsubscriber.SourceIdentifier } | Out-Null 
    } 
} 

# Start up to the max number of concurrent jobs 
# Each job will take care of running the rest 
for($i = 0; $i -lt $maxConcurrentJobs; $i++) 
{ 
    RunJobFromQueue 
} 

Update:
The connection to the remote FTP server can be slow, so I want to throttle the FTP upload processing.

+0

If you want to limit the number of parallel tasks, why not use the TPL? – 2012-01-13 16:34:05

+1

The thread pool should be smart enough to handle this for you. Why manage it yourself? – 2012-01-13 16:36:37

+3

You can use [PLINQ](http://msdn.microsoft.com/en-us/library/dd460688.aspx) and set [WithDegreeOfParallelism](http://msdn.microsoft.com/en-us/library/dd383719.aspx). – 2012-01-13 16:39:17

Answers

5

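If you are using .NET 4.0, you can use the Parallel library.

Assuming you are iterating through the half million files, you can parallelize the iteration with Parallel.ForEach, for instance, or you can have a look at PLINQ. Here is a comparison between the two.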
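
For illustration, here is a minimal sketch of both options with the degree of parallelism capped at 4. `GenerateAndUpload` is a hypothetical placeholder (it only sleeps), and the class and method names are assumptions made for this example, not part of the question's code. `ProcessAll` also records the highest concurrency it observed, so the cap can be checked.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelForEachSketch
{
    // Hypothetical placeholder for "generate one file and upload it".
    private static void GenerateAndUpload(int fileNumber)
    {
        Thread.Sleep(20); // simulate some work
    }

    // Runs the work with at most maxParallel items at once and
    // returns the highest concurrency that was actually observed.
    public static int ProcessAll(IEnumerable<int> fileNumbers, int maxParallel)
    {
        int running = 0;
        int peak = 0;
        object gate = new object();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };

        Parallel.ForEach(fileNumbers, options, n =>
        {
            lock (gate)
            {
                running++;
                if (running > peak) peak = running;
            }
            try { GenerateAndUpload(n); }
            finally { lock (gate) { running--; } }
        });

        return peak;
    }

    // PLINQ variant: WithDegreeOfParallelism sets the same kind of cap.
    public static int ProcessAllWithPlinq(IEnumerable<int> fileNumbers, int maxParallel)
    {
        int processed = 0;
        fileNumbers.AsParallel()
                   .WithDegreeOfParallelism(maxParallel)
                   .ForAll(n =>
                   {
                       GenerateAndUpload(n);
                       Interlocked.Increment(ref processed);
                   });
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine("Peak concurrency: " + ProcessAll(Enumerable.Range(1, 40), 4));
        Console.WriteLine("Processed via PLINQ: " + ProcessAllWithPlinq(Enumerable.Range(1, 40), 4));
    }
}
```

Both calls block the calling thread until every item has been processed, so they suit a console or batch program better than a UI thread.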
+0

Please justify the -1. – 2012-01-13 16:41:46

+0

The question is tagged C#-4.0, so it is pretty obvious he is using .NET 4. A single sentence doesn't answer his question. – 2012-01-13 16:44:58

+0

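It's obvious he is using C# 4.0, but it's not obvious that he is familiar with the Parallel library, otherwise he wouldn't have asked the question. Besides, my reply contains more or less the same information as the other one. Please justify the -1. – 2012-01-13 16:51:08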

16

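The Task Parallel Library is your friend here. See this link, which describes what is available to you. Basically, framework 4 comes with it, and it optimises these essentially background thread-pool threads to the number of processors on the running machine.

Perhaps something along the lines of: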

ParallelOptions options = new ParallelOptions(); 

options.MaxDegreeOfParallelism = 4; 

Then in your loop something like:

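// WebClient has no Upload method; UploadFile(address, fileName) sends a local file.
// The addresses and file names are placeholders.
Parallel.Invoke(options, 
() => new WebClient().UploadFile("http://www.linqpad.net", "lp.html"), 
() => new WebClient().UploadFile("http://www.jaoo.dk", "jaoo.html")); 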
2

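Basically you will want to create an Action or a Task for each file to upload, put them in a List, and then process that list, limiting the number that can be processed in parallel.

My blog post shows how to do this both with Tasks and with Actions, and provides a sample project you can download and run to see both in action.

With Actions

If you use Actions, you can use the built-in .NET Parallel.Invoke function. Here we limit it to running at most 4 threads in parallel.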

var listOfActions = new List<Action>(); 
foreach (var file in files) 
{ 
    var localFile = file; 
    // Note that we only create the Action here; it does not run yet. 
    listOfActions.Add(() => UploadFile(localFile)); 
} 

var options = new ParallelOptions {MaxDegreeOfParallelism = 4}; 
Parallel.Invoke(options, listOfActions.ToArray()); 

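This option doesn't support async, though, and I'm assuming your UploadFile function will be async, so you might want to use the Task example below.

With Tasks

With Tasks there is no built-in function. However, you can use the one that I provide on my blog.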

/// <summary>
/// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
{
    await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
}

/// <summary>
/// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
/// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
{
    // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
    var tasks = tasksToRun.ToList();

    using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
    {
        var postTaskTasks = new List<Task>();

        // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
        tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

        // Start running each task.
        foreach (var task in tasks)
        {
            // Increment the number of tasks currently running and wait if too many are running.
            await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);

            cancellationToken.ThrowIfCancellationRequested();
            task.Start();
        }

        // Wait for all of the provided tasks to complete.
        // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
        await Task.WhenAll(postTaskTasks.ToArray());
    }
}

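Then, to create your list of tasks and call the function to run them, with say a maximum of 4 running at a time, you could do something like this: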

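var listOfTasks = new List<Task>(); 
foreach (var file in files) 
{ 
    var localFile = file; 
    // Note that we create the Task here, but do not start it. 
    // The Task constructor takes an Action, so an async lambda would become async void 
    // and the Task would finish at the first await, freeing its slot too early. 
    // Blocking on the upload inside the Task keeps the slot held until the upload is done. 
    listOfTasks.Add(new Task(() => UploadFile(localFile).GetAwaiter().GetResult())); 
} 
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 4); 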

Also, because this method supports async, it will not block the UI thread the way Parallel.Invoke or Parallel.ForEach would.
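
As a variation on the same idea, here is a minimal sketch that takes `Func<Task>` delegates instead of unstarted `Task` objects, so each semaphore slot stays held until the asynchronous work has really finished. The names `ThrottleSketch`, `RunThrottledAsync` and `MeasurePeakAsync` are assumptions made for this example, and `Task.Delay` stands in for a real asynchronous FTP upload.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleSketch
{
    // Runs every work item, allowing at most maxParallel to be in flight at once.
    public static async Task RunThrottledAsync(IEnumerable<Func<Task>> workItems, int maxParallel)
    {
        using (var throttler = new SemaphoreSlim(maxParallel))
        {
            var running = new List<Task>();
            foreach (var work in workItems)
            {
                // Wait for a free slot before starting the next item.
                await throttler.WaitAsync();
                running.Add(RunOneAsync(work, throttler));
            }
            // Every RunOneAsync releases its slot before completing, so the
            // semaphore is no longer in use when the using block disposes it.
            await Task.WhenAll(running);
        }
    }

    private static async Task RunOneAsync(Func<Task> work, SemaphoreSlim throttler)
    {
        try { await work(); }
        finally { throttler.Release(); }
    }

    // Demo helper: runs itemCount simulated uploads and returns the peak concurrency seen.
    public static async Task<int> MeasurePeakAsync(int itemCount, int maxParallel)
    {
        int running = 0, peak = 0;
        var gate = new object();
        var work = Enumerable.Range(0, itemCount).Select(i => new Func<Task>(async () =>
        {
            lock (gate) { running++; if (running > peak) peak = running; }
            await Task.Delay(20); // hypothetical stand-in for an async FTP upload
            lock (gate) { running--; }
        }));

        await RunThrottledAsync(work, maxParallel);
        return peak;
    }

    public static void Main()
    {
        int peak = MeasurePeakAsync(20, 4).GetAwaiter().GetResult();
        Console.WriteLine("Peak concurrency: " + peak);
    }
}
```

Because the delegate is only invoked after a slot has been acquired, no work starts early, and a failed upload still releases its slot through the `finally` block.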

0

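I wrote the technique below, in which I use a BlockingCollection as a thread-count manager. It is quite simple to implement and it handles the job. It simply accepts Task objects and adds an integer value to the blocking collection, which increases the running thread count by 1. When the thread finishes, it takes an item back out, which releases the block on the add operation for upcoming tasks.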

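using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class BlockingTaskQueue
{
    // Bounded collection used as a slot counter: Add blocks while maxThread items are present.
    private readonly BlockingCollection<int> threadManager;

    // True while at least one task holds a slot.
    public bool IsWorking
    {
        get { return threadManager.Count > 0; }
    }

    public BlockingTaskQueue(int maxThread)
    {
        threadManager = new BlockingCollection<int>(maxThread);
    }

    // Queues an unstarted task. The returned Task yields true on success and false on failure.
    public Task<bool> AddTask(Task task)
    {
        return Task.Run(() => Run(task));
    }

    private bool Run(Task task)
    {
        // Claim a slot first; this blocks while maxThread tasks are already running.
        threadManager.Add(1);
        try
        {
            task.Start();
            task.Wait();
            return true;
        }
        catch (Exception)
        {
            // Wait rethrows the task's failure; report it as false.
            return false;
        }
        finally
        {
            // Free the slot so that a waiting Add can proceed.
            threadManager.Take();
        }
    }
}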
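
The throttling in this class rests on the bounded capacity of `BlockingCollection<T>`: once the collection holds as many items as its capacity, `Add` blocks until a `Take` makes room. The small sketch below shows that behaviour in isolation, using `TryAdd` (which returns false instead of blocking) so that it can run on a single thread. The class and method names are assumptions made for this example.

```csharp
using System;
using System.Collections.Concurrent;

public static class BoundedCollectionSketch
{
    // Returns how many slots could be claimed without blocking.
    public static int ClaimSlots(int capacity, int attempts)
    {
        using (var slots = new BlockingCollection<int>(capacity))
        {
            int claimed = 0;
            for (int i = 0; i < attempts; i++)
            {
                // TryAdd returns false right away when the collection is full;
                // Add would block at this point instead.
                if (slots.TryAdd(1)) claimed++;
            }
            return claimed;
        }
    }

    // Fills the collection, then checks that Take frees exactly one slot.
    public static bool SlotFreedAfterTake(int capacity)
    {
        using (var slots = new BlockingCollection<int>(capacity))
        {
            for (int i = 0; i < capacity; i++) slots.Add(1);
            bool fullBefore = !slots.TryAdd(1);
            slots.Take();
            bool roomAfter = slots.TryAdd(1);
            return fullBefore && roomAfter;
        }
    }

    public static void Main()
    {
        Console.WriteLine("Slots claimed: " + ClaimSlots(4, 10));
        Console.WriteLine("Slot freed after Take: " + SlotFreedAfterTake(4));
    }
}
```

One consequence of this design is that every task waiting for a slot occupies a thread-pool thread inside the blocking `Add`, so with a very large number of queued tasks a semaphore-based approach may scale better.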