2010-04-06 80 views
1

我面临两难多线程情况下使用SqlBulkCopy的(!)。与线程池的问题

在第一场景中,我实现从一个数据的基础上复制数据的解决方案到另一个使用SqlBulkCopy的同步和我没有问题的。

现在,使用ThreadPool,我实现了一个assynchronously情况下,每个表中的线程一样的,一切工作正常,但过去一段时间(usualy1小时因为复制操作需要大约同一时间),操作发送到ThreadPool停止执行。有一个diferent SQLBulkCopy使用每线程一个diferent SQLConnection

我已经看到的空闲线程数,他们是在调用开始全部免费。我有一个AutoResetEvent等待线程在再次启动之前完成其工作,还有一个信号量FIFO用于保存活动线程的计数器。

有没有办法,我已经忘记了,或者使用SqlBulkCopy的时候,我应该avaliate一些问题?我很欣赏一些帮助,因为我的想法了;)


- >使用

SemaphoreFIFO waitingThreads = new SemaphoreFIFO(); 
AutoResetEvent autoResetEvent = new AutoResetEvent(false); 
(...) 
List<TableMappingHolder> list = BulkCopy.Mapping(tables); 
waitingThreads.Put(list.Count, 300000); 

for (int i = 0; i < list.Count; i++){ 
    ThreadPool.QueueUserWorkItem(call => 
     //Replication 
     (...) 
     waitingThreads.Get(); 

     if (waitingThreads.Counter == 0) 
      autoResetEvent.Set(); 
    ); 
} 

bool finalized = finalized = autoResetEvent.WaitOne(300000); 
(...) 

//批量复制

public bool SetData(SqlDataReader reader, string _destinationTableName, List<SqlBulkCopyColumnMapping> _sqlBulkCopyColumnMappings) 
     { 
      using (SqlConnection destinationConnection = 
          new SqlConnection(ConfigurationManager.ConnectionStrings["dconn"].ToString())) 
      { 
       destinationConnection.Open(); 

       // Set up the bulk copy object. 
       // Note that the column positions in the source 
       // data reader match the column positions in 
       // the destination table so there is no need to 
       // map columns. 
       using (SqlBulkCopy bulkCopy = 
          new SqlBulkCopy(destinationConnection))     { 
        bulkCopy.BulkCopyTimeout = 300000; 
        bulkCopy.DestinationTableName = _destinationTableName; 

        // Set up the column mappings by name. 
        foreach (SqlBulkCopyColumnMapping columnMapping in _sqlBulkCopyColumnMappings) 
         bulkCopy.ColumnMappings.Add(columnMapping); 

        try{ 
         // Write from the source to the destination. 
         bulkCopy.WriteToServer(reader); 
        } 
        catch (Exception ex){return false;} 
        finally 
        { 
         try{reader.Close();} 
         catch (Exception e){//log} 
         try{bulkCopy.Close();} 
         catch (Exception e){//log} 
         try{destinationConnection.Close(); } 
         catch (Exception e){ //log } 
        } 
       } 
      } 
      return true; 
     } 

信号灯

public sealed class SemaphoreFIFO 
{ 
    private int _counter; 
    private readonly LinkedList<int> waitQueue = new LinkedList<int>(); 

    public int Counter 
    { 
     get { return _counter; } 
    } 

    private void internalNotify() 
    { 
     if (waitQueue.Count > 0 && _counter == 0) 
     { 
      Monitor.PulseAll(waitQueue); 
     } 
    } 

    public void Get() 
    { 
     lock (waitQueue) 
     { 
      _counter --; 
      internalNotify(); 
     } 
    } 

    public bool Put(int n, int timeout) 
    { 
     if (timeout < 0 && timeout != Timeout.Infinite) 
      throw new ArgumentOutOfRangeException("timeout"); 
     if (n < 0) 
      throw new ArgumentOutOfRangeException("n"); 

     lock (waitQueue) 
     { 
      if (waitQueue.Count == 0 && _counter ==0) 
      { 
       _counter +=n; 
       internalNotify(); 
       return true; 
      } 

      int endTime = Environment.TickCount + timeout; 
      LinkedListNode<int> me = waitQueue.AddLast(n); 
      try 
      { 
       while (true) 
       { 
        Monitor.Wait(waitQueue, timeout); 

        if (waitQueue.First == me && _counter ==0) 
        { 
         _counter += n; 
         waitQueue.RemoveFirst(); 
         internalNotify(); 
         return true; 
        } 

        if (timeout != Timeout.Infinite) 
        { 
         int remainingTime = endTime - Environment.TickCount; 
         if (remainingTime <= 0) 
         { 
          // TIMEOUT 
          if (waitQueue.First == me) 
          { 
           waitQueue.RemoveFirst(); 
           internalNotify(); 
          } 
          else 
           waitQueue.Remove(me); 
          return false; 
         } 
         timeout = remainingTime; 
        } 
       } 
      } 
      catch (ThreadInterruptedException e) 
      { 
       // INTERRUPT 
       if (waitQueue.First == me) 
       { 
        waitQueue.RemoveFirst(); 
        internalNotify(); 
       } 
       else 
        waitQueue.Remove(me); 
       throw e; 
      } 
     } 
    } 
} 

回答

0

我只想回到同步使用SQLBulkCopy。我不确定你在同一时间做了一堆批量拷贝(而不是一个接一个地获得)。它可以完成一切更快一点,但我甚至不确定这一点。

+0

在我的生产环境中它是同步的,但我想要的是所有表中的数据几乎在同一时间写入,因为它将被另一个应用程序使用 – Soulbe 2010-04-06 17:52:50

+1

我的猜测是你不会获得太多(如果有的话)从这种方法。我认为如果你同时运行两个批量拷贝,每个拷贝的运行速度大约是单独运行的一半,所以净完成时间将是相同的。我从来没有听说过这个正在做的(多个同时批量插入),并且它可能是你的线程停止执行,因为你正在运行到一个基本的限制如何可以批量插入可以在同一时间内完成。 – MusiGenesis 2010-04-06 18:51:34

+0

感谢您的评论。事实上,如果我评论批量插入并让其他数据访问,应用程序工作正常! – Soulbe 2010-04-07 08:47:04