我已经编写了一个将logentries以json格式写入Azure Data Lake中的文件的日志系统。多个线程正在写入该文件,因此我使用ConcurrentAppendAsync
方法。使用CancellationToken.None在使用ConcurrentAppendAsync并发内容时附加TaskCanceledException
我初始化这样的访问:
var clientCredential = new ClientCredential(ClientId, ClientSecret);
var creds = ApplicationTokenProvider.LoginSilentAsync(Domain, clientCredential).GetAwaiter().GetResult();
fileSystemClient = new DataLakeStoreFileSystemManagementClient(creds);
fileSystem = fileSystemClient.FileSystem;
,我写这样的内容:
private async Task WriteToDataLakeAsync(IGrouping<string, LogEvent> logEvents)
{
var logEvent = logEvents.FirstOrDefault();
if (logEvent == null)
return;
var filename = string.Format(@"Logs\{0:yyyy}\{0:MM}\{0:dd}\{1}\{2}.json", logEvent.Session.Timestamp, logEvent.Session.Origin, logEvent.Session.SessionId);
var jsonBuilder = new StringBuilder();
foreach (var @event in logEvents)
{
jsonBuilder.AppendLine(JsonConvert.SerializeObject(@event.SimplifyForBlob(), Formatting.None));
}
try
{
await fileSystem.ConcurrentAppendAsync(Account, filename, new MemoryStream(Encoding.UTF8.GetBytes(jsonBuilder.ToString())), AppendModeType.Autocreate, cancellationToken: CancellationToken.None);
}
catch (Exception ex)
{
telemetryClient.TrackException(new ExceptionTelemetry(new Exception($"{nameof(DataLake)}: Failed to write {filename}", ex)));
}
}
的问题是,虽然我通过CancellationToken.None
到fileSystem.ConcurrentAppendAsync(...)
我仍然获得在TaskCanceledException
我catch block:
System.Exception: System.Threading.Tasks.TaskCanceledException: 在System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(mscorlib程序,版本= 4.0.0.0,文化=中性公钥= b77a5c561934e089) 在System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(mscorlib程序,Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at Microsoft.Rest.RetryDelegatingHandler + <> c__DisplayClass11_0 + < b__1> d.MoveNext(Microsoft.Rest.ClientRuntime,Version = 2.0.0.0,Culture = neutral,PublicKeyToken = 31bf3856ad364e35) at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(mscorlib,Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(mscorlib,Versi on = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at Microsoft.Rest.RetryDelegatingHandler + d__11.MoveNext(Microsoft.Rest.ClientRuntime,Version = 2.0.0.0,Culture = neutral,PublicKeyToken = 31bf3856ad364e35) at System。 Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(mscorlib,Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(mscorlib,Version = 4.0.0.0,Culture = neutral,PublicKeyToken =在System.Net.Http.HttpClient + d__58.MoveNext(System.Net.Http,Version = 4.1.1.1,Culture = neutral,PublicKeyToken = b03f5f7f11d50a3a) System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(mscorlib, Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(mscorlib,Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(mscorlib,Version = 4.0.0.0,Culture = neutral, PublicKeyToken = b77a5c561934e089) at System.Olure.Azure.Management.DataLake.Store.FileSystemOperations + d__10.MoveNext(Microsoft.Azure.Management.DataLake.Store,Version = 2.0.0.0,Culture = neutral,PublicKeyToken = 31bf3856ad364e35) 。 Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(mscorlib,Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(mscorlib,Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at Microsoft.Azure.Management.DataL ake.Store.FileSystemOperationsExtensions + d__3.MoveNext(Microsoft.Azure.Management.DataLake.Store,Version = 2.0.0.0,Culture = neutral,PublicKeyToken = 31bf3856ad364e35) at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(mscorlib,Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(mscorlib,Version = 4.0.0.0,Culture = neutral,PublicKeyToken = b77a5c561934e089) at DHS.PlanCare.CentralLogging.EventHub。Processors.StreamProcessors.DataLake + d__15.MoveNext(DHS.PlanCare.CentralLogging.EventHub.Processors,Version = 1.0.0.0,Culture = neutral,PublicKeyToken = null:D:\ Sources \ DHS \ DHS.PlanCare.ManagementPortal \ DHS.PlanCare .ManagementPortal.EventHub.Processors \ StreamProcessors \ DataLake.cs:95)
(95号线是调用ConcurrentAppendAsync
)
什么可以在这里发生了什么? the docs对此异常没有任何说明。在文档中甚至有一个错误,因为它声明当CancellationToken未被传递时,值将是null
,这是不可能的,因为CancellationToken是值类型,因此不能是null
。
所以我知道一个事实,我不会发起任何形式的取消。问题是,我可以放心地忽略这个异常,或者根本不写数据。由于大量的数据摄入以及偶然发生的事实,很难检查数据是否实际写入。
不知道这是为什么投票决定关闭的原因,“为什么这个代码不工作”。我真的不能再考虑我可以追加到这篇文章。我有一个特定的问题和错误,但我意识到它不能被轻易复制,但是为什么我希望找到使用这种技术并已经处理过的人。 –
当等待服务器对ConcurrentAppend调用的响应时,SDK客户端可能达到其超时限制。您可以配置此超时[创建DataLakeStoreFileSystemManagementClient时](https://docs.microsoft.com/zh-cn/dotnet/api/microsoft.azure.management.datalake.store.datalakestorefilesystemmanagementclient.-ctor?view=azuremgmtdatalakestore-2.2 。0#Microsoft_Azure_Management_DataLake_Store_DataLakeStoreFileSystemManagementClient__ctor_Microsoft_Rest_ServiceClientCredentials_System_String_System_String_System_Int32_System_Net_Http_DelegatingHandler ___)。 –
在单个ConcurrentAppend调用中最多传输多少数据? –