2016-12-05 102 views
1

我试图将10000条记录插入到Azure表存储中。我正在使用ExecuteAsync()来实现它,但不知怎的,插入了约7500条记录,其余记录都丢失了。我故意不使用等待关键字,因为我不想等待结果,只是想将它们存储在表中。以下是我的代码片段。Azure表存储的ExecuteAsync()存储无法插入所有记录

private static async void ConfigureAzureStorageTable() 
    { 
     CloudStorageAccount storageAccount = 
      CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("StorageConnectionString")); 
     CloudTableClient tableClient = storageAccount.CreateCloudTableClient(); 
     TableResult result = new TableResult(); 
     CloudTable table = tableClient.GetTableReference("test"); 
     table.CreateIfNotExists(); 

     for (int i = 0; i < 10000; i++) 
     { 
      var verifyVariableEntityObject = new VerifyVariableEntity() 
      { 
       ConsumerId = String.Format("{0}", i), 
       Score = String.Format("{0}", i * 2 + 2), 
       PartitionKey = String.Format("{0}", i), 
       RowKey = String.Format("{0}", i * 2 + 2) 
      }; 
      TableOperation insertOperation = TableOperation.Insert(verifyVariableEntityObject); 
      try 
      { 
       table.ExecuteAsync(insertOperation); 
      } 
      catch (Exception e) 
      { 

       Console.WriteLine(e.Message); 
      } 
     } 
    } 

该方法的用法有什么不正确?

+1

如果您不等它完成,它可能无法完成(特别是如果您的过程退出)。而且你不会发现任何错误。 – SLaks

+0

如果你不等它完成,它可能不会完成! – DavidG

+0

你解决了这个问题,有没有更新?您可以在控制台应用程序中捕捉到详细的异常,并在将记录插入到Azure表存储器时通过Fiddler捕获网络包。 –

回答

3

仍然想要await table.ExecuteAsync()。这将意味着ConfigureAzureStorageTable()在那一点返回控制给调用者,它可以继续执行。

你有它的问题的方式,ConfigureAzureStorageTable()将会继续通过调用table.ExecuteAsync()和退出,事情就是这样table会走出去的范围,而table.ExecuteAsync()任务仍然没有完成。

在SO和其他地方使用async void有很多警告,您还需要考虑。你可以很容易有你的方法async Task但在主叫不等待,但让它返回Task周围干净终止等

编辑:一个另外 - 你几乎可以肯定要使用ConfigureAwait(false)在你的await那里,因为你似乎不需要保存任何上下文。这个blog post有一些关于这个和异步的指导方针。

1

如何使用TableBatchOperation一次运行批次的N个刀片?

private const int BatchSize = 100; 

private static async void ConfigureAzureStorageTable() 
{ 
    CloudStorageAccount storageAccount = 
     CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("StorageConnectionString")); 
    CloudTableClient tableClient = storageAccount.CreateCloudTableClient(); 
    TableResult result = new TableResult(); 
    CloudTable table = tableClient.GetTableReference("test"); 
    table.CreateIfNotExists(); 

    var batchOperation = new TableBatchOperation(); 

    for (int i = 0; i < 10000; i++) 
    { 
     var verifyVariableEntityObject = new VerifyVariableEntity() 
     { 
      ConsumerId = String.Format("{0}", i), 
      Score = String.Format("{0}", i * 2 + 2), 
      PartitionKey = String.Format("{0}", i), 
      RowKey = String.Format("{0}", i * 2 + 2) 
     }; 
     TableOperation insertOperation = TableOperation.Insert(verifyVariableEntityObject); 
     batchOperation.Add(insertOperation); 

     if (batchOperation.Count >= BatchSize) 
     { 
      try 
      { 
       await table.ExecuteBatchAsync(batchOperation); 
       batchOperation = new TableBatchOperation(); 
      } 
      catch (Exception e) 
      { 
       Console.WriteLine(e.Message); 
      } 
     } 
    } 

    if(batchOperation.Count > 0) 
    { 
     try 
     { 
      await table.ExecuteBatchAsync(batchOperation); 
     } 
     catch (Exception e) 
     { 
      Console.WriteLine(e.Message); 
     } 
    } 
} 

您可以根据需要调整BatchSize。小声免责声明:我没有试图运行这个,尽管它应该可以工作。

但我不禁想知道为什么你的功能是async void?这应该保留给事件处理程序和类似的,你不能决定接口。在大多数情况下,你想返回一个任务。因为现在调用者无法捕获在此函数中发生的异常。

1

根据您的要求,我已经成功使用CloudTable.ExecuteAsyncCloudTable.ExecuteBatchAsync测试了您的情况。以下是我使用CloudTable.ExecuteBatchAsync将记录插入到Azure表存储的代码片段,您可以参考它。

程序。CS主要

class Program 
{ 
    static void Main(string[] args) 
    { 
     CloudStorageAccount storageAccount = 
      CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("StorageConnectionString")); 
     CloudTableClient tableClient = storageAccount.CreateCloudTableClient(); 
     TableResult result = new TableResult(); 
     CloudTable table = tableClient.GetTableReference("test"); 
     table.CreateIfNotExists(); 

     //Generate records to be inserted into Azure Table Storage 
     var entities = Enumerable.Range(1, 10000).Select(i => new VerifyVariableEntity() 
     { 
      ConsumerId = String.Format("{0}", i), 
      Score = String.Format("{0}", i * 2 + 2), 
      PartitionKey = String.Format("{0}", i), 
      RowKey = String.Format("{0}", i * 2 + 2) 
     }); 

     //Group records by PartitionKey and prepare for executing batch operations 
     var batches = TableBatchHelper<VerifyVariableEntity>.GetBatches(entities); 

     //Execute batch operations in parallel 
     Parallel.ForEach(batches, new ParallelOptions() 
     { 
      MaxDegreeOfParallelism = 5 
     }, (batchOperation) => 
     { 
      try 
      { 
       table.ExecuteBatch(batchOperation); 
       Console.WriteLine("Writing {0} records", batchOperation.Count); 
      } 
      catch (Exception ex) 
      { 
       Console.WriteLine("ExecuteBatch throw a exception:" + ex.Message); 
      } 
     }); 
     Console.WriteLine("Done!"); 
     Console.WriteLine("Press any key to exit..."); 
     Console.ReadKey(); 
    } 
} 

TableBatchHelper.cs

public class TableBatchHelper<T> where T : ITableEntity 
{ 
    const int batchMaxSize = 100; 

    public static IEnumerable<TableBatchOperation> GetBatches(IEnumerable<T> items) 
    { 
     var list = new List<TableBatchOperation>(); 
     var partitionGroups = items.GroupBy(arg => arg.PartitionKey).ToArray(); 
     foreach (var group in partitionGroups) 
     { 
      T[] groupList = group.ToArray(); 
      int offSet = batchMaxSize; 
      T[] entities = groupList.Take(offSet).ToArray(); 
      while (entities.Any()) 
      { 
       var tableBatchOperation = new TableBatchOperation(); 
       foreach (var entity in entities) 
       { 
        tableBatchOperation.Add(TableOperation.InsertOrReplace(entity)); 
       } 
       list.Add(tableBatchOperation); 
       entities = groupList.Skip(offSet).Take(batchMaxSize).ToArray(); 
       offSet += batchMaxSize; 
      } 
     } 
     return list; 
    } 
} 

注:作为官方document提到有关将一批实体:

一个批处理操作可包括高达 enti领带。

单个批处理操作中的所有实体必须具有相同的分区键

总之,请尝试检查它是否可以在您身边工作。此外,您可以捕获控制台应用程序中的详细异常,并通过Fiddler捕获HTTP请求,以在将记录插入到Azure表存储器时捕获HTTP错误请求。