我有一个紧密的循环,它贯穿一系列购物车,它们本身包含大约10个事件事件对象,并将它们写入经由中间存储库磁盘在JSON(与GetEventStore.com重新布线jOliver共同结构域):紧密环路 - 磁盘100%,四核CPU @ 25%的使用率,只有15MBsec的磁盘写入速度
// create ~200,000 carts, each with ~5 events
List<Cart> testData = TestData.GenerateFrom(products);
foreach (var cart in testData)
{
count = count + (cart as IAggregate).GetUncommittedEvents().Count;
repository.Save(cart);
}
我看到磁盘说,这是100%,但始终是“低”(15MB /秒,〜5000事件每秒)为什么这样,我能想到的事情是:
因为这是单线程做es 25%的CPU使用率实际上意味着我所在的1核心的100%(任何显示我的应用程序在Visual Studio中运行的特定核心的方法)?
我受I/O或CPU的限制吗?如果我为每个CPU创建一个自己的线程池,我能期待更好的性能吗?
如何以〜120MB /秒的速度复制文件,但我的应用程序中只能获得15MB /秒的吞吐量?这是由于许多较小数据包的写入大小?
还有什么我错过了?
我使用的代码是从geteventstore文档/博客:
public class GetEventStoreRepository : IRepository
{
private const string EventClrTypeHeader = "EventClrTypeName";
private const string AggregateClrTypeHeader = "AggregateClrTypeName";
private const string CommitIdHeader = "CommitId";
private const int WritePageSize = 500;
private const int ReadPageSize = 500;
IStreamNamingConvention streamNamingConvention;
private readonly IEventStoreConnection connection;
private static readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.None };
public GetEventStoreRepository(IEventStoreConnection eventStoreConnection, IStreamNamingConvention namingConvention)
{
this.connection = eventStoreConnection;
this.streamNamingConvention = namingConvention;
}
public void Save(IAggregate aggregate)
{
this.Save(aggregate, Guid.NewGuid(), d => { });
}
public void Save(IAggregate aggregate, Guid commitId, Action<IDictionary<string, object>> updateHeaders)
{
var commitHeaders = new Dictionary<string, object>
{
{CommitIdHeader, commitId},
{AggregateClrTypeHeader, aggregate.GetType().AssemblyQualifiedName}
};
updateHeaders(commitHeaders);
var streamName = this.streamNamingConvention.GetStreamName(aggregate.GetType(), aggregate.Identity);
var newEvents = aggregate.GetUncommittedEvents().Cast<object>().ToList();
var originalVersion = aggregate.Version - newEvents.Count;
var expectedVersion = originalVersion == 0 ? ExpectedVersion.NoStream : originalVersion - 1;
var eventsToSave = newEvents.Select(e => ToEventData(Guid.NewGuid(), e, commitHeaders)).ToList();
if (eventsToSave.Count < WritePageSize)
{
this.connection.AppendToStreamAsync(streamName, expectedVersion, eventsToSave).Wait();
}
else
{
var startTransactionTask = this.connection.StartTransactionAsync(streamName, expectedVersion);
startTransactionTask.Wait();
var transaction = startTransactionTask.Result;
var position = 0;
while (position < eventsToSave.Count)
{
var pageEvents = eventsToSave.Skip(position).Take(WritePageSize);
var writeTask = transaction.WriteAsync(pageEvents);
writeTask.Wait();
position += WritePageSize;
}
var commitTask = transaction.CommitAsync();
commitTask.Wait();
}
aggregate.ClearUncommittedEvents();
}
private static EventData ToEventData(Guid eventId, object evnt, IDictionary<string, object> headers)
{
var data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(evnt, serializerSettings));
var eventHeaders = new Dictionary<string, object>(headers)
{
{
EventClrTypeHeader, evnt.GetType().AssemblyQualifiedName
}
};
var metadata = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(eventHeaders, serializerSettings));
var typeName = evnt.GetType().Name;
return new EventData(eventId, typeName, true, data, metadata);
}
}
没有实际将数据保存到磁盘的代码不多,我们可以说。仔细检查你是否正确缓冲并使用足够大的缓冲区。 – Voo 2014-11-20 17:30:27
不够公平,下面添加代码,如前所述,这是通过nuget使用geteventstore.com eventstore.client v3.0.1。 – g18c 2014-11-20 18:03:43
嗯,我不知道API,但是你确实有一个'CommitAsync',并且等待代码听起来像一个非常糟糕的主意,如果你的事件不是几kb大。 – Voo 2014-11-20 18:08:14