2012-02-10 105 views
2

背景:响应异步SQL调用,因为他们完成

我有一个服务,分批查询历史结果分贝。 批次基于开始时间和结束时间。所有批次之间的数据是相互排斥的,因此对于给定的一批批次,它们可以按任意顺序运行。有些批次可能需要更多时间才能执行。如果有任何批次失败,则整个过程中止/停止并记录错误。将数据返回给客户端时,需要合并来自所有批次的数据。

问题:

尽快将数据返回给客户端。

初步解决方案:

起初我是为了同步执行批次。这没有利用数据互斥的事实。初次使用后,发现此加载方法耗时过长。经过一些调试后,我发现导致速度慢的主要原因是sql查询的执行时间。因此,下一个解决方案是尝试使用BeginExecuteReader()EndExecuteReader()异步执行每个批次。在异步调用所有批次后,该服务将在一个while循环中等待并每隔300ms轮询一次以检查是否有任何查询已完成。如果是,那么它会被读取。

int batchCount = 0, resultCount = 0; 
List<AsyncDataResult> asyncCalls = new List<AsyncDataResult>(); 

while (true) 
{ 
    if (asyncCalls.Count == 0) 
    { 
     break; 
    } 

    if ((asyncCalls[resultCount]).AsyncResult.IsCompleted) 
    { 
     ReadAsyncResultsFromDb(asyncCalls[resultCount]); 
     asyncCalls.RemoveAt(resultCount); 
    } 

    resultCount++; 
    if (resultCount >= asyncCalls.Count) 
    { 
     resultCount = 0; 
     Thread.Sleep(300); 
    } 
} 

上述方法降低了装载时间对于大数据集,但对于非常小的数据集(但在许多批次),轮询被实际添加到装载延迟。

问:

  1. 如何异步执行的SQL,但不会做投票?
  2. 开始读取每批完成后?

更新: 对不起,但我忘了添加我需要返回的部分在调用异步调用相同的方法。原因是我需要填充的数据集作为参数传入此方法。从开始阅读器使用IAsyncCallback将需要我改变整个班级。我希望我不会那样做。

+0

哪.net框架4? – 2012-02-10 21:19:07

+0

是的。它的.net 4.0 – EndlessSpace 2012-02-10 21:23:51

回答

1

没有足够的信息来提出一个方法,你应该去,但defintely一个你可能,这将是任务并行库和Task<T>

荷载的乐趣。但是,不要因此而生气,你可能很容易就会因为你的超级多线程工作比同步批处理慢而结束。

如果说T,连接到一个数据库,抛出一个查询,返回一个datareader,并说批量中有8个查询。

您设置了任务0到7,启动计时器超时,将其全部关闭。 完成后,您可以让读者保留并根据任务ID设置您的标志位。当它达到255时产生一个OnBatchComplete事件,复制你的读者并将它们传递给组合任务。超时首先熄灭,并采取相应措施。如果任务出错,是否返回一些合适的原因,向调用者冒泡,可能会导致任何仍在运行的查询。

不知道你的组合过程是如何工作的,但是如果它可以这样组织,所以说一旦查询完成,你可以做一个中间过程,或者如果它是按照某种逻辑顺序的话,一旦所有的查询准备阅读,你可以开始阅读到简单的类,然后抛出每一个在合并任务....

这是不公平我不明白的乐趣这样的东西....

2

为什么民调积极?您产生的每个异步操作都返回一个具有WaitHandle的IAsyncResult。使用WaitAny()并让系统通知您:

/// <summary> 
/// Do something useful with a completed query 
/// </summary> 
/// <param name="result"></param> 
/// <returns> 
/// true if the query failed; 
/// false if the query was successful 
/// </returns> 
private static bool DoSomethingUseful(IAsyncResult result) 
{ 
    throw new NotImplementedException() ; 
} 

static void Main(string[] args) 
{ 
    List<IAsyncResult> batch   = SpawnBatch() ; 
    bool    errorOccurred = ProcessCompletedBatches(batch , DoSomethingUseful) ; 

    if (errorOccurred) 
    { 
     CancelPending(batch) ; 
    } 

    return ; 

} 

public static bool ProcessCompletedBatches(List<IAsyncResult> pending , Func<IAsyncResult,bool> resultHandler) 
{ 
    bool errorOccurred = false ; 

    while (! errorOccurred && pending.Count > 0) 
    { 
     WaitHandle[] inFlight = pending.Select(x => x.AsyncWaitHandle).ToArray() ; 

     int offset = WaitHandle.WaitAny(inFlight) ; 

     IAsyncResult result = pending[offset] ; 
     pending.RemoveAt(offset) ; 

     errorOccurred = resultHandler(result) ; 

    } 

    return errorOccurred ; 

}