我有一个要求,使用SqlBulkCopy将大型csv文件分成几个不同的数据库插入。我打算通过两个单独的任务来完成此任务,1个用于批量处理CSV文件,另一个用于插入数据库。作为一个例子,这里是我的事情:具有容错功能的并行生产者/消费者?
public class UberTask
{
private readonly BlockingCollection<Tuple<string,int>> _store = new BlockingCollection<Tuple<string, int>>();
public void PerformTask()
{
var notifier = new UINotifier();
Task.Factory.StartNew(() =>
{
for (int i =0; i < 10; i++)
{
string description = string.Format("Scenario {0}", i);
notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Reading '{0}' from file", description)));
// represents reading the CSV file.
Thread.Sleep(500);
notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Enqueuing '{0}'", description)));
_store.Add(new Tuple<string, int>(description, i));
}
_store.CompleteAdding();
});
var consumer = Task.Factory.StartNew(() =>
{
foreach (var item in _store.GetConsumingEnumerable())
{
var poppedItem = item;
notifier.PerformOnTheUIThread(() => Console.WriteLine(string.Format("Sending '{0}' to the database", poppedItem.Item1)));
// represents sending stuff to the database.
Thread.Sleep(1000);
}
});
consumer.Wait();
Console.WriteLine("complete");
}
}
这是配对2套相关任务的好方法吗?什么上面的代码不处理(它需要):
- 如果表示CSV读取故障的任务,其他任务需要停止
- (即使仍然存在_Store项目。)如果表示数据库的任务插入错误,则其他进程可以停止处理。
- 如果配对任务中的任何一个出现故障,我将需要执行一些操作来回滚数据库更新(我不担心如何回滚),这更多的是如何编码“发生故障配对任务之一,所以我需要做一些整理“。
以上任何帮助将不胜感激!
感谢您的详细回答阿德! – primalgeek 2011-04-16 08:56:39