问题标签 [outbox-pattern]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
953 浏览

azure-cosmosdb - 如何在 Cosmos DB 中实现发件箱模式

我希望在 Cosmos DB 中实现对发件箱模式的支持。

但是,Cosmos DB 似乎不支持跨集合的事务。

那我该怎么做呢?

我一直在考虑一些方法来实现这一点:

使用服务总线事务

服务总线事务范围内,发送消息(尚未提交),进行 Cosmos DB 更新,如果它有效,那么我们提交服务总线事务以使消息对订阅者可用。

使用触发器在发件箱集合中插入行

随着插入/更新的发生,我们使用 Cosmos DB 触发器将相应的消息插入到发件箱表中,从那时起,一切照旧。

使用触发器执行 azure 函数

创建Azure 函数作为 Cosmos DB 触发器。我几乎喜欢这个,但直接向服务总线发送消息会更好。

使用数据泵

添加两个字段UpdateTimestampOutboxMessageTimestamp. 当记录更新时,UpdateTimestamp.

某些进程会查找这两个不匹配的记录,并为每个记录创建一条通知消息并将其中继到相应的队列或主题。

当然,然后它会更新第二个时间戳,以便它们匹配。

关于如何做到这一点的其他想法?

0 投票
1 回答
1187 浏览

masstransit - 使用内存发件箱时,Mass Transit 如何处理重试重复数据删除和消息 ID 生成

Mass Transit 有一个内存中的“发件箱”实现,我认为它可以处理我希望克服的大部分问题/挑战,但是我找不到很多详细描述我正在寻找的功能的文档。在观看了 Udi Dahan 解释如何在没有分布式事务的情况下处理可靠消息传递的视频 ( https://vimeo.com/111998645 ) 之后,出现了很多这些问题。

  1. 内存中的发件箱是否处理尝试向队列发送消息时可能发生的故障?例如:消费者生成 3 条消息,这些消息收集在发件箱中。消费者完成没有问题。发件箱中收集的消息开始被处理
    • 如果由于某种原因在处理收集的消息时出现网络问题(或其他问题)并且消息 2 无法发送,消息 2 和 3 会发生什么情况?是否有任何类型的重试策略?
    • 如果发件箱中正在处理的邮件成功添加到队列中但未成功标记为在发件箱中已发送,会发生什么情况?是否会再次尝试将消息发送到队列?
    • 假设如果出现某种故障,发件箱将重试向队列发送消息,那么消息 ID 是否保证在尝试之间保持一致?拥有一致的消息 ID 对于重复数据删除很重要,以确保我们不会多次处理相同的消息。
  2. 当一条消息被消费时,是否会发生任何重复数据删除?(这与 1.C 相关)
    • Mass Transit 如何跟踪每位消费者的已处理记录?存储引擎是否负责此责任?
  3. 是否有任何类型的“事务”暴露给消费者,允许您在不引发异常的情况下清除发件箱中收集的消息,或者抛出异常是回滚发件箱的唯一方法?
    • 消费者外部生成的消息怎么样,有没有办法回滚在发件箱中收集的消息(例如:WebAPI 控制器操作)?
  4. 是否建议使用 Mass Transit 的 DTC 功能而不是发件箱,反之亦然,或同时使用它们?
  5. 目前,Mass Transit 没有可以在进程崩溃后幸免于难的发件箱实现。有计划加入这样的功能吗?是否有跟踪此路线的路线图?
0 投票
1 回答
424 浏览

confluent-schema-registry - Debezium 发件箱模式 | 如果我们使用 debezium,架构是用 SMT/发件箱表固定的吗

带发件箱模式的 Debezium

设置上下文:

  1. 使用
  2. 我们想使用模式注册表来存储不同业务实体的所有事件模式
  3. 一个主题可以有多个版本的相同模式
  4. 一个主题可以具有完全不同的模式,并受业务上下文的限制。例如 customerCreated、customerPhoneUpdated、customerAddressUpdated。(使用一种主题名称策略)
  5. 想验证 debezium 是否支持第 2 点和第 3 点(特别是第 3 点)。

想象一下,我有两个业务事件 customerCreated 和 orderCreated,我想将它们存储到同一个主题“com.business.event”中。

客户创建

{ “id”:”244444” “name”:”test”, “address”: “test 123”, “email” : “test@test.com” }

订单创建

{ “id”:”244444” “value”:”1234”, “address”: “test 123”, “phone”: “3333”, “deliverydate”: “10-12-19” }

我的发件箱表的结构如下文所示

https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/

专栏 | 类型 | 修饰符 --------------+------------+--------- -- 标识 | uuid | 非空聚合类型 | 字符变化(255) | 非空聚合ID | 字符变化(255) | 非空类型 | 字符变化(255) | 非空有效载荷 | jsonb | 不为空

现在,当我将业务事件推送到上表时,它会将 customerCreated 和 orderCreated 事件作为字符串/JSON 存储到有效负载列中。如果我使用 debezium 连接器将其推送到主题“com.business.event”中的 kafka,它将产生以下消息。(例如使用模式打印)

customerCreated.json

{ "schema": { "type":"struct", "fields":[ { "type":"string", "optional":false, "field":"eventType" }, { "type":"string", "optional":false, "name":"io.debezium.data.Json", "version":1, "field":"payload" } ], "optional":false }, "payload": { "eventType":"Customer Created", "payload":"{\"id\": \"2971baea-e5a0-46cb-b1b1-273eaf88246a\", \"name\": \"jitender\", \"email\": \"test\", \"address\": \"700 \"}}" } }

orderCreated.json

}

问题:

正如您在上面的示例中所见,模式 registry/kafka 中的模式保持不变,尽管有效负载包含不同的业务实体。现在,当我作为消费者尝试反序列化此消息时,我应该知道有效负载可以包含基于它们生成的业务事件的不同结构。在这个场景中,我无法充分利用模式注册表,因为消费者应该提前知道所有的业务实体。

问题 :

  1. 我想做的是,debezium 应该使用主题名称策略(下面的示例)在同一主题“com.business.event”下创建两个不同的模式。 https://karengryg.io/2018/08/18/multi-schemas-in-one-kafka-topic/

现在作为消费者,当我使用消息时,我的消费者将从主题消息中读取模式 id,并从模式注册表中获取它,并将直接用它解码消息。解码后,如果我对业务事件不感兴趣,我可以忽略该消息。通过这样做,我可以使用模式注册表在同一主题下拥有不同的模式。

  1. 当我将 debezium 与模式注册表结合使用时,我可以控制 kafka 主题中的模式吗?发件箱表或发件箱图案是必须的。
0 投票
1 回答
284 浏览

azure - 如何使用 Db Transaction 向 Microsoft EventHub 发送消息?

我想通过 Db 事务将事件发送到 Microsoft Event-hub:

解释:

  1. 用户点击了订单创建的端点。
  2. OrderService 接受订单并将该订单放入数据库。
  3. 现在订单服务希望使用事件中心将该 orderId 作为事件发送到另一个服务。

如何实现第 2 步和第 3 步的事务行为?

我知道这些解决方案:

  1. 发件箱模式:我将消息放在另一个带有订单创建事务的表中。还有一个 cron/scheduler,它从表中获取消息并将它们标记为已传递。下次 cron 将只接收未传递的消息。

  2. 使用数据库审计日志和获取这些东西的库。库会将数据库表绑定到 Event-hub。然后在每个更新库上都会将该更改发送到 Event-hub。

我想知道 Event-hub 中是否有任何内置的事务功能?

或者

有没有更好的方法来处理这件事?

0 投票
1 回答
115 浏览

outbox-pattern - OutBox 模式 - 重用记录是否有缺点

我正在阅读 OutBox 模式实现,它在表中创建记录,然后 debezium 连接器读取 bin-log 以将这些更改发布到 Kafka。这会引发一个问题,即在添加记录(并写入 bin 日志)之后,它只会占用存储空间,并且表会变得非常大。有几种清理旧记录的方法,例如按日期删除分区、创建后删除或 DbTriggers 删除记录)

我的建议是,我会在这个表中预先创建 1,000,000 条记录,每次只随机更新一条记录。debezium 功能将保留,我将避免删除旧记录。

除了为那些 1M 记录支付固定的存储空间。还有其他理由避免这种方法吗?

0 投票
1 回答
356 浏览

apache-kafka - 恢复事务性发件箱模式

问题描述:

使用跨越数据库和消息代理的分布式事务来自动更新数据库和发布消息/事件是不可行的。

发件箱模式描述了一种让服务以安全和一致的方式执行这两个任务的方法;它为源服务提供即时“读取您自己的写入”语义,同时提供跨服务边界的可靠、最终一致的数据交换。

如果我从 topicA 读取消息 -> 向 topicB 写入消息(使用 Kafka Streams 的语义恰好一次)并使用事件监听器更新数据库,会有什么缺点?

这意味着在数据库实体被持久化之前,我将具有最终的一致性,但不会丢失数据,因为我在 Kafka 主题中有消息(重试直到持久性工作)。

这种模式还存在以下问题:

消息中继可能会多次发布消息。例如,它可能会在发布消息之后但在记录它已经这样做的事实之前崩溃。当它重新启动时,它将再次发布消息。因此,消息消费者必须是幂等的,可能通过跟踪它已经处理的消息的 ID 来实现。幸运的是,由于消息消费者通常需要是幂等的(因为消息代理可以多次传递消息),这通常不是问题。

问题:

因此,当涉及到妥协时,什么更好,保持 Kafka 作为单一事实来源并处理数据库中的最终一致性,还是将 Db 作为事实来源并使用 kafka 作为愚蠢的消息代理?

我对你的意见很感兴趣!谢谢!

0 投票
0 回答
129 浏览

mysql - 使用 Avro 作为使用 MySQL Debezium 连接器的发件箱模式的数据格式

我正在使用 debezium MySQL 来实现发件箱模式。在我的有效负载列中,我使用 Avro 并将 Avro 转换为字节,以便我可以将 avro 数据存储到 MySQL 数据库中。如果我想在未来进行模式演变,我想知道我应该在哪里保留我的模式?我应该将编写器模式作为单独的列存储在我的发件箱表中吗?或者,还有更好的方法?太感谢了!!

0 投票
0 回答
63 浏览

apache-kafka-connect - 从 CDC/发件箱模式的角度看 Debezium 缩放

我计划使用 CDC/发件箱模式来支持主要数据源和辅助数据源之间的最终一致性以及后处理。我的问题是:Debezium 如何在缩放模式下处理 CDC?如果仅支持一个实例,则为 SPOF。

是否支持通过重复数据删除支持扩展 Debezium 组件 - 记录仅发出一次。

非常感谢托马斯

0 投票
1 回答
395 浏览

postgresql - 使用 AWS Aurora Postgres 和 aws_lambda.invoke 的事务发件箱模式

我正在开发一个由几个微服务组成的项目。我计划在插入触发器后通过在 Postgres 中调用 lambda 函数来使用事务发件箱模式。

我在想这样的事情

在这里,lambda 函数将接收 JSON 格式的新记录并发送一条 SQS 消息。消息发送成功后,将删除记录tx_outbox_table

我想知道这里是否有我遗漏的缺点。你认为这是一个生产就绪的解决方案吗?有什么我应该注意的吗?

0 投票
1 回答
515 浏览

duplicates - 发件箱模式 - 任何 SQL 和 NoSQL DB 的无重复和无序消息中继

当我们需要更改 2 个系统中的数据时,双重写入是一个问题:数据库(SQL 或 NoSQL)和 Apache Kafka(例如)。必须更新数据库并可靠/原子地发布消息。最终的一致性是可以接受的,但不一致是不能接受的。

没有 2 阶段提交 (2PC) 双重写入会导致不一致。

但在大多数情况下,2PC 不是一个选项。

Transactional Outbox是一种微服务架构模式,其中一个单独的消息中继进程将插入数据库的事件发布到消息代理。

交易发件箱

并行运行的多个消息中继进程会导致发布重复(2 个进程读取 OUTBOX 表中的相同记录)或无序(如果每个进程只读取 OUTBOX 表的一部分)。

单个消息中继进程也可能多次发布消息。消息中继可能会在处理 OUTBOX 记录之后但在记录它已经这样做的事实之前崩溃。当消息中继重新启动时,它将再次发布相同的消息。

如何在事务性发件箱模式中实现消息中继,以便将重复消息或无序的风险降至最低,并且该概念适用于所有 SQL 和 NoSQL 数据库?