2015-01-26 116 views

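Parallel.ForEach race condition

I periodically run into what I believe is a race condition inside a Parallel.ForEach loop. I say this because the program always hangs in this part of the code: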

try
{
    Parallel.ForEach(Directory.EnumerateFiles(directory, "*.tracex", SearchOption.TopDirectoryOnly), _po, (path, ls) =>
    {
        DebugFile file;
        if (filterDate)
        {
            if (filterUser)
            {
                file = new DebugFile(path, startTime, endTime, user);
            }
            else file = new DebugFile(path, startTime, endTime);
        }
        else if (filterUser)
        {
            file = new DebugFile(path, user);
        }
        else file = new DebugFile(path);
        if (!file.IsFiltered())
        {
            _files.Add(file);
        }
        Interlocked.Increment(ref _loadCount); // increment how many we've checked
        if (_po.CancellationToken.IsCancellationRequested)
        {
            ls.Break();
        }
    });
}
catch (OperationCanceledException oce)
{
    Debug.WriteLine(oce.ToString());
}

My _files object handles the locking inside its Add method:

public virtual void Add(T item)
{
    _lock.EnterWriteLock();
    try
    {
        _bindingList.Add(item);
    }
    finally
    {
        _lock.ExitWriteLock();
    }
    OnListChanged(new ListChangedEventArgs(ListChangedType.ItemAdded, _bindingList.Count - 1));
}

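Any idea what I'm doing wrong here? It doesn't happen every time, only intermittently. Also, at least for me, it never happens the first time the code is called. It only happens if I call it once and then call it again, usually on the second or third call.

Thanks!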

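UPDATE: I realized that I'm using a custom task scheduler. When I remove it, I no longer see the hang. I added it so that I could control how many threads are running. My thinking was that, because I'm mostly reading files over the network, IO would slow things down, so I could run more tasks at once. Here is how I built the scheduler: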

public class TaskSchedulerForSlowIO : TaskScheduler
{
    /// <summary>
    /// maximum number of tasks to run concurrently
    /// </summary>
    private int _maxConcurrencyLevel;

    /// <summary>
    /// lock for reading tasks array
    /// </summary>
    private ReaderWriterLockSlim _listLock = new ReaderWriterLockSlim();

    /// <summary>
    /// list of tasks running
    /// </summary>
    private LinkedList<Task> _tasks = new LinkedList<Task>();

    /// <summary>
    /// Default constructor - This will increase threadpool limits if necessary
    /// </summary>
    public TaskSchedulerForSlowIO()
        : base()
    {
        _maxConcurrencyLevel = Environment.ProcessorCount * 10;
        int workerThreads, ioThreads, minimumConcurrency;
        minimumConcurrency = Environment.ProcessorCount * 2;
        ThreadPool.GetMaxThreads(out workerThreads, out ioThreads);
        if (workerThreads < _maxConcurrencyLevel)
        {
            if (ioThreads < _maxConcurrencyLevel)
            {
                ioThreads = _maxConcurrencyLevel;
            }
            ThreadPool.SetMaxThreads(_maxConcurrencyLevel, ioThreads);
        }
        ThreadPool.GetMinThreads(out workerThreads, out ioThreads);
        if (workerThreads < minimumConcurrency)
        {
            if (ioThreads < minimumConcurrency)
            {
                ioThreads = minimumConcurrency;
            }
            ThreadPool.SetMinThreads(minimumConcurrency, ioThreads);
        }
    }

    /// <summary>
    /// Implementing TaskScheduler
    /// </summary>
    public override int MaximumConcurrencyLevel
    {
        get
        {
            return _maxConcurrencyLevel;
        }
    }

    /// <summary>
    /// Scheduler Implementation
    /// </summary>
    /// <returns>ScheduledTasks</returns>
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        Task[] tasks;
        _listLock.EnterReadLock();
        try
        {
            tasks = _tasks.ToArray();
        }
        finally
        {
            _listLock.ExitReadLock();
        }
        return tasks;
    }

    /// <summary>
    /// Queues the specified task
    /// </summary>
    /// <param name="task">Task to queue</param>
    protected override void QueueTask(Task task)
    {
        int count;
        _listLock.EnterReadLock();
        try
        {
            _tasks.AddLast(task);
            count = _tasks.Count;
        }
        finally
        {
            _listLock.ExitReadLock();
        }
        if (count <= _maxConcurrencyLevel)
        {
            ThreadPool.UnsafeQueueUserWorkItem(ProcessTask, task);
        }
    }

    /// <summary>
    /// Scheduler Implementation
    /// </summary>
    /// <param name="task">Task to remove</param>
    /// <returns>Success</returns>
    protected override bool TryDequeue(Task task)
    {
        _listLock.EnterWriteLock();
        try
        {
            return _tasks.Remove(task);
        }
        finally
        {
            _listLock.ExitWriteLock();
        }
    }

    /// <summary>
    /// Scheduled Implementation
    /// </summary>
    /// <param name="task">Task to execute</param>
    /// <param name="taskWasPreviouslyQueued">Was the task previously queued</param>
    /// <returns></returns>
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        //We're not going to inline slow IO
        return false;
    }

    void ProcessTask(object o)
    {
        try
        {
            Task t = o as Task;
            if (t != null)
            {
                if (base.TryExecuteTask(t))
                {
                    if (!(t.IsCanceled || t.IsFaulted)) t.Wait();
                    TryDequeue(t);
                }
            }
        }
        catch (AggregateException a)
        {
            var e = a.Flatten();
            foreach (Exception ex in e.InnerExceptions)
            {
                Debug.WriteLine(ex.ToString());
            }
        }
    }
}
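
For comparison, the goal stated in the update (capping how many file loads run at once) can also be expressed without a custom scheduler. The following is only a minimal sketch, assuming that ParallelOptions.MaxDegreeOfParallelism on the default scheduler is acceptable for this workload. The names BoundedLoader, LoadAll, and the load delegate are hypothetical and are not part of the original code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedLoader
{
    // Hypothetical helper: runs 'load' once per path with at most
    // 'maxParallel' invocations in flight, using the default scheduler.
    public static List<TResult> LoadAll<TResult>(
        IEnumerable<string> paths,
        Func<string, TResult> load,
        int maxParallel,
        CancellationToken token)
    {
        // ConcurrentBag is safe for concurrent Add calls; result order is not preserved.
        var results = new ConcurrentBag<TResult>();

        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = maxParallel,
            CancellationToken = token
        };

        // Parallel.ForEach throws OperationCanceledException if 'token' is cancelled.
        Parallel.ForEach(paths, options, path => results.Add(load(path)));

        return new List<TResult>(results);
    }
}
```

In the question's terms, load would wrap the DebugFile construction and maxParallel could be Environment.ProcessorCount * 10. MaxDegreeOfParallelism is only an upper bound: the thread pool adds threads gradually, so the loop may run with fewer concurrent workers than requested.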

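I would start by removing the `try`/`finally` in the `Add` method. Catch-all exception handling makes debugging very difficult. Do that and see whether anything gets thrown. – Enigmativity 2015-01-26 22:05:42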

@Enigmativity `try/finally` doesn't mean *catch all exceptions*; it is only there to make sure `_lock.ExitWriteLock();` gets executed. – EZI 2015-01-26 22:10:06

griztown, what do you get when you replace the body of your method with `lock(aSharedObj) _bindingList.Add(item);`? – EZI 2015-01-26 22:12:43

Answer

-1

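There could be many reasons. For example:

1) The origin, and therefore the mutation characteristics, of filterUser, filterDate, and IsFiltered() are not clear from the code shown, and any of them could be causing the problem.

2) In general, this code does not scale. Avoid accessing (reading) files in parallel: the IO device (I assume a hard disk in your case) is not a parallel-read device, so you will most likely end up with worse performance than with simple serial processing. Recommendation: restrict the work to only 2 threads/cores (again, I assume you have more) and debug to see what happens. There is a good chance you will reach the point where the conflict shows up.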
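
The recommendation in point 2 can be checked by timing the same work serially and at a low degree of parallelism. This is a rough sketch only: LoadTiming, Measure, and the work delegate are hypothetical names, and it assumes the per-file work can safely be repeated for a measurement.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

public static class LoadTiming
{
    // Hypothetical helper: times 'work' over all paths.
    // degree <= 1 runs a plain serial loop; otherwise Parallel.ForEach
    // is capped at 'degree' concurrent invocations.
    public static TimeSpan Measure(IList<string> paths, Action<string> work, int degree)
    {
        var sw = Stopwatch.StartNew();
        if (degree <= 1)
        {
            foreach (var path in paths)
            {
                work(path);
            }
        }
        else
        {
            var options = new ParallelOptions { MaxDegreeOfParallelism = degree };
            Parallel.ForEach<string>(paths, options, work);
        }
        sw.Stop();
        return sw.Elapsed;
    }
}
```

Calling Measure with degree 1, 2, and a larger value on the same list of paths gives a first impression of whether parallel reads help at all. Later runs may look faster simply because the files have been cached by then, so the numbers should be read with that in mind.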

@downvoter: what is the reason? – Tigran 2015-01-26 22:14:21

Your post is not an answer to the question. Suggestion: don't try to answer questions that cannot be answered. – 2015-01-27 02:14:11

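Thanks, Tigran. filterUser and filterDate are booleans that tell us whether to filter the files by the associated user or by creation date/time. IsFiltered is a flag in the DebugFile class that is set if the file should be filtered out. I will test the other suggestions. – griztown 2015-01-27 03:41:07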