2014-09-05 81 views
0

使用案例是:我有一个巨大的日志文件,我正在通过块(等大小,IO读取)在主线程块上读取。每个读取的块大概在我的测试机器中需要1秒。读完每个块后,我使用一个线程池为每个块创建一个线程,将其放入2个数据库实例中。现在我有2个挑战:多线程完成时间测量

  1. 我必须交替地插入2个DBS块。即奇数块去第一个DB,甚至大块去第二个DB。我在块模型中没有任何东西来表示我可以依赖的块的数量。我试图在该块模型上创建一个包装器,使其具有“chunkCount”,但我在哪里增加chunkCount?

  2. 我该如何测量每个将从线程池运行在不同线程上的插入的时间?

下面的代码我试图实验的基础上,但它不会产生任何结果:

logEventsChunk = logFetcher.GetNextLogEventsChunk(); 
      chunkModel = new LogEventChunkModel(); 
      stw = new Stopwatch(); 
      chunkModel.ChunkCount = chunkCount; 
      chunkModel.LogeventChunk = logEventsChunk; 


      //chunkCount++; 
      ThreadPool.QueueUserWorkItem(new WaitCallback(delegate(object state) 
      { InsertChunk(chunkModel, collection, secondCollection, stw); }), null); 

的InsertChunk方法是在这里:

private void InsertChunk(LogEventChunkModel logEventsChunk, MongoCollection<LogEvent> collection, MongoCollection<LogEvent> secondCollection,Stopwatch stw) 
    { 
     chunkCount++; 
     stw.Start(); 
     MongoInsertOptions options = new MongoInsertOptions(); 
     options.WriteConcern = WriteConcern.Unacknowledged; 
     options.CheckElementNames = true; 
     string db = string.Empty; 
     { 
      //DateTime dtWrite = DateTime.Now; 
      if (logEventsChunk.ChunkCount % 2 == 0) 
      { 
       DateTime dtWrite1 = DateTime.Now; 
       collection.InsertBatch(logEventsChunk.LogeventChunk.LogEvents, options); 
       db = "FirstDB"; 
       //Console.WriteLine("Time taken to write the chunk: " + DateTime.Now.Subtract(dtWrite1).TotalSeconds.ToString() + " s. " + db); 
      } 
      else 
      { 
       DateTime dtWrite2 = DateTime.Now; 
       secondCollection.InsertBatch(logEventsChunk.LogeventChunk.LogEvents, options); 
       db = "SecondDB"; 
       //Console.WriteLine("Time taken to write the chunk: " + DateTime.Now.Subtract(dtWrite2).TotalSeconds.ToString() + " s. " + db); 
      } 
      Console.WriteLine("Thread Completed: {0} **********", Thread.CurrentThread.GetHashCode()); 
      stw.Stop(); 
      Console.WriteLine("Time taken to write the chunk: " + stw.ElapsedMilliseconds + " ms. " + db + " Chunk Count: " + logEventsChunk.ChunkCount); 
      stw.Reset(); 

      //+ "Chunk Count: " + chunkCount.ToString() 
      //Console.WriteLine("Time taken to write the chunk: " + DateTime.Now.Subtract(dtWrite).TotalSeconds.ToString() + " s. "+db); 
      //mongoDBInsertionTotalTime += DateTime.Now.Subtract(dtWrite).TotalSeconds; 
     }    
    } 

请忽略这些注释行,因为它们是只是一些实验的一部分。

回答

1

与其为每个插入启动一个新线程,并试图让线程确定要写入哪个数据库,启动两个持久线程,每个持久线程写入单个数据库。那些线程从队列中获取数据。这是一个使用BlockingCollection<T>的非常标准的生产者/消费者设置。

所以,你必须:

// Maximum number of items in queue (to avoid out of memory errors) 
const int MaxQueueSize = 10000; 
BlockingCollection<LogEventChunkModel> Db1Queue = new BlockingCollection<LogEventChunkModel>(MaxQueueSize); 
BlockingCollection<LogEventChunkModel> Db2Queue = new BlockingCollection<LogEventChunkModel>(MaxQueueSize); 

在你的主线程,启动数据库更新线程:

var t1 = new Thread(DbWriteThreadProc); 
t1.Start(new Tuple<string, BlockingCollection<LogEventChunkModel>>("FirstDB", Db1Queue)); 

var t2 = new Thread(DbWriteThreadProc); 
t2.Start(new Tuple<string, BlockingCollection<LogEventChunkModel>>("SecondDb", Db2Queue)); 

然后,开始读取日志文件,并把备用块入队列:

int chunk = 0; 
while (!EndOfLogFile) 
{ 
    var chunk = GetNextChunk(); 
    if ((chunk % 0) == 0) 
     Db1Queue.Add(chunk); 
    else 
     Db2Queue.Add(chunk); 
    ++chunk; 
} 

// end of data, so mark the queues as complete 
Db1Queue.CompleteAdding(); 
Db2Queue.CompleteAdding(); 

// and wait for threads to complete processing the queues 
t1.Join(); 
t2.Join(); 

你写的线程过程非常简单。它所做的就是提供服务的队列,并写入数据库:

void DbWriteThreadProc(object state) 
{ 
    // passed object is a Tuple<string, BlockingCollection> 
    // Get the items from it 
    var threadData = (Tuple<string, BlockingCollection>)state; 
    string dbName = threadData.Item1; 
    BlockingCollection<LogEventChunk> queue = threadData.Item2; 

    // now read the queue and write to the database 
    foreach (var chunk in queue.GetConsumingEnumerable()) 
    { 
     var sw = Stopwatch.StartNew(); 
     // write chunk to the database. 
     sw.Stop(); 
     Console.WriteLine("Time to write = {0:N0} ms", sw.ElapsedMilliseconds); 
    } 
} 

GetConsumingEnumerable确实非忙等待队列,所以它不是不断地轮询。当队列为空时,循环将完成队列被标记为完成添加(这就是主线程调用CompleteAdding的原因)。

这种方法比你有什么好处。特别是,它简化了确定哪些数据库块被写入。另外,它最多使用三个线程,并保证按照与从日志文件中读取的顺序相同的顺序将数据块添加到数据库中。您使用QueueUserWorkItem的方法不能保证广告订单。它还会为每个插入创建一个新线程,最终可能会有大量的并发线程。