我目前正在使用Event Store来处理我的事件。我目前需要重播特定类型的事件,因为我对它们订阅和写入数据库的方式进行了更改。
这可能吗?如果是这样,怎么办?谢谢。
我目前正在使用Event Store来处理我的事件。我目前需要重播特定类型的事件,因为我对它们订阅和写入数据库的方式进行了更改。
这可能吗?如果是这样,怎么办?谢谢。
您不能告诉 EventStore 将特定事件重播到持久订阅上,因为持久订阅的目的是为订阅者保持状态。
要实现这种修复,您确实需要一个追赶应用程序来完成这项工作。
真的,如果您考虑一下,如果您将所有事件重播到一个新数据库中,那么您那里会有正确的数据吗?
所以我有一个控制台应用程序,它重用与持久连接相同的逻辑,但唯一的区别是:
这就是 EventStore 的重点 - 您只需重播所有事件以随时构建任何数据库,这将是正确的
您的持久连接处理新的传入事件并应用更新。
如果您启用 $by_event_type 投影,那么您可以在下面访问该投影流
/streams/$et-{事件类型}
https://eventstore.org/docs/projections/system-projections/index.html
然后,您可以根据需要使用 .net api 阅读它。
这是一些帮助您入门的代码
private static T GetInstanceOfEvent<T>(ResolvedEvent resolvedEvent) where T : BaseEvent
{
var metadataString = Encoding.UTF8.GetString(resolvedEvent.Event.Metadata);
var eventClrTypeName = JObject.Parse(metadataString).Property(EventClrTypeHeader).Value;
var @event = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(resolvedEvent.Event.Data), Type.GetType((string) eventClrTypeName));
if (!(@event is BaseEvent))
{
throw new MessageDeserializationException((string) eventClrTypeName, metadataString);
}
return @event as T;
}
private static IEventStoreConnection GetEventStoreConnection()
{
var connectionString = System.Configuration.ConfigurationManager.ConnectionStrings["EventStore"].ConnectionString;
var connection = EventStoreConnection.Create(connectionString);
connection.ConnectAsync().Wait();
return connection;
}
private static string GetStreamName<T>() where T : BaseEvent
{
return "$et-" + typeof(T).Name;
}
要阅读事件,您可以使用此代码段
StreamEventsSlice currentSlice;
long nextSliceStart = StreamPosition.Start;
const int sliceCount = 200;
do
{
currentSlice = await esConnection.ReadStreamEventsForwardAsync(streamName, nextSliceStart, sliceCount, true);
foreach (var @event in currentSlice.Events)
{
var myEvent = GetInstanceOfEvent<OrderMerchantFeesCalculatedEvent>(@event);
TransformEvent(myEvent);
}
nextSliceStart = currentSlice.NextEventNumber;
} while (currentSlice.IsEndOfStream == false);