在我们的应用程序中,我们使用带有事件溯源的 Akka.net。持久参与者将他们的事件保存在 SQL Server 数据库中。我们还有视图参与者,它们订阅这些事件,使用日志阅读器/持久性查询来创建物化视图。我们在数据库中有一个表,每个视图参与者都有一行。此行包含视图参与者的名称和最后处理的事件的偏移量。乍一看,这工作顺利。然而,有时,当我们运行导致数千个事件的测试时,期刊阅读器会丢失一些事件。
View Actor 是一个 ReceiveActor。启动时,它从数据库中检索最后处理的事件偏移量(从参与者的构造函数中调用)。偏移量在 OffsetMessage 中通过管道传递给 self。在接收到 OffsetMessage 时,视图 Actor 会初始化日志阅读器。在接收事件时(在 EventEnvelope 消息中),视图会更新。
从日志阅读器运行的操作首先将一行写入日志。该行包含事件偏移量。EventEnvelope 接收处理程序还将一行写入日志。该行还包含事件偏移量。
我们有一个测试导致 9635 事件插入到日志中。有时,日志阅读器和 EventEnvelope 接收处理程序记录的事件少于 9635 个。他们都记录了相同的数字,所以期刊读者似乎错过了这些事件。日志中丢失的事件对应于视图中的丢失项。我们在一个空数据库上运行测试。日志记录处于调试级别,不显示异常。丢失的事件(我们已经看到了 1 到 4 的数字)可以是第一个、中间或最后一个事件。每次这都不一样。
到目前为止,我们不知道是什么导致了这个问题,或者如何解决它。
以下是我们的代码片段。视图 Actor 都继承自一个基类:ViewActorBase。
internal abstract class ViewActorBase : ReceiveActor, ILogReceive
{
public ViewActorBase()
{
// Some initialisation code
....
this.Receive<OffsetMessage>(this.HandleOffsetMessage);
this.ReceiveAsync<EventEnvelope>(this.UpdateState);
var sender = this.Sender;
var self = this.Self;
this.GetViewActorOffset(self, sender);
}
private void HandleOffsetMessage(OffsetMessage offsetMessage)
{
this.InitialiseJournalReader(offsetMessage.Offset);
}
private void InitialiseJournalReader(long offset)
{
// obtain read journal by plugin id
var readJournal = PersistenceQuery.Get(Context.System).ReadJournalFor<SqlReadJournal>($"akka.persistence.query");
// materialize stream, consuming events
var materializer = ActorMaterializer.Create(Context.System);
// issue query to journal
Source<EventEnvelope, NotUsed> source = readJournal.EventsByTag(this.QueryEventTag, new Sequence(offset));
var self = this.Self;
source.RunForeach(envelope => { this.Logger.Debug("{Date:HH:mm:ss.fffff} JournalReader.Tell {Offset}", DateTime.Now, (envelope.Offset as Sequence).Value); self.Tell(envelope); }, materializer);
}
private void GetViewActorOffset(IActorRef self, IActorRef sender)
{
// Initialise repository
....
repository.GetViewActorOffset(this.GetViewName()).PipeTo(self, sender, offset => new OffsetMessage(offset));
}
}
internal class MyViewActor : ViewActorBase
{
protected override async Task UpdateState(EventEnvelope envelope)
{
var offset = (envelope.Offset as Sequence).Value;
this.Logger.Debug("{Date:HH:mm:ss.fffff} {MethodName} {Offset}", DateTime.Now, $"{this.GetType().Name}.UpdateState", offset);
// Update views
....
}
}
我们的代码或架构有问题吗?有更好的解决方案吗?
附加信息 我们已经使用 SQL Server 探查器运行了一些测试来监视对数据库的查询。
对事件日志执行查询,要求 100 个事件,从偏移量 204743 开始。结果包含 61 行。
<Event id="10" name="RPC:Completed">
<Column id="1" name="TextData">exec sp_executesql N'
SELECT TOP (@Take)
e.PersistenceId as PersistenceId,
e.SequenceNr as SequenceNr,
e.Timestamp as Timestamp,
e.IsDeleted as IsDeleted,
e.Manifest as Manifest,
e.Payload as Payload,
e.SerializerId as SerializerId,
e.Ordering as Ordering
FROM dbo.EventJournal e
WHERE e.Ordering > @Ordering AND e.Tags LIKE @Tag
ORDER BY Ordering ASC
',N'@Tag nvarchar(10),@Ordering bigint,@Take bigint',@Tag=N'%;Module;%',@Ordering=204743,@Take=100</Column>
<Column id="9" name="ClientProcessID">1169425116</Column>
<Column id="10" name="ApplicationName">Core .Net SqlClient Data Provider</Column>
<Column id="12" name="SPID">82</Column>
<Column id="13" name="Duration">353890</Column>
<Column id="14" name="StartTime">2018-08-30T16:32:32.927+02:00</Column>
<Column id="15" name="EndTime">2018-08-30T16:32:33.28+02:00</Column>
<Column id="16" name="Reads">326</Column>
<Column id="17" name="Writes">0</Column>
<Column id="18" name="CPU">0</Column>
<Column id="48" name="RowCounts">61</Column>
</Event>
我们将下一个查询扩展为从 204804 (204743 + 61) 开始。但是,它从 204810 开始。为什么它会跳过(或丢失)6 个事件?