1

我有一个紧密的循环,它贯穿大量购物车,这些购物车本身包含大约 10 个事件事件对象,并通过中间存储库(使用 GetEventStore.com 重新连接的 jOliver 公共域)以 JSON 格式将它们写入磁盘:

// create ~200,000 carts, each with ~5 events
List<Cart> testData = TestData.GenerateFrom(products);
foreach (var cart in testData)
{
    count = count + (cart as IAggregate).GetUncommittedEvents().Count;
    repository.Save(cart);
}

我看到磁盘说它是 100%,但整个过程是“低的”(15MB/秒,每秒约 5,000 个事件)为什么会这样,我能想到的事情是:

  1. 由于这是单线程的,25% 的 CPU 使用率实际上是否意味着我所在的 1 个内核的 100%(以任何方式显示我的应用程序在 Visual Studio 中运行的特定内核)?

  2. 我受 I/O 或 CPU 的限制吗?如果我为每个 CPU 创建一个自己的线程池,我可以期待更好的性能吗?

  3. 为什么我可以以 ~120MB/秒的速度复制文件,但我的应用程序只能获得 15MB/秒的吞吐量?这是由于许多较小数据包的写入大小造成的吗?

还有什么我错过的吗?

吞吐量

我使用的代码来自 geteventstore 文档/博客:

public class GetEventStoreRepository : IRepository
{
    private const string EventClrTypeHeader = "EventClrTypeName";
    private const string AggregateClrTypeHeader = "AggregateClrTypeName";
    private const string CommitIdHeader = "CommitId";
    private const int WritePageSize = 500;
    private const int ReadPageSize = 500;

    IStreamNamingConvention streamNamingConvention;

    private readonly IEventStoreConnection connection;
    private static readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.None };


    public GetEventStoreRepository(IEventStoreConnection eventStoreConnection, IStreamNamingConvention namingConvention)
    {
        this.connection = eventStoreConnection;
        this.streamNamingConvention = namingConvention;
    }

    public void Save(IAggregate aggregate)
    {
        this.Save(aggregate, Guid.NewGuid(), d => { });

    }

    public void Save(IAggregate aggregate, Guid commitId, Action<IDictionary<string, object>> updateHeaders)
    {
        var commitHeaders = new Dictionary<string, object>
                {
                    {CommitIdHeader, commitId},
                    {AggregateClrTypeHeader, aggregate.GetType().AssemblyQualifiedName}
                };
        updateHeaders(commitHeaders);

        var streamName = this.streamNamingConvention.GetStreamName(aggregate.GetType(), aggregate.Identity);
        var newEvents = aggregate.GetUncommittedEvents().Cast<object>().ToList();
        var originalVersion = aggregate.Version - newEvents.Count;
        var expectedVersion = originalVersion == 0 ? ExpectedVersion.NoStream : originalVersion - 1;
        var eventsToSave = newEvents.Select(e => ToEventData(Guid.NewGuid(), e, commitHeaders)).ToList();

        if (eventsToSave.Count < WritePageSize)
        {
            this.connection.AppendToStreamAsync(streamName, expectedVersion, eventsToSave).Wait();
        }
        else
        {
            var startTransactionTask = this.connection.StartTransactionAsync(streamName, expectedVersion);
            startTransactionTask.Wait();
            var transaction = startTransactionTask.Result;

            var position = 0;
            while (position < eventsToSave.Count)
            {
                var pageEvents = eventsToSave.Skip(position).Take(WritePageSize);
                var writeTask = transaction.WriteAsync(pageEvents);
                writeTask.Wait();
                position += WritePageSize;
            }

            var commitTask = transaction.CommitAsync();
            commitTask.Wait();
        }

        aggregate.ClearUncommittedEvents();
    }

    private static EventData ToEventData(Guid eventId, object evnt, IDictionary<string, object> headers)
    {
        var data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(evnt, serializerSettings));

        var eventHeaders = new Dictionary<string, object>(headers)
                {
                    {
                        EventClrTypeHeader, evnt.GetType().AssemblyQualifiedName
                    }
                };
        var metadata = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(eventHeaders, serializerSettings));
        var typeName = evnt.GetType().Name;

        return new EventData(eventId, typeName, true, data, metadata);
    }
}
4

1 回答 1

2

评论中部分提到了这一点,但为了加强这一点,因为您在上述代码中完全单线程工作(尽管您使用异步,但您只是在等待它们,因此有效地工作同步)您正在遭受延迟和上下文切换和 EventStore 协议来回的开销。要么真的走异步路线,但避免等待异步线程,而是将其并行化(EventStore 喜欢并行化,因为它可以批量写入)或者自己进行批处理并一次发送例如 20 个事件。

于 2014-12-04T00:42:16.973 回答