2014-11-20 65 views
1

我有一个紧密的循环,它贯穿一系列购物车,它们本身包含大约10个事件事件对象,并将它们写入经由中间存储库磁盘在JSON(与GetEventStore.com重新布线jOliver共同结构域):紧密环路 - 磁盘100%,四核CPU @ 25%的使用率,只有15MBsec的磁盘写入速度

// create ~200,000 carts, each with ~5 events 
List<Cart> testData = TestData.GenerateFrom(products); 
foreach (var cart in testData) 
{ 
    count = count + (cart as IAggregate).GetUncommittedEvents().Count; 
    repository.Save(cart); 
} 

我看到磁盘说,这是100%,但始终是“低”(15MB /秒,〜5000事件每秒)为什么这样,我能想到的事情是:

  1. 因为这是单线程做es 25%的CPU使用率实际上意味着我所在的1核心的100%(任何显示我的应用程序在Visual Studio中运行的特定核心的方法)?

  2. 我受I/O或CPU的限制吗?如果我为每个CPU创建一个自己的线程池,我能期待更好的性能吗?

  3. 如何以〜120MB /秒的速度复制文件,但我的应用程序中只能获得15MB /秒的吞吐量?这是由于许多较小数据包的写入大小?

还有什么我错过了?

throughput

我使用的代码是从geteventstore文档/博客:

public class GetEventStoreRepository : IRepository 
{ 
    private const string EventClrTypeHeader = "EventClrTypeName"; 
    private const string AggregateClrTypeHeader = "AggregateClrTypeName"; 
    private const string CommitIdHeader = "CommitId"; 
    private const int WritePageSize = 500; 
    private const int ReadPageSize = 500; 

    IStreamNamingConvention streamNamingConvention; 

    private readonly IEventStoreConnection connection; 
    private static readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.None }; 


    public GetEventStoreRepository(IEventStoreConnection eventStoreConnection, IStreamNamingConvention namingConvention) 
    { 
     this.connection = eventStoreConnection; 
     this.streamNamingConvention = namingConvention; 
    } 

    public void Save(IAggregate aggregate) 
    { 
     this.Save(aggregate, Guid.NewGuid(), d => { }); 

    } 

    public void Save(IAggregate aggregate, Guid commitId, Action<IDictionary<string, object>> updateHeaders) 
    { 
     var commitHeaders = new Dictionary<string, object> 
       { 
        {CommitIdHeader, commitId}, 
        {AggregateClrTypeHeader, aggregate.GetType().AssemblyQualifiedName} 
       }; 
     updateHeaders(commitHeaders); 

     var streamName = this.streamNamingConvention.GetStreamName(aggregate.GetType(), aggregate.Identity); 
     var newEvents = aggregate.GetUncommittedEvents().Cast<object>().ToList(); 
     var originalVersion = aggregate.Version - newEvents.Count; 
     var expectedVersion = originalVersion == 0 ? ExpectedVersion.NoStream : originalVersion - 1; 
     var eventsToSave = newEvents.Select(e => ToEventData(Guid.NewGuid(), e, commitHeaders)).ToList(); 

     if (eventsToSave.Count < WritePageSize) 
     { 
      this.connection.AppendToStreamAsync(streamName, expectedVersion, eventsToSave).Wait(); 
     } 
     else 
     { 
      var startTransactionTask = this.connection.StartTransactionAsync(streamName, expectedVersion); 
      startTransactionTask.Wait(); 
      var transaction = startTransactionTask.Result; 

      var position = 0; 
      while (position < eventsToSave.Count) 
      { 
       var pageEvents = eventsToSave.Skip(position).Take(WritePageSize); 
       var writeTask = transaction.WriteAsync(pageEvents); 
       writeTask.Wait(); 
       position += WritePageSize; 
      } 

      var commitTask = transaction.CommitAsync(); 
      commitTask.Wait(); 
     } 

     aggregate.ClearUncommittedEvents(); 
    } 

    private static EventData ToEventData(Guid eventId, object evnt, IDictionary<string, object> headers) 
    { 
     var data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(evnt, serializerSettings)); 

     var eventHeaders = new Dictionary<string, object>(headers) 
       { 
        { 
         EventClrTypeHeader, evnt.GetType().AssemblyQualifiedName 
        } 
       }; 
     var metadata = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(eventHeaders, serializerSettings)); 
     var typeName = evnt.GetType().Name; 

     return new EventData(eventId, typeName, true, data, metadata); 
    } 
} 
+1

没有实际将数据保存到磁盘的代码不多,我们可以说。仔细检查你是否正确缓冲并使用足够大的缓冲区。 – Voo 2014-11-20 17:30:27

+0

不够公平,下面添加代码,如前所述,这是通过nuget使用geteventstore.com eventstore.client v3.0.1。 – g18c 2014-11-20 18:03:43

+1

嗯,我不知道API,但是你确实有一个'CommitAsync',并且等待代码听起来像一个非常糟糕的主意,如果你的事件不是几kb大。 – Voo 2014-11-20 18:08:14

回答

2

它在评论中提到的部分,但要加强,作为你的工作完全单在上述代码中进行线程化(尽管您使用异步,您只是在等待它们,所以有效工作的同步),您会遇到来回切换和事件存储协议的延迟和开销。无论是真正走异步路线,还是避免等待异步线程并将其并行化(EventStore喜欢并行化,因为它可以批量多次写入),或者自行批量发送,例如一次发送20个事件。