下面是一个示例,演示如何发送不超过 256 KB 限制的批次。代码来自这个代码仓库(paolosalvatori/ServiceBusExtensions):
/// <summary>
/// This class contains extension methods for the <see cref="EventHubClient"/> class.
/// </summary>
public static class EventHubClientExtensions
{
    private const string EventDataListCannotBeNullOrEmpty = "The eventDataEnumerable parameter cannot be null or empty.";
    private const string SendPartitionedBatchFormat = "[EventHubClient.SendPartitionedBatch] Batch Sent: BatchSizeInBytes=[{0}] MessageCount=[{1}]";
    private const string SendPartitionedBatchAsyncFormat = "[EventHubClient.SendPartitionedBatchAsync] Batch Sent: BatchSizeInBytes=[{0}] MessageCount=[{1}]";

    // Upper bound (262144 bytes = 256 KB) on the summed SerializedSizeInBytes of the events in one batch.
    private const int MaxBatchSizeInBytes = 262144;

    /// <summary>
    /// Asynchronously sends a batch of event data to the same partition.
    /// All the event data in the batch need to have the same value in the PartitionKey property.
    /// If the batch size is greater than the maximum batch size,
    /// the method partitions the original batch into multiple batches,
    /// each no larger than the maximum batch size, and sends them one after another
    /// in the original order.
    /// </summary>
    /// <remarks>
    /// Individual events are never split: an event whose own serialized size exceeds the
    /// maximum batch size is still sent, in a batch by itself.
    /// </remarks>
    /// <param name="eventHubClient">The current <see cref="EventHubClient"/> object.</param>
    /// <param name="messages">An IEnumerable object containing event data instances.</param>
    /// <param name="trace">true to cause a message to be written; otherwise, false.</param>
    /// <returns>The asynchronous operation.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="eventHubClient"/> is null, or <paramref name="messages"/> is null or empty.
    /// </exception>
    public static async Task SendPartitionedBatchAsync(this EventHubClient eventHubClient, IEnumerable<EventData> messages, bool trace = false)
    {
        var eventDataList = ValidateAndMaterialize(eventHubClient, messages);

        // Batches are produced lazily, so each one is sent before the next is built.
        foreach (var batch in SplitIntoBatches(eventDataList))
        {
            // Library code: no need to resume on the caller's synchronization context.
            await eventHubClient.SendBatchAsync(batch.Value).ConfigureAwait(false);
            Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batch.Key, batch.Value.Count));
        }
    }

    /// <summary>
    /// Synchronously sends a batch of event data to the same partition.
    /// All the event data in the batch need to have the same value in the PartitionKey property.
    /// If the batch size is greater than the maximum batch size,
    /// the method partitions the original batch into multiple batches,
    /// each no larger than the maximum batch size, and sends them one after another
    /// in the original order.
    /// </summary>
    /// <remarks>
    /// Individual events are never split: an event whose own serialized size exceeds the
    /// maximum batch size is still sent, in a batch by itself.
    /// </remarks>
    /// <param name="eventHubClient">The current <see cref="EventHubClient"/> object.</param>
    /// <param name="messages">An IEnumerable object containing event data instances.</param>
    /// <param name="trace">true to cause a message to be written; otherwise, false.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="eventHubClient"/> is null, or <paramref name="messages"/> is null or empty.
    /// </exception>
    public static void SendPartitionedBatch(this EventHubClient eventHubClient, IEnumerable<EventData> messages,
        bool trace = false)
    {
        var eventDataList = ValidateAndMaterialize(eventHubClient, messages);

        foreach (var batch in SplitIntoBatches(eventDataList))
        {
            eventHubClient.SendBatch(batch.Value);
            Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchFormat, batch.Key, batch.Value.Count));
        }
    }

    /// <summary>
    /// Validates the arguments shared by both send methods and returns the events as a list.
    /// </summary>
    /// <param name="eventHubClient">The client the batches will be sent through.</param>
    /// <param name="messages">The events supplied by the caller.</param>
    /// <returns><paramref name="messages"/> itself when it already is a list; otherwise a copy.</returns>
    private static IList<EventData> ValidateAndMaterialize(EventHubClient eventHubClient, IEnumerable<EventData> messages)
    {
        if (eventHubClient == null)
        {
            throw new ArgumentNullException("eventHubClient");
        }

        // Check for null BEFORE touching the sequence, so the caller gets the intended message
        // instead of a LINQ exception about an unrelated "source" parameter.
        if (messages == null)
        {
            throw new ArgumentNullException("messages", EventDataListCannotBeNullOrEmpty);
        }

        // Materialize once so a lazy sequence is not enumerated twice.
        var eventDataList = messages as IList<EventData> ?? messages.ToList();
        if (eventDataList.Count == 0)
        {
            // ArgumentNullException is kept for the empty case so existing catch blocks keep working.
            throw new ArgumentNullException("messages", EventDataListCannotBeNullOrEmpty);
        }

        return eventDataList;
    }

    /// <summary>
    /// Lazily groups events, in order, into batches whose summed SerializedSizeInBytes
    /// does not exceed <see cref="MaxBatchSizeInBytes"/>.
    /// </summary>
    /// <param name="events">The events to group; must not be null.</param>
    /// <returns>
    /// Pairs whose Key is the batch size in bytes and whose Value is the batch content.
    /// An empty batch is never returned.
    /// </returns>
    private static IEnumerable<KeyValuePair<long, List<EventData>>> SplitIntoBatches(IEnumerable<EventData> events)
    {
        var batchList = new List<EventData>();
        long batchSize = 0;

        foreach (var eventData in events)
        {
            long eventSize = eventData.SerializedSizeInBytes;

            // Close the current batch only if it already holds something; otherwise an
            // oversized first event would cause an empty batch to be sent.
            if (batchList.Count > 0 && (batchSize + eventSize) > MaxBatchSizeInBytes)
            {
                yield return new KeyValuePair<long, List<EventData>>(batchSize, batchList);
                batchList = new List<EventData>();
                batchSize = 0;
            }

            batchList.Add(eventData);
            batchSize += eventSize;
        }

        // Whatever remains after the loop forms the final batch.
        if (batchList.Count > 0)
        {
            yield return new KeyValuePair<long, List<EventData>>(batchSize, batchList);
        }
    }
}
也许可以直接用事件中心(Event Hubs)来完成,其他人或许能回答这一点;但解决这类问题的传统做法是把数据单独存储,只在事件中发送一个 ID。在 Azure 中,您可以将数据存储在 Blob 存储中,然后在处理事件的地方通过 ID 将其下载。 – alun
您的方案中单条消息有多大?您的帖子里存在矛盾,所以我有点困惑:您说单条消息保证小于 256 KB,但接着又讨论以 256 KB 为单位拆分单条消息。所以才有此一问。 –
@Peter Bons - 我从 Blob/CSV 文件读取数据。我把这个 Blob/CSV 的每一行写入我们的存储,并推送到 EH。这个 Blob 的大小可能是 8 KB,也可能是 256 KB 甚至 500 KB。我希望我的设计能够考虑到这个限制。我现在的做法是用 SendBatchAsync 把这些行发送到 EH。如果我逐行读取并使用 SendAsync 单独发送,就不会遇到这个限制,因为 CSV 的一行会小于 256 KB。希望这样说明清楚了。 – khar