2010-04-09 65 views
0

我需要从外部系统检索多个对象。外部系统支持多个同时发生的请求(即线程),但是可能使外部系统泛滥 - 因此我希望能够异步地检索多个对象,但我希望能够限制同时发生的异步请求数量。即我需要检索100个项目,但不想一次检索超过25个项目的。当25的每个请求完成时,我想触发另一个检索,并且一旦它们全部完成,我想按照它们被请求的顺序返回所有结果(即,在返回整个调用之前没有结果返回结果)。这种事情有推荐的模式吗?用于限制同时异步呼叫数的模式

会是这样的事情是合适的(伪代码,显然)?

private List<externalSystemObjects> returnedObjects = new List<externalSystemObjects>; 

    public List<externalSystemObjects> GetObjects(List<string> ids) 
    { 
     int callCount = 0; 
     int maxCallCount = 25; 
     WaitHandle[] handles; 

     foreach(id in itemIds to get) 
     { 
      if(callCount < maxCallCount) 
      { 
       WaitHandle handle = executeCall(id, callback); 
       addWaitHandleToWaitArray(handle) 
      } 
     else 
     { 
      int returnedCallId = WaitHandle.WaitAny(handles); 
      removeReturnedCallFromWaitHandles(handles); 
     } 
    } 

    WaitHandle.WaitAll(handles); 

    return returnedObjects; 
    } 

    public void callback(object result) 
    { 
     returnedObjects.Add(result); 
    } 

回答

1

考虑项目的列表作为队列从25个线程处理出队的任务,处理任务来处理,增加的结果,然后重复,直到队列为空:

class Program 
    { 
    class State 
    { 
     public EventWaitHandle Done; 
     public int runningThreads; 
     public List<string> itemsToProcess; 
     public List<string> itemsResponses; 
    } 

    static void Main(string[] args) 
    { 
     State state = new State(); 

     state.itemsResponses = new List<string>(1000); 
     state.itemsToProcess = new List<string>(1000); 
     for (int i = 0; i < 1000; ++i) 
     { 
     state.itemsToProcess.Add(String.Format("Request {0}", i)); 
     } 

     state.runningThreads = 25; 
     state.Done = new AutoResetEvent(false); 

     for (int i = 0; i < 25; ++i) 
     { 
     Thread t =new Thread(new ParameterizedThreadStart(Processing)); 
     t.Start(state); 
     } 

     state.Done.WaitOne(); 

     foreach (string s in state.itemsResponses) 
     { 
     Console.WriteLine("{0}", s); 
     } 
    } 

    private static void Processing(object param) 
    { 
     Debug.Assert(param is State); 
     State state = param as State; 

     try 
     { 
     do 
     { 
      string item = null; 
      lock (state.itemsToProcess) 
      { 
      if (state.itemsToProcess.Count > 0) 
      { 
       item = state.itemsToProcess[0]; 
       state.itemsToProcess.RemoveAt(0); 
      } 
      } 
      if (null == item) 
      { 
      break; 
      } 
      // Simulate some processing 
      Thread.Sleep(10); 
      string response = String.Format("Response for {0} on thread: {1}", item, Thread.CurrentThread.ManagedThreadId); 
      lock (state.itemsResponses) 
      { 
      state.itemsResponses.Add(response); 
      } 
     } while (true); 

     } 
     catch (Exception) 
     { 
     // ... 
     } 
     finally 
     { 
     int threadsLeft = Interlocked.Decrement(ref state.runningThreads); 
     if (0 == threadsLeft) 
     { 
      state.Done.Set(); 
     } 
     } 
    } 
    } 

你可以做同样使用异步回调,不需要使用线程。

0

拥有一些类似队列的结构来保存挂起的请求是一种非常常见的模式。在可能有几个处理层的Web应用程序中,您会看到一种“漏斗”风格方法,处理变化的早期部分会有更大的队列。队列中可能还会有某种优先级,优先级较高的请求将被排序到队列顶部。

您的解决方案中需要考虑的一件重要事情是,如果请求到达率高于您的处理速率(这可能是由于拒绝服务攻击,或者只是某些部分处理过程今天异常缓慢),那么你的队列将不受限制地增加。您需要制定一些策略,例如在队列深度超过某个值时立即拒绝新请求。