2017-07-31 74 views
0

我已经编写了一个将logentries以json格式写入Azure Data Lake中的文件的日志系统。多个线程正在写入该文件,因此我使用ConcurrentAppendAsync方法。使用CancellationToken.None在使用ConcurrentAppendAsync并发内容时附加TaskCanceledException

我初始化这样的访问:

var clientCredential = new ClientCredential(ClientId, ClientSecret); 
var creds = ApplicationTokenProvider.LoginSilentAsync(Domain, clientCredential).GetAwaiter().GetResult(); 
fileSystemClient = new DataLakeStoreFileSystemManagementClient(creds); 
fileSystem = fileSystemClient.FileSystem; 

,我写这样的内容:

private async Task WriteToDataLakeAsync(IGrouping<string, LogEvent> logEvents) 
    { 
     var logEvent = logEvents.FirstOrDefault(); 
     if (logEvent == null) 
      return; 

     var filename = string.Format(@"Logs\{0:yyyy}\{0:MM}\{0:dd}\{1}\{2}.json", logEvent.Session.Timestamp, logEvent.Session.Origin, logEvent.Session.SessionId); 

     var jsonBuilder = new StringBuilder(); 

     foreach (var @event in logEvents) 
     { 
      jsonBuilder.AppendLine(JsonConvert.SerializeObject(@event.SimplifyForBlob(), Formatting.None)); 
     } 

     try 
     { 
      await fileSystem.ConcurrentAppendAsync(Account, filename, new MemoryStream(Encoding.UTF8.GetBytes(jsonBuilder.ToString())), AppendModeType.Autocreate, cancellationToken: CancellationToken.None); 
     } 
     catch (Exception ex) 
     { 
      telemetryClient.TrackException(new ExceptionTelemetry(new Exception($"{nameof(DataLake)}: Failed to write {filename}", ex))); 
     } 
    } 

的问题是,虽然我通过CancellationToken.NonefileSystem.ConcurrentAppendAsync(...)我仍然获得在TaskCanceledException我catch block:

System.Exception: System.Threading.Tasks.TaskCanceledException: 在System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(mscorlib程序,版本= 4.0.0.0,文化=中性公钥= b77a5c561934e089) 在System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(mscorlib程序,Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at Microsoft.Rest.RetryDelegatingHandler + <> c__DisplayClass11_0 + < b__1> d.MoveNext(Microsoft.Rest.ClientRuntime,Version = 2.0.0.0,Culture = neutral,PublicKeyToken = 31bf3856ad364e35) at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(mscorlib,Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(mscorlib,Versi on = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at Microsoft.Rest.RetryDelegatingHandler + d__11.MoveNext(Microsoft.Rest.ClientRuntime,Version = 2.0.0.0,Culture = neutral,PublicKeyToken = 31bf3856ad364e35) at System。 Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(mscorlib,Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(mscorlib,Version = 4.0.0.0,Culture = neutral,PublicKeyToken =在System.Net.Http.HttpClient + d__58.MoveNext(System.Net.Http,Version = 4.1.1.1,Culture = neutral,PublicKeyToken = b03f5f7f11d50a3a) System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(mscorlib, Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(mscorlib,Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(mscorlib,Version = 4.0.0.0,Culture = neutral, PublicKeyToken = b77a5c561934e089) at System.Olure.Azure.Management.DataLake.Store.FileSystemOperations + d__10.MoveNext(Microsoft.Azure.Management.DataLake.Store,Version = 2.0.0.0,Culture = neutral,PublicKeyToken = 31bf3856ad364e35) 。 Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(mscorlib,Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(mscorlib,Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at Microsoft.Azure.Management.DataL ake.Store.FileSystemOperationsExtensions + d__3.MoveNext(Microsoft.Azure.Management.DataLake.Store,Version = 2.0.0.0,Culture = neutral,PublicKeyToken = 31bf3856ad364e35) at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(mscorlib,Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(mscorlib,Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at DHS.PlanCare.CentralLogging.EventHub。Processors.StreamProcessors.DataLake + d__15.MoveNext(DHS.PlanCare.CentralLogging.EventHub.Processors,Version = 1.0.0.0,Culture = neutral,PublicKeyToken = null:D:\ Sources \ DHS \ DHS.PlanCare.ManagementPortal \ DHS.PlanCare .ManagementPortal.EventHub.Processors \ StreamProcessors \ DataLake.cs:95)

(95号线是调用ConcurrentAppendAsync

什么可以在这里发生了什么? the docs对此异常没有任何说明。在文档中甚至有一个错误,因为它声明当CancellationToken未被传递时,值将是null,这是不可能的,因为CancellationToken是值类型,因此不能是null

所以我知道一个事实,我不会发起任何形式的取消。问题是,我可以放心地忽略这个异常,或者根本不写数据。由于大量的数据摄入以及偶然发生的事实,很难检查数据是否实际写入。

+0

不知道这是为什么投票决定关闭的原因,“为什么这个代码不工作”。我真的不能再考虑我可以追加到这篇文章。我有一个特定的问题和错误,但我意识到它不能被轻易复制,但是为什么我希望找到使用这种技术并已经处理过的人。 –

+0

当等待服务器对ConcurrentAppend调用的响应时,SDK客户端可能达到其超时限制。您可以配置此超时[创建DataLakeStoreFileSystemManagementClient时](https://docs.microsoft.com/zh-cn/dotnet/api/microsoft.azure.management.datalake.store.datalakestorefilesystemmanagementclient.-ctor?view=azuremgmtdatalakestore-2.2 。0#Microsoft_Azure_Management_DataLake_Store_DataLakeStoreFileSystemManagementClient__ctor_Microsoft_Rest_ServiceClientCredentials_System_String_System_String_System_Int32_System_Net_Http_DelegatingHandler ___)。 –

+0

在单个ConcurrentAppend调用中最多传输多少数据? –

回答

0

HttpClient在内部使用,超时时抛出OperationCanceledException。这些超时可能会在活动连接期间或尝试建立连接时发生。 DataLakeStoreFileSystemManagementClient有一个构造函数,它带有一个可选的clientTimeoutInMinutes参数,该参数用于设置HttpClient.Timeout

在这种情况下,它实际上是Microsoft.Rest.RetryDelegatingHandler响应HttpClient已设置的引发异常的取消。

它通常也来自内部CancellationTokenSource.CancelAfter,它也可用于超时。

由于这是超时和真正的撤销请求区分有用的,我通常的东西包裹这样的:

try 
{ 
    ... 
} 
catch (OperationCanceledException ex) when (!cancellationToken.IsCancellationRequested) 
{ 
    throw new TimeoutException("The operation timed out.", ex); 
} 
相关问题