3

末我有这个问题,当我跑是这样的:Parallel.ForEach行为像一个普通的每个朝向的迭代

Parallel.ForEach(dataTable.AsEnumerable(), row => 
{ 
    //do processing 
} 

假设有500+的记录说,870一旦并行.ForEach完成850,它似乎是顺序运行,即一次只有1次操作。它以非常快的速度完成了850次操作,但当它接近迭代结束时,它变得非常缓慢,似乎每个操作都像正常一样。我甚至尝试过2000条记录。

我的代码有问题吗?请提出建议。

下面是我使用

对不起,我只是贴错示例代码。这是正确的代码:

Task newTask = Task.Factory.StartNew(() => 
{ 
    Parallel.ForEach(dtResult.AsEnumerable(), dr => 
    { 
     string extractQuery = ""; 
     string downLoadFileFullName = ""; 
     lock (foreachObject) 
     { 

      string fileName = extractorConfig.EncodeFileName(dr); 
      extractQuery = extractorConfig.GetExtractQuery(dr); 
      if (string.IsNullOrEmpty(extractQuery)) throw new Exception("Extract Query not found. Please check the configuration"); 

      string newDownLoadPath = CommonUtil.GetFormalizedDataPath(sDownLoadPath, uKey.CobDate); 
      //create folder if it doesn't exist 
      if (!Directory.Exists(newDownLoadPath)) Directory.CreateDirectory(newDownLoadPath); 
      downLoadFileFullName = Path.Combine(newDownLoadPath, fileName); 
     } 
     Interlocked.Increment(ref index); 

     ExtractorClass util = new ExtractorClass(SourceDbConnStr); 
     util.LoadToFile(extractQuery, downLoadFileFullName); 
     Interlocked.Increment(ref uiTimerIndex); 
    }); 
}); 
+3

请提供完整的代码块 – 2011-02-02 19:30:06

+2

我不知道这应该是一个评论,或一个答案,但我觉得它需要指出:`DataTable`不是线程安全的类型。因此,如果你的'//处理'代码涉及任何类型的修改(甚至包括单个行内的单元格),恐怕你正在寻求一个痛苦的世界。 – 2011-02-02 19:31:52

+0

对于dataTable中的每一行,都会调用数据库获取数据并将其加载到文件中。它像一个提取过程。从数据库提取数据并提取到文件。 – 2011-02-02 19:36:31

回答

3

我的猜测:

这看起来具有高度潜力的IO:

  • 数据库+磁盘
  • 网络通信DB和背部
  • 结果写入到磁盘

因此,很多时间将用于等待IO。我的猜测是随着越来越多的线程被添加到混合中,并且IO进一步受到压力,等待变得越来越糟糕。例如一个磁盘只有一组磁头,所以你不能同时写入它。如果您有大量线程试图同时写入,则性能会降低。

尝试限制的最大线程数使用的是:

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 }; 

Parallel.ForEach(dtResult.AsEnumerable(), options, dr => 
{ 
    //Do stuff 
}); 

更新

你的代码编辑后,我会建议它有一些改动如下:

  • 减少线程的最大数量 - 这可以试验。
  • 只执行一次目录检查和创建。

代码:

private static bool isDirectoryCreated; 

//... 

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 }; 

Parallel.ForEach(dtResult.AsEnumerable(), options, dr => 
{ 
    string fileName, extractQuery, newDownLoadPath; 

    lock (foreachObject) 
    { 
     fileName = extractorConfig.EncodeFileName(dr); 

     extractQuery = extractorConfig.GetExtractQuery(dr); 

     if (string.IsNullOrEmpty(extractQuery)) 
      throw new Exception("Extract Query not found. Please check the configuration"); 

     newDownLoadPath = CommonUtil.GetFormalizedDataPath(sDownLoadPath, uKey.CobDate); 

     if (!isDirectoryCreated) 
     { 
      if (!Directory.Exists(newDownLoadPath)) 
       Directory.CreateDirectory(newDownLoadPath); 

      isDirectoryCreated = true; 
     } 
    } 

    string downLoadFileFullName = Path.Combine(newDownLoadPath, fileName); 

    Interlocked.Increment(ref index); 

    ExtractorClass util = new ExtractorClass(SourceDbConnStr); 
    util.LoadToFile(extractQuery, downLoadFileFullName); 

    Interlocked.Increment(ref uiTimerIndex); 
}); 
2

很难给出没有相关代码的细节,但通常这是预期的行为。 .NET会尝试安排这样的任务,即每个处理器均匀繁忙。

但是这只能是近似的,并非所有的任务都需要相同的时间。最后,一些处理器将完成工作,一些处理器将不工作,并且重新分配工作是昂贵的并且不总是有益的。

我不知道PLinq使用的负载平衡的细节,但底线是这种行为永远不能完全防止。

1

假设你限制并行两个线程。有(至少)两种可能的方式,Parallel.ForEach可能有效。一种方法是启动两个线程,每个线程完成一半的项目。所以如果你有850个物品,实际上会发生的情况是,线索1被给予前425个项目,线索2被给予425个项目的第二个块。现在这两个线程都起作用了。处理的项目顺序将如下所示:[0,425,426,1,2,427,3,428,429,4,...]。

它很可能(有可能的,实际上)那一个线程将完成其项目组的速度远远超过了其他的呢。

它可以工作的另一种方法是启动两个线程,各有一个抢从列表中的项目,对它进行处理,然后拿到下一个项目,重复直到没有剩下的项目要处理。在这种情况下,处理项目的顺序更像[0,1,2,4,3,6,5,...]。

在第一示例中,每个线程被给予项要处理的块。在第二种情况下,每个线程处理来自公共块的项目,直到没有剩余项目。

有变化,但这些是两个主要的方法可以将多个线程之间划分工作。既可以给每一个项目分配一组项目,也可以期望每个线程在处理完一个项目后再请求下一个项目。

Parallel.ForEach在第一种方式实现:每个线程都有自己的项目组来处理的。以另一种方式进行操作会需要更多的开销,因为项目列表必须像共享队列一样处理,并产生同步开销。