2010-02-02 35 views
3

我需要一个机制来实现以下情形:C#多线程并发机制,以等待一个给定的结果

  1. 两个或多个线程需要在同一时间加载一组给定值的
  2. 只有一个请求必须按值完成,因此如果两个线程需要加载相同的子集,则必须等待另一个子集,因为我们不希望每个值都具有锁定(或互斥锁或其他基元),因为它们可以可能过高。

的情形是(假设线程B进入早一点)

  thread A   thread B 
values 5, 8, 9, 12  7, 8, 9, 13, 14 
request 5,  12  7, 8, 9, 13, 14 
waits for 8, 9 
          >>data loaded<< 
retrieves 8, 9 

      >> data loaded << 
returns 5, 8, 9, 12 

我应该使用哪种并发机制呢?

记住生产者/消费者不会工作,因为线程A和B不完全是消费者(他们只对某些数据感兴趣)。

谢谢

+0

是不可变的值吗? 加载一个值需要多长时间? – MaLio 2010-02-02 20:03:50

+0

你是说你有一个数据流,因此该流中的每个项目都需要由两个线程处理?延迟是一个大问题? – Keith 2010-02-02 22:01:27

+0

每个请求都会有自己的号码,大部分时间都会被缓存,然后不会被请求。请求很贵,这就是我们只需要执行一次的原因。 – pablo 2010-02-02 22:46:20

回答

1

这听起来很像锁管理器,所以为什么不建立一个?

class LockManager<TKey> 
{ 
    private Dictionary<TKey, List<EventWaitHandle>> locks = 
     new Dictionary<TKey, List<EventWaitHandle>>(); 
    private Object syncRoot = new Object(); 

    public void Lock(TKey key) 
    { 
     do 
     { 
      Monitor.Enter(syncRoot); 
      List<EventWaitHandle> waiters = null; 
      if (true == locks.TryGetValue(key, out waiters)) 
      { 
       // Key is locked, add ourself to waiting list 
       // Not that this code is not safe under OOM conditions 
       AutoResetEvent eventLockFree = new AutoResetEvent(false); 
       waiters.Add(eventLockFree); 
       Monitor.Exit(syncRoot); 
       // Now wait for a notification 
       eventLockFree.WaitOne(); 
      } 
      else 
      { 
       // event is free 
       waiters = new List<EventWaitHandle>(); 
       locks.Add(key, waiters); 
       Monitor.Exit(syncRoot); 
       // we're done 
       break; 
      } 
     } while (true); 

    } 

    public void Release(TKey key) 
    { 
     List<EventWaitHandle> waiters = null; 
     lock (syncRoot) 
     { 
      if (false == locks.TryGetValue(key, out waiters)) 
      { 
       Debug.Assert(false, "Releasing a bad lock!"); 
      } 
      locks.Remove(key); 
     } 
     // Notify ALL waiters. Unfair notifications 
     // are better than FIFO for reasons of lock convoys 
     foreach (EventWaitHandle waitHandle in waiters) 
     { 
      waitHandle.Set(); 
     } 
    } 
} 

您使用它之前,您必须锁定每个值:

class Program 
{ 
    class ThreadData 
    { 
     public LockManager<int> LockManager { get; set; } 
     public int[] Work { get; set; } 
     public AutoResetEvent Done { get; set; } 
    } 

    static void Main(string[] args) 
    { 
     int[] forA = new int[] {5, 8, 9, 12}; 
     int[] forB = new int[] {7, 8, 9, 13, 14 }; 

     LockManager<int> lockManager = new LockManager<int>(); 

     ThreadData tdA = new ThreadData 
     { 
      LockManager = lockManager, 
      Work = forA, 
      Done = new AutoResetEvent(false), 
     }; 
     ThreadData tdB = new ThreadData 
     { 
      LockManager = lockManager, 
      Work = forB, 
      Done = new AutoResetEvent(false), 
     }; 

     ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), tdA); 
     ThreadPool.QueueUserWorkItem(new WaitCallback(Worker), tdB); 

     WaitHandle.WaitAll(new WaitHandle[] { tdA.Done, tdB.Done }); 
    } 

    static void Worker(object args) 
    { 
     Debug.Assert(args is ThreadData); 
     ThreadData data = (ThreadData) args; 
     try 
     { 
      foreach (int key in data.Work) 
      { 
       data.LockManager.Lock(key); 
       Console.WriteLine("~{0}: {1}", 
        Thread.CurrentThread.ManagedThreadId, key); 
       // simulate the load the set for Key 
       Thread.Sleep(1000); 
      } 
      foreach (int key in data.Work) 
      { 
       // Now free they locked keys 
       data.LockManager.Release(key); 
      } 
     } 
     catch (Exception e) 
     { 
      Debug.Write(e); 
     } 
     finally 
     { 
      data.Done.Set(); 
     } 
    } 
} 

你要面对的最大问题将是死锁。将两个数组更改为{5,8,9,7}和{7,8,9,5},您将立即看到我的观点。

+0

除非我错过了创建锁列表的每一项,而这正是我想避免的,因为这会对系统造成过大的损失,对吧? – pablo 2010-02-02 23:22:21

+0

每个*服务员*锁定*项目的一个锁。服务员(=一个线程)一次只能等待一个项目,所以最糟糕的情况是线程的事件数量多得多(精确匹配意味着你死锁了,但是死锁是你需求中固有的,并且可以onyl通过适当地分发集合来避免)。 – 2010-02-02 23:42:46

+0

那么,为了避免死锁,政策将是:你去提供那些不在等待名单上的人,然后等待你失踪的人,这样就可以避免死锁,对吗? – pablo 2010-02-03 07:11:06