2012-10-10 33 views
19

我有一个ID列表,我需要在每个ID上运行多个存储过程。并行不适用于实体框架

当我使用标准的foreach循环时,它工作正常,但是当我有很多记录时,它工作得很慢。

我想转换代码以使用EF,但我收到一个异常:“底层提供程序在打开时失败”。

我使用该代码,Parallel.ForEach内:

using (XmlEntities osContext = new XmlEntities()) 
{ 
    //The code 
} 

但它仍然抛出异常。

任何想法如何才能使用平行与EF?我需要为每个正在运行的程序创建一个新的上下文吗?我有大约10个程序,所以我认为创建10个上下文非常困难,每个上下文一个。

+1

我不是一个多线程专家,但是如果你使用事务或者你的读/写操作被锁定,你可能不会获得比连续执行更好的性能。 – Matthew

+0

我怀疑用更多的线程锤击强调的SQL服务器将无助于性能... – rene

+2

SQL没有强调所有,这就是为什么我想使用并行。它肯定会跑得更快。 –

回答

31

The underlying database connections that the Entity Framework are using are not thread-safe。你需要为您要执行的另一个线程上的每个操作创建一个新的上下文。

您对如何并行化操作的担忧是有效的;许多情况下打开和关闭都很昂贵。

相反,您可能想要颠倒您对并行代码的思考。看起来你正在循环多个项目,然后为每个项目串行调用存储过程。

如果可以的话,创建一个新的Task<TResult>(或Task,如果你不需要的结果)每个程序然后在该Task<TResult>,通过所有的项目的开单情况下,循环,然后执行存储过程。这样,您只有许多上下文等于您并行运行的存储过程的数量。

让我们假设你有两个存储过程,DoSomething1DoSomething2,两者取一类,MyItem实例的MyDbContext

实现上述看起来是这样的:

// You'd probably want to materialize this into an IList<T> to avoid 
// warnings about multiple iterations of an IEnumerable<T>. 
// You definitely *don't* want this to be an IQueryable<T> 
// returned from a context. 
IEnumerable<MyItem> items = ...; 

// The first stored procedure is called here. 
Task t1 = Task.Run(() => { 
    // Create the context. 
    using (var ctx = new MyDbContext()) 
    // Cycle through each item. 
    foreach (MyItem item in items) 
    { 
     // Call the first stored procedure. 
     // You'd of course, have to do something with item here. 
     ctx.DoSomething1(item); 
    } 
}); 

// The second stored procedure is called here. 
Task t2 = Task.Run(() => { 
    // Create the context. 
    using (var ctx = new MyDbContext()) 
    // Cycle through each item. 
    foreach (MyItem item in items) 
    { 
     // Call the first stored procedure. 
     // You'd of course, have to do something with item here. 
     ctx.DoSomething2(item); 
    } 
}); 

// Do something when both of the tasks are done. 

如果无法并行执行存储过程(每一个依赖于一定的秩序中运行),那么你仍然可以并行化您的操作,它只是更复杂一点。

你会看到你的项目上creating custom partitions(使用Partitioner class上的静态Create method)。这将给你的手段获得IEnumerator<T>实现(注意,这是而不是IEnumerable<T>所以你不能foreach在上面)。

对于每个IEnumerator<T>比如你回来,你会创建一个新的Task<TResult>(如果你需要一个结果),并在Task<TResult>身体,你可以创建通过由IEnumerator<T>返回的项目背景,然后循环,调用按顺序存储过程。

这将是这样的:

// Get the partitioner. 
OrdinalPartitioner<MyItem> partitioner = Partitioner.Create(items); 

// Get the partitions. 
// You'll have to set the parameter for the number of partitions here. 
// See the link for creating custom partitions for more 
// creation strategies. 
IList<IEnumerator<MyItem>> paritions = partitioner.GetPartitions(
    Environment.ProcessorCount); 

// Create a task for each partition. 
Task[] tasks = partitions.Select(p => Task.Run(() => { 
     // Create the context. 
     using (var ctx = new MyDbContext()) 
     // Remember, the IEnumerator<T> implementation 
     // might implement IDisposable. 
     using (p) 
     // While there are items in p. 
     while (p.MoveNext()) 
     { 
      // Get the current item. 
      MyItem current = p.Current; 

      // Call the stored procedures. Process the item 
      ctx.DoSomething1(current); 
      ctx.DoSomething2(current); 
     } 
    })). 
    // ToArray is needed (or something to materialize the list) to 
    // avoid deferred execution. 
    ToArray(); 
0

这是一个有点困难,如果任何解决不知道的内部异常结果是什么这一个。这可能只是连接字符串或提供程序配置设置的问题。

一般来说,你必须小心并行代码和EF。但是,你正在做的是 - 应该做的。我脑海中的一个问题;是否有任何工作在之前的并行的另一个实例上完成?根据你的帖子,你正在每个线程中做一个单独的上下文。那很好。然而,如果在多个环境之间进行一些有趣的构造函数争夺,我的一部分会感到奇怪。如果在并行调用之前没有在任何地方使用该上下文,我会建议尝试运行即使是上下文的简单查询以打开它,并确保在运行并行方法之前启动所有EF位。我承认,我还没有尝试,正好是你在这里做了什么,但是我已经完成了并且已经完成了。

+1

,你确实知道这个问题是在2012年发布的,对吧? – Claies

2

这是我使用和工作很好。另外,它还支持错误异常处理,具有调试模式,这使得它更容易追踪下来

public static ConcurrentQueue<Exception> Parallel<T>(this IEnumerable<T> items, Action<T> action, int? parallelCount = null, bool debugMode = false) 
{ 
    var exceptions = new ConcurrentQueue<Exception>(); 
    if (debugMode) 
    { 
     foreach (var item in items) 
     { 
      try 
      { 
       action(item); 
      } 
      // Store the exception and continue with the loop.      
      catch (Exception e) 
      { 
       exceptions.Enqueue(e); 
      } 
     } 
    } 
    else 
    { 
     var partitions = Partitioner.Create(items).GetPartitions(parallelCount ?? Environment.ProcessorCount).Select(partition => Task.Factory.StartNew(() => 
     { 
      while (partition.MoveNext()) 
      { 
       try 
       { 
        action(partition.Current); 
       } 
       // Store the exception and continue with the loop.      
       catch (Exception e) 
       { 
        exceptions.Enqueue(e); 
       } 
      } 
     })); 
     Task.WaitAll(partitions.ToArray()); 
    } 
    return exceptions; 
} 

您可以使用它像以下内容,其中为DB是原来的DbContext和db.CreateInstance()创建新实例使用相同的连接字符串。

 var batch = db.Set<SomeListToIterate>().ToList(); 
     var exceptions = batch.Parallel((item) => 
     { 
      using (var batchDb = db.CreateInstance()) 
      { 
       var batchTime = batchDb.GetDBTime(); 
       var someData = batchDb.Set<Permission>().Where(x=>x.ID = item.ID).ToList(); 
       //do stuff to someData 
       item.WasMigrated = true; //note that this record is attached to db not batchDb and will only be saved when db.SaveChanges() is called 
       batchDb.SaveChanges();   
      }     
     }); 
     if (exceptions.Count > 0) 
     { 
      logger.Error("ContactRecordMigration : Content: Error processing one or more records", new AggregateException(exceptions)); 
      throw new AggregateException(exceptions); //optionally throw an exception 
     } 
     db.SaveChanges(); //save the item modifications