4

我正在使用事件源实现 CQRS 模式,我正在使用 NServiceBus、NEventStore 和 NES(NSB 和 NEventStore 之间的连接)。

我的应用程序将定期检查 Web 服务是否有任何要下载和处理的文件。当找到文件时,将命令(DownloadFile)发送到总线,并由 FileCommandHandler 接收,FileCommandHandler 创建一个新的聚合根(File)并处理消息。

现在在(文件聚合根目录)中,我必须检查文件的内容是否与任何其他文件内容不匹配(因为网络服务保证只有文件名是唯一的,并且内容可能会以不同的名称重复) ,通过对其进行散列并与散列内容列表进行比较。

问题是我必须在哪里保存哈希码列表?是否允许查询读取模型?

public class File : AggregateBase
{
    public File(DownloadFile cmd, IFileService fileDownloadService, IClaimSerializerService serializerService, IBus bus)
            : this()
        {
        // code to download the file content, deserialize it, and publish an event.
        }
}

public class FileCommandHandler : IHandleMessages<DownloadFile>, IHandleMessages<ExtractFile>
{
        public void Handle(DownloadFile command)
        {
             //for example, is it possible to do this (honestly, I feel it is not, since read model should always considered stale !)
            var file = readModelContext.GetFileByHashCode (Hash(command.FileContent));
            if (file != null)
                throw new Exception ("File content matched with another already downloaded file");

            // Since there is no way to query the event source for file content like:
            // eventSourceRepository.Find<File>(c=>c.HashCode == Hash(command.FileContent));
        }
}
4

3 回答 3

1

好像您正在寻找重复数据删除。

您的命令方面是您希望事情保持一致的地方。查询将始终让您对竞争条件持开放态度。因此,我不会运行查询,而是反转逻辑并将哈希实际写入数据库表(任何具有 ACID 保证的数据库)。如果此写入成功,则处理该文件。如果哈希写入失败,则跳过处理。

将此逻辑放入处理程序是没有意义的,因为在失败的情况下重试消息(即多次存储散列)不会使其成功。您最终还会在错误 q 中收到有关重复文件的消息。

重复数据删除逻辑的好地方可能是您的 Web 服务客户端内部。一些伪逻辑

  1. 获取文件
  2. 开启交易
  3. 将哈希插入数据库并捕获失败(不是任何失败,只有插入失败)
  4. 如果在步骤 3 中插入​​的记录数不为零,则 Bus.Send 消息以处理文件
  5. 提交事务

NServiceBus 网关中的一些示例重复数据删除代码在这里

编辑:
查看他们的代码,我实际上认为这session.Get<DeduplicationMessage>是不必要的。session.Save(gatewayMessage);应该是足够的并且是一致性边界。

只有在失败率很高的情况下进行查询才有意义,这意味着您有很多重复的内容文件。如果 99%+ 的插入成功,则确实可以将重复项视为异常。

于 2014-04-23T09:49:16.690 回答
0

当您在域中具有真正的唯一性约束时,您可以使唯一性测试器成为域服务,其实现是基础架构的一部分——类似于存储库,其接口是域的一部分,其实现是基础的一部分基础设施。对于实现,您可以使用内存中的哈希或根据需要更新/查询的数据库。

于 2014-04-23T07:05:15.017 回答
0

这取决于很多事情……吞吐量就是其中之一。但是,由于无论如何您都在以“基于拉”的方式处理此问题(您正在查询 Web 服务以轮询工作(下载和分析文件)),因此您可以使整个过程串行,而不必担心冲突。现在这可能无法给出您想要处理“工作”的理想速率,但更重要的是......您测量了吗?让我们暂时回避一下,假设串行不起作用。我们在谈论多少个文件?几百、一千、……几百万?取决于该散列可能适合内存,并且如果/当进程停止时可以重建。也可能有机会沿着时间轴或上下文划分您的问题。从黎明开始或今天开始的每个文件,或者本月的文件价值?真的,我认为你应该深入挖掘你的问题空间。除此之外,使用事件溯源来解决这个问题感觉很尴尬,但是 YMMV。

于 2014-04-22T21:00:52.260 回答