2017-10-12 106 views
0

我正在使用Azure事件中心。我打算发送活动并使用SendBatchAsync。我看到事件中心有256KB的限制(无论是单独或批量发送)将大于256KB的事件推送到Azure EventHub

所以,如果我的数据> 256KB,什么是最佳实践来处理呢?我们是否应该单独发送消息(这将保证它是< 256KB)?

另外,如何将事件分成256KB块并发送到Event Hub? 我看了看Azure文档,看到他们推荐使用EventHubClient.CreateBatch但我没有看到足够的示例。有人可以在我们将如何最终分裂为256KB和SendBatchAsync

在这里需要一些样机或样品或步骤提供是我做的(但不因子256KB限制)

await myEventHubClient.SendBatchAsync(
    events.Select( 
     iEvent => 
      new EventData(
       Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(iEvent))))); 
+0

也许它可以直接与事件枢纽来完成,别人也许可以回答这个问题,但解决这一问题的传统方法是分别,只是存储数据在活动中发送一个ID。在azure中,您可以将您的数据存储在存储桶中,并只需通过id将其下载到处理事件的地方。 – alun

+1

您的方案中单条消息有多大?由于你的帖子中存在矛盾,我有点困惑。你说单个消息保证小于256 KB,但接下来你将讨论以256 KB为单位分割单个消息。因此我的问题。 –

+0

@Peter Bons - 我从Blob/CSV文件读取。我正在写一行这个blob/csv到我们的存储器中并推送给EH。这个blob大小可能是8KB,也可能是256KB甚至500KB。我希望我的设计能够考虑这个限制。我现在正在做的是,我将SendBatchAsync这些行用于EH。如果我单独执行(每行读取)并使用SendAsync,则不会面临此限制,因为1行CSV将小于256KB。希望澄清。 – khar

回答

1

不要轻易尝试并且将多个消息中的单个有效负载分块,考虑到事件不能保证按顺序到达,可能最终在不同的分区中并且可能到达一定的时间间隔,并且可以独立于“总线”。这在规模上变得更具挑战性。

选项1:如果您的有效负载是text/json,并且您的消息大小不超过256kb,请考虑压缩有效负载。您可以从this online tool获得尺寸结果的概念。如果您的有效负载中有大量空白(例如JSON),则也可以使用minify而不是压缩。

选项2:宁可将您的有效内容存储在外部存储中(比如DocumentDB或Blob存储)。

在事件中发送对您的有效负载的引用。这使您的事件保持精简,但您的消费者需要了解如何检索有效负载。一种简单的方法是将有效负载的链接作为URI呈现,显然,您可能需要考虑有效负载存储上的身份验证和授权,以符合您的Event Hub访问策略,以保持其光滑。

+0

谢谢@ Murray-Foxcroft。我喜欢你列出的2个选项。但是,我想考虑使用类似EventHubClient.CreateBatch的Azure团队提供的选项。因此,我需要更多关于如何使用它的例子/信息。 – khar

+0

批处理是将消息捆绑在一起以达到性能或时间原因,而不是创建一组相关消息。每条消息都应该独立存在。让我知道你是怎么办的。 –

0

下面是如何使用发送批次不超过256KB限制的示例。 代码来自这个回购协议(paolosalvatori/ServiceBusExtensions

/// <summary> 
/// This class contains extensions methods for the <see cref="EventHubClient"/> class. 
/// </summary> 
public static class EventHubClientExtensions 
{ 
    private const string EventDataListCannotBeNullOrEmpty = "The eventDataEnumerable parameter cannot be null or empty."; 
    private const string SendPartitionedBatchFormat = "[EventHubClient.SendPartitionedBatch] Batch Sent: BatchSizeInBytes=[{0}] MessageCount=[{1}]"; 
    private const string SendPartitionedBatchAsyncFormat = "[EventHubClient.SendPartitionedBatchAsync] Batch Sent: BatchSizeInBytes=[{0}] MessageCount=[{1}]"; 
    private const int MaxBathSizeInBytes = 262144; 

    /// <summary> 
    /// Asynchronously sends a batch of event data to the same partition. 
    /// All the event data in the batch need to have the same value in the Partitionkey property. 
    /// If the batch size is greater than the maximum batch size, 
    /// the method partitions the original batch into multiple batches, 
    /// each smaller in size than the maximum batch size. 
    /// </summary> 
    /// <param name="eventHubClient">The current <see cref="EventHubClient"/> object.</param> 
    /// <param name="messages">An IEnumerable object containing event data instances.</param> 
    /// <param name="trace">true to cause a message to be written; otherwise, false.</param> 
    /// <returns>The asynchronous operation.</returns> 
    public static async Task SendPartitionedBatchAsync(this EventHubClient eventHubClient, IEnumerable<EventData> messages, bool trace = false) 
    { 
     var eventDataList = messages as IList<EventData> ?? messages.ToList(); 
     if (messages == null || !eventDataList.Any()) 
     { 
      throw new ArgumentNullException(EventDataListCannotBeNullOrEmpty); 
     } 

     var batchList = new List<EventData>(); 
     long batchSize = 0; 

     foreach (var eventData in eventDataList) 
     { 
      if ((batchSize + eventData.SerializedSizeInBytes) > MaxBathSizeInBytes) 
      { 
       // Send current batch 
       await eventHubClient.SendBatchAsync(batchList); 
       Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batchSize, batchList.Count)); 

       // Initialize a new batch 
       batchList = new List<EventData> { eventData }; 
       batchSize = eventData.SerializedSizeInBytes; 
      } 
      else 
      { 
       // Add the EventData to the current batch 
       batchList.Add(eventData); 
       batchSize += eventData.SerializedSizeInBytes; 
      } 
     } 
     // The final batch is sent outside of the loop 
     await eventHubClient.SendBatchAsync(batchList); 
     Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batchSize, batchList.Count)); 
    } 

    /// <summary> 
    /// Asynchronously sends a batch of event data to the same partition. 
    /// All the event data in the batch need to have the same value in the Partitionkey property. 
    /// If the batch size is greater than the maximum batch size, 
    /// the method partitions the original batch into multiple batches, 
    /// each smaller in size than the maximum batch size. 
    /// </summary> 
    /// <param name="eventHubClient">The current <see cref="EventHubClient"/> object.</param> 
    /// <param name="messages">An IEnumerable object containing event data instances.</param> 
    /// <param name="trace">true to cause a message to be written; otherwise, false.</param> 
    public static void SendPartitionedBatch(this EventHubClient eventHubClient, IEnumerable<EventData> messages, 
     bool trace = false) 
    { 
     var eventDataList = messages as IList<EventData> ?? messages.ToList(); 
     if (messages == null || !eventDataList.Any()) 
     { 
      throw new ArgumentNullException(EventDataListCannotBeNullOrEmpty); 
     } 

     var batchList = new List<EventData>(); 
     long batchSize = 0; 

     foreach (var eventData in eventDataList) 
     { 
      if ((batchSize + eventData.SerializedSizeInBytes) > MaxBathSizeInBytes) 
      { 
       // Send current batch 
       eventHubClient.SendBatch(batchList); 
       Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batchSize, batchList.Count)); 

       // Initialize a new batch 
       batchList = new List<EventData> { eventData }; 
       batchSize = eventData.SerializedSizeInBytes; 
      } 
      else 
      { 
       // Add the EventData to the current batch 
       batchList.Add(eventData); 
       batchSize += eventData.SerializedSizeInBytes; 
      } 
     } 
     // The final batch is sent outside of the loop 
     eventHubClient.SendBatch(batchList); 
     Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchFormat, batchSize, batchList.Count)); 
    } 
}