2011-04-14 32 views
1

我有一个要求,使用SqlBulkCopy将大型csv文件分成几个不同的数据库插入。我打算通过两个单独的任务来完成此任务,1个用于批量处理CSV文件,另一个用于插入数据库。作为一个例子,这里是我的事情:具有容错功能的并行生产者/消费者?

public class UberTask 
{ 
    private readonly BlockingCollection<Tuple<string,int>> _store = new BlockingCollection<Tuple<string, int>>(); 

    public void PerformTask() 
    { 
     var notifier = new UINotifier(); 
     Task.Factory.StartNew(() => 
            { 
             for (int i =0; i < 10; i++) 
             { 
              string description = string.Format("Scenario {0}", i); 

              notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Reading '{0}' from file", description))); 

              // represents reading the CSV file. 
              Thread.Sleep(500); 
              notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Enqueuing '{0}'", description))); 
              _store.Add(new Tuple<string, int>(description, i)); 
             } 
             _store.CompleteAdding(); 
            }); 

     var consumer = Task.Factory.StartNew(() => 
               { 
                foreach (var item in _store.GetConsumingEnumerable()) 
                { 
                 var poppedItem = item; 
                 notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Sending '{0}' to the database", poppedItem.Item1))); 
                 // represents sending stuff to the database. 
                 Thread.Sleep(1000); 
                } 
               }); 
     consumer.Wait(); 
     Console.WriteLine("complete"); 
    } 
} 

这是配对2套相关任务的好方法吗?什么上面的代码不处理(它需要):

  • 如果表示CSV读取故障的任务,其他任务需要停止
  • (即使仍然存在_Store项目。)如果表示数据库的任务插入错误,则其他进程可以停止处理。
  • 如果配对任务中的任何一个出现故障,我将需要执行一些操作来回滚数据库更新(我不担心如何回滚),这更多的是如何编码“发生故障配对任务之一,所以我需要做一些整理“。

以上任何帮助将不胜感激!

回答

2

您可以使用异常处理和取消令牌来执行此操作。当流水线阶段检测到错误时,它会捕获它并设置令牌。这将取消其他阶段。 finally块确保完成对CompleteAdding()的调用。这很重要,因为接收管道阶段可能会在等待收集时被阻塞,并且在取消阻止之前不会处理取消。

您还希望显示集合中任何未处理的对象,或者在您的案例中,在管道阶段完成时(在finally中)和/或整个管道关闭时清理DB连接。

这里的,这是否一个流水线阶段的一个例子:

static void LoadPipelinedImages(IEnumerable<string> fileNames, 
            string sourceDir, 
            BlockingCollection<ImageInfo> original, 
            CancellationTokenSource cts) 
    { 
     // ... 
     var token = cts.Token; 
     ImageInfo info = null; 
     try 
     { 
      foreach (var fileName in fileNames) 
      { 
       if (token.IsCancellationRequested) 
        break; 
       info = LoadImage(fileName, ...); 
       original.Add(info, token); 
       info = null; 
      }     
     } 
     catch (Exception e) 
     { 
      // in case of exception, signal shutdown to other pipeline tasks 
      cts.Cancel(); 
      if (!(e is OperationCanceledException)) 
       throw; 
     } 
     finally 
     { 
      original.CompleteAdding(); 
      if (info != null) info.Dispose(); 
     } 
    } 

整体管道代码如下所示。它还支持通过设置取消标记从外部(从UI)取消管道。

static void RunPipelined(IEnumerable<string> fileNames, 
          string sourceDir, 
          int queueLength, 
          Action<ImageInfo> displayFn, 
          CancellationTokenSource cts) 
    { 
     // Data pipes 
     var originalImages = new BlockingCollection<ImageInfo>(queueLength); 
     var thumbnailImages = new BlockingCollection<ImageInfo>(queueLength); 
     var filteredImages = new BlockingCollection<ImageInfo>(queueLength); 
     try 
     { 
      var f = new TaskFactory(TaskCreationOptions.LongRunning, 
            TaskContinuationOptions.None); 
      // ... 

      // Start pipelined tasks 
      var loadTask = f.StartNew(() => 
        LoadPipelinedImages(fileNames, sourceDir, 
             originalImages, cts)); 

      var scaleTask = f.StartNew(() => 
        ScalePipelinedImages(originalImages, 
             thumbnailImages, cts)); 

      var filterTask = f.StartNew(() => 
        FilterPipelinedImages(thumbnailImages, 
             filteredImages, cts)); 

      var displayTask = f.StartNew(() => 
        DisplayPipelinedImages(filteredImages.GetConsumingEnumerable(), 
         ... cts)); 

      Task.WaitAll(loadTask, scaleTask, filterTask, displayTask); 
     } 
     finally 
     { 
      // in case of exception or cancellation, there might be bitmaps 
      // that need to be disposed. 
      DisposeImagesInQueue(originalImages); 
      DisposeImagesInQueue(thumbnailImages); 
      DisposeImagesInQueue(filteredImages);     
     } 
    } 

对于一个完整的示例看到这里下载该管道例如:

http://parallelpatterns.codeplex.com/releases/view/50473

这里讨论:

http://msdn.microsoft.com/en-us/library/ff963548.aspx

+0

感谢您的详细回答阿德! – primalgeek 2011-04-16 08:56:39