2

以下是在事务范围(TransactionScope)内进行的异步缓存和数据库更新。我无法使用 .NET 4.5.1 中引入的 TransactionScopeAsyncFlowOption.Enabled,因为我正在使用的 Apache Ignite.Net 缓存不支持它。我尝试了一种变通办法:先捕获当前的 SynchronizationContext,然后显式调用它的 Send 方法来完成事务,但仍然得到错误:“Transaction scope must be disposed on same thread it was created”。问题:如何在不使用 TransactionScopeAsyncFlowOption.Enabled 的情况下启用异步 TransactionScope?

目前这种异步更新(Async Update)不起作用,对于如何实现它有什么建议吗?Apache Ignite 支持团队给出的建议之一是使用类似下面的写法:

Task.WhenAll(cacheUpdate, databaseUpdate).Wait(),但是这将使异步代码同步,因此不是最好的选择

// Attempted workaround from the question: starts the cache update and the
// database update inside one TransactionScope, awaits both, then calls
// ts.Complete() through the SynchronizationContext captured before the await.
// NOTE: illustrative sketch only — the two Task initializers below are
// placeholders, so this method does not compile as written.
public async Task Update() 
{ 
    // Capture Current Synchronization Context 
    // NOTE(review): Current can be null when no context is installed; sc.Send
    // below would then fail — confirm the hosting environment.
    var sc = SynchronizationContext.Current; 

    // NOTE(review): tranOptions is configured here but never passed to the
    // TransactionScope constructor below, so RepeatableRead is not applied.
    TransactionOptions tranOptions = new TransactionOptions(); 
    tranOptions.IsolationLevel = System.Transactions.IsolationLevel.RepeatableRead; 


    // Created with the parameterless constructor (no TransactionScopeAsyncFlowOption).
    using (var ts = new TransactionScope()) 
    { 
     // Do Cache Update Operation as Async 
     Task cacheUpdate = // Update Cache Async 

     // Do Database Update Operation as Async 
     Task databaseUpdate = // Update Database Async 

     // Wait asynchronously for both updates; code after this line runs as a continuation.
     await Task.WhenAll(cacheUpdate, databaseUpdate); 

       // Invoke ts.Complete() synchronously via the captured context. The
       // callback ignores its state argument (sc is passed but unused).
       // NOTE(review): this only affects where Complete() is called; the
       // implicit Dispose at the end of the using block still runs on whatever
       // thread resumed after the await, which is presumably why the reported
       // "disposed on same thread" error persists.
       sc.Send(new SendOrPostCallback(
       o => 
       { 
        ts.Complete(); 
       }), sc);   
    } 
} 
+3

我不太明白你为什么不能使用 TransactionScopeAsyncFlowOption.Enabled。你是必须在 .NET 4.0 上运行吗? – Evk

+0

这会调用 Apache Ignite.Net 公开的缓存更新异步方法,这些方法与 Java 进程通信,据他们说不支持此选项。我不清楚其中原因的所有内部细节 –

+2

我不知道此功能还需要任何第三方组件的明确支持。 – Evk

回答

1

在各种博客和文章中做了相当多的搜索之后,我找到了 Stephen Toub 的以下博客文章,它可以让异步方法的后续代码(continuation)在完全相同的线程上执行,从而避免了事务范围的问题。现在,我不需要 TransactionScopeAsyncFlowOption.Enabled 也能在 TransactionScope 中运行异步方法。

https://blogs.msdn.microsoft.com/pfxteam/2012/01/20/await-synchronizationcontext-and-console-apps/

// Script entry point: executes the async demo through Run and then prints a
// completion message.
void Main()
{
    // Run installs a scheduler that keeps post-await continuations on this
    // same thread, which the demo needs because it awaits inside a
    // TransactionScope.
    Func<Task> demo = async () => await DemoAsync();
    Run(demo);

    "Main Complete".Dump();
}

// Demo of awaiting inside a TransactionScope that is created without
// TransactionScopeAsyncFlowOption. Main invokes it through Run, which installs
// a SingleThreadSynchronizationContext before calling it.
// NOTE: the awaited expression below is placeholder pseudo-code, not compilable C#.
static async Task DemoAsync() 
{ 
    // Transaction scope under test; the using block disposes it on exit
    using (var ts = new TransactionScope()) 
    {    
     await Cache + Database Async update 
     // Mark the transaction as successful before the scope is disposed
     ts.Complete(); 
     // Dump() is presumably LINQPad's output extension — confirm
     "Transaction Scope Complete".Dump(); 
    } 
} 

// Run method: uses the SingleThreadSynchronizationContext so that we control the
// thread/synchronization context used after an await; continuations run on a specific set of threads.

// Runs an asynchronous delegate to completion on the calling thread.
// A SingleThreadSynchronizationContext is installed for the duration of the
// call, the queued work is pumped on the current thread until the task
// finishes, and the previous SynchronizationContext is restored afterwards.
//   func: factory producing the task to run; must not be null and must not
//         return null.
// Throws ArgumentNullException when func is null, InvalidOperationException
// when func returns a null task, and otherwise rethrows the exception of a
// faulted/cancelled task.
public static void Run(Func<Task> func)
{
    // Fail fast with a descriptive exception instead of a NullReferenceException.
    if (func == null)
    {
        throw new ArgumentNullException("func");
    }

    // Remember the ambient context so it can be restored in the finally block.
    var prevCtx = SynchronizationContext.Current;

    try
    {
        // Install the single-thread context before invoking func so that every
        // await inside it captures this context.
        var syncCtx = new SingleThreadSynchronizationContext();
        SynchronizationContext.SetSynchronizationContext(syncCtx);

        // Start the asynchronous operation.
        var t = func();
        if (t == null)
        {
            throw new InvalidOperationException("No task provided.");
        }

        // When the task ends (in any state), mark the queue complete so the
        // pump loop below can exit. Scheduled on the default scheduler so it
        // does not depend on the pump itself.
        t.ContinueWith(
            delegate { syncCtx.Complete(); }, TaskScheduler.Default);

        // Pump queued continuations on this thread; blocks until Complete().
        syncCtx.RunOnCurrentThread();

        // The task has finished; surface its exception, if any.
        t.GetAwaiter().GetResult();
    }
    finally
    {
        // Always restore the previous synchronization context.
        SynchronizationContext.SetSynchronizationContext(prevCtx);
    }
}

// Overridden synchronization context, using the BlockingCollection producer/consumer model.
// Ensures that the same synchronization context/thread/set of threads is maintained.
// In this case we maintain a single thread for continuations after an await.

// SynchronizationContext that queues posted callbacks and executes them on
// whichever thread calls RunOnCurrentThread.
private sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    // Producer/consumer queue of (callback, state) pairs awaiting execution.
    private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>>
        m_queue = new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

    // Queues a callback for later execution by the pump loop.
    //   d:     callback to run; must not be null.
    //   state: argument passed to the callback.
    // Throws ArgumentNullException when d is null. The underlying
    // BlockingCollection.Add throws InvalidOperationException once Complete()
    // has been called.
    public override void Post(SendOrPostCallback d, object state)
    {
        // Reject null here; otherwise it would only fail later inside the pump
        // loop as a NullReferenceException, far from the faulty caller.
        if (d == null)
        {
            throw new ArgumentNullException("d");
        }

        m_queue.Add(new KeyValuePair<SendOrPostCallback, object>(d, state));
    }

    // Executes queued callbacks on the calling thread, blocking while the
    // queue is empty, and returns once Complete() has been called and the
    // queue has been drained.
    public void RunOnCurrentThread()
    {
        foreach (var workItem in m_queue.GetConsumingEnumerable())
        {
            workItem.Key(workItem.Value);
        }
    }

    // Signals that no more callbacks will be posted, letting the pump loop end.
    public void Complete() { m_queue.CompleteAdding(); }
}