2

我目前正在使用Event Store来处理我的事件。我目前需要重播特定类型的事件,因为我对它们订阅和写入数据库的方式进行了更改。

这可能吗?如果是这样,怎么办?谢谢。

4

2 回答 2

2

您不能告诉 EventStore 将特定事件重播到持久订阅上,因为持久订阅的目的是为订阅者保持状态。

要实现这种修复,您确实需要一个追赶应用程序来完成这项工作。

真的,如果您考虑一下,如果您将所有事件重播到一个新数据库中,那么您那里会有正确的数据吗?

所以我有一个控制台应用程序,它重用与持久连接相同的逻辑,但唯一的区别是:

  1. 我更改了目标数据库连接字符串 - 所以这将是一个新的数据库或集合(不是损坏的)
  2. 它连接到 EventStore 并从头开始重播所有事件
  3. 它将整个数据库重建到正确的状态
  4. 将业务切换到新数据库

这就是 EventStore 的重点 - 您只需重播所有事件以随时构建任何数据库,这将是正确的

您的持久连接处理新的传入事件并应用更新。

于 2018-07-12T07:35:17.237 回答
0

如果您启用 $by_event_type 投影,那么您可以在下面访问该投影流

/streams/$et-{事件类型}

https://eventstore.org/docs/projections/system-projections/index.html

然后,您可以根据需要使用 .net api 阅读它。

这是一些帮助您入门的代码

      private static T GetInstanceOfEvent<T>(ResolvedEvent resolvedEvent) where T : BaseEvent
            {

                var metadataString = Encoding.UTF8.GetString(resolvedEvent.Event.Metadata);
                var eventClrTypeName = JObject.Parse(metadataString).Property(EventClrTypeHeader).Value;
                var @event = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(resolvedEvent.Event.Data), Type.GetType((string) eventClrTypeName));
                if (!(@event is BaseEvent))
                {
                    throw new MessageDeserializationException((string) eventClrTypeName, metadataString);
                }

                return @event as T;
            }

            private static IEventStoreConnection GetEventStoreConnection()
            {
                var connectionString = System.Configuration.ConfigurationManager.ConnectionStrings["EventStore"].ConnectionString;
                var connection = EventStoreConnection.Create(connectionString);
                connection.ConnectAsync().Wait();
                return connection;
            }

            private static string GetStreamName<T>() where T : BaseEvent
            {
                return "$et-" + typeof(T).Name;
            }

要阅读事件,您可以使用此代码段

 StreamEventsSlice currentSlice;
            long nextSliceStart = StreamPosition.Start;
            const int sliceCount = 200;

            do
            {
                currentSlice = await esConnection.ReadStreamEventsForwardAsync(streamName, nextSliceStart, sliceCount, true);

                foreach (var @event in currentSlice.Events)
                {
                    var myEvent = GetInstanceOfEvent<OrderMerchantFeesCalculatedEvent>(@event);


                    TransformEvent(myEvent);
                }

                nextSliceStart = currentSlice.NextEventNumber;

            } while (currentSlice.IsEndOfStream == false);
于 2018-09-06T14:20:40.270 回答