问题标签 [exactly-once]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
144 浏览

apache-kafka - 有没有办法在 EOS Kafka 流中获得承诺的偏移量?

背景 :

将消费者拦截器设置为 StreamsConfig 将确保在使用/提交消息时调用拦截器。片段来自org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#commitOffsetsSync

但是consumerInterceptor.onCommit()即使我看到在源主题中提交了偏移量,也从未调用过。

问题:

我认为这是因为我使用的是启用了 Exactly once 处理保证的 kstreams。

这是当时的逻辑org.apache.kafka.streams.processor.internals.StreamTask#commit

正如你所看到的,consumer.commitSyncwhich 反过来调用consumerCoordinator.commitwhich 调用interceptor.onCommit, 永远不会被调用,因为启用了 eos,它是被调用的事务 api。

问题: 当在启用 eos 的源主题上提交偏移量时,有没有办法可以将回调挂钩到 kstream?

0 投票
1 回答
156 浏览

snowflake-cloud-data-platform - 雪花连接器重复记录

根据文档

我们在 SF 2 记录中具有相同的 RECORD_METADATA:

我们的主题键是一个 Protobuf 记录,但我认为这应该不是问题。

0 投票
1 回答
694 浏览

apache-kafka - Apache Flink 中的端到端 Exactly-once 处理

Apache Flink 通过从检查点恢复作业来保证在故障和恢复时只进行一次处理,检查点是分布式数据流和操作员状态的一致快照(分布式快照的 Chandy-Lamport 算法)这保证在故障转移时只发生一次。

在正常集群操作的情况下,Flink 如何保证只处理一次,例如给定一个从外部源(例如 Kafka)读取的 Flink 源,Flink 如何保证从源中读取一次事件?事件源和 Flink 源之间是否有任何类型的应用程序级别的确认?另外,Flink 如何保证事件从上游算子到下游算子只传播一次?这是否也需要对收到的事件进行任何类型的确认?

0 投票
1 回答
434 浏览

apache-kafka - Spring Cloud Stream 项目无法获取分区信息错误

当我使用此配置时:

我的应用程序正确启动,但我在控制台输出中看到的 transactional.id 显示为 null。我已将此额外配置(事务)应用于 spring-cloud-stream,以获得正确的 transactional.id:

但是服务没有成功启动,控制台输出显示:

无法获取分区信息 我认为我的配置有问题(肯定)

我的意图是有 Exactly Once,以避免重复。这就是为什么我想看到那个 transactional.id

额外信息: 我的消费者同时使用 JPA 和 Kafka 事务进行事务处理(使用 chainedKafkaTransactionManager 进行事务同步)

已编辑: 在 @Configuration 类中,我有这些 bean

我的处理器类与相应的@Transactional

根据我的第一个配置显示,事务同步有效。

我使用此日志记录配置来确认TransactionSynchronizationManager 的Initializing transaction 同步Clearing transaction 同步

0 投票
1 回答
169 浏览

apache-kafka - kafka幂等生产者可以确保多个分区恰好一次吗

我只是Kafka的新手,对kafka生产者的幂等性有所了解。

据我了解,当生产者向代理发送消息时,代理需要向生产者发回 ACK 以告知其已收到消息。如果生产者由于某种原因没有收到 ACK,则生产者必须再次向代理发送相同的消息,以便复制消息。而幂等生产者可以消除这个问题。

基本上,每个生产者都将被分配一个 PID,每个消息将被分配一个序列号。所以PID+序列号可以识别一条消息。这就是 kafka 的幂等性的工作原理。

如果我是对的,假设我为一个主题创建了三个分区,生产者通过轮询算法向三个分区发送消息,也就是说三个分区将一个接一个地接收消息。在这种情况下,Kafka 还能保证幂等性吗?

例如,有 a、b 和 c 三个分区。

在某个时刻,生产者正在向分区 a 发送消息 X,a 成功接收到 X,但在发送回 ACK 时失败。所以生产者重新发送消息X。现在我有两个问题:

  1. 是分区 a 还是分区 b 会收到最新的消息 X?
  2. 如果是分区b,是不是意味着分区a和分区b会有相同的消息X,也就是说这种情况下Kafka不能保证幂等性?
0 投票
1 回答
356 浏览

apache-kafka - 恢复事务性发件箱模式

问题描述:

使用跨越数据库和消息代理的分布式事务来自动更新数据库和发布消息/事件是不可行的。

发件箱模式描述了一种让服务以安全和一致的方式执行这两个任务的方法;它为源服务提供即时“读取您自己的写入”语义,同时提供跨服务边界的可靠、最终一致的数据交换。

如果我从 topicA 读取消息 -> 向 topicB 写入消息(使用 Kafka Streams 的语义恰好一次)并使用事件监听器更新数据库,会有什么缺点?

这意味着在数据库实体被持久化之前,我将具有最终的一致性,但不会丢失数据,因为我在 Kafka 主题中有消息(重试直到持久性工作)。

这种模式还存在以下问题:

消息中继可能会多次发布消息。例如,它可能会在发布消息之后但在记录它已经这样做的事实之前崩溃。当它重新启动时,它将再次发布消息。因此,消息消费者必须是幂等的,可能通过跟踪它已经处理的消息的 ID 来实现。幸运的是,由于消息消费者通常需要是幂等的(因为消息代理可以多次传递消息),这通常不是问题。

问题:

因此,当涉及到妥协时,什么更好,保持 Kafka 作为单一事实来源并处理数据库中的最终一致性,还是将 Db 作为事实来源并使用 kafka 作为愚蠢的消息代理?

我对你的意见很感兴趣!谢谢!

0 投票
1 回答
147 浏览

apache-kafka - Flink 恰好一次语义和数据丢失

我们有一个 Flink 设置,Kafka 生产者目前使用至少一次语义。我们正在考虑切换到关于 Kafka 生产者的一次性语义,因为这会给我们带来好处。不过考虑到文档,这似乎会增加我们目前没有的非常可能的数据丢失风险。如果由于 Flink 无法恢复或 Kafka 代理关闭而导致长时间停机,则 Kafka 事务可能会过期并且数据将丢失。

如果 Flink 应用程序崩溃和完成重启之间的时间大于 Kafka 的事务超时时间,则会出现数据丢失(Kafka 会自动中止超过超时时间的事务)。

这似乎是一种全新的风险,在至少一次语义中不存在并且无法减轻。无论设置了多大的事务超时,都可能存在实际情况。在我看来,最好的方法是设置非常短的检查点间隔,因为它会导致事务关闭但仍然是非常大的事务超时(以小时为单位),以便尝试减少数据丢失的机会。我的理解正确吗?

0 投票
0 回答
136 浏览

apache-kafka-streams - Kafka Streams 精确一次重新平衡聚合状态数据丢失

仅一次运行 3 个 Kafka Streams 实例,但在重新启动其中一个流实例时遇到数据丢失(另外 2 个正在重新平衡)。如果我快速重新启动实例(在 内session.timeout.ms),而其他 2 没有重新平衡,一切都按预期工作。

  • 输入和输出主题由 6 个分区创建。
  • 运行 3 个 Kafka 代理。
  • 在循环中使用单个 python 生产者生成数据 ( acks='all')。
  • 使用配置的单个 Kafka Connect 将数据输出到 SQLconsumer.override.isolation.level=read_committed

我期望聚合数据与我的 python 循环的输出具有相同的计数。只要 Kafka Streams 不进入重新平衡状态,这就可以正常工作。

简而言之,流实例会:

  1. 收集会话数据,并更新会话状态。
  2. 然后使用窗口聚合对会话状态的增量更新进行重新分区和求和。

通过我自己的调试输出,我倾向于认为问题与转移聚合状态有关:

  1. 作为会话 X 的更新的记录 A 将 0 添加到聚合中。
  2. 聚合的输出现在是 6
  3. 作为对会话 X 的更新的记录 B 将 1 添加到聚合中。
  4. 聚合的输出现在是 7
  5. 再平衡
  6. 对会话 X(可能是也可能不是重播或记录 A)的更新正在将 0 添加到聚合中。
  7. 聚合的输出现在是 6

代码的简化和剥离版本:(不是真正的 Java 开发人员,对非最佳语法感到抱歉)

和:

0 投票
1 回答
253 浏览

apache-kafka - Apache Kafka Exactly Once 事务 id 如何影响新的 fetch request producer fencing 方法

在早期版本的 Kafka 中,在消费者组不匹配期间,事务 id 和主题分区之间应该有一次性语义静态映射,因此事务 id 有可能获得不同的主题分区。

为避免这种情况KIP-447:生产者可扩展性仅在实现语义后,我从 KIP-477 中了解到,旧生产者在新 API(sendOffsetToProdcuer)的帮助下使用 fetch offset 调用进行防护,因此事务。 id 不用于围栏。

但我的疑问是,

  1. 仍然是事务性生产者期望 transaction.id 我应该如何为最新的 Kafka 版本选择这个值?

  2. transaction.id 应该与分区有静态映射,获取偏移量防护仅在消费者组重新平衡期间生效?

  3. 这个值对最新版本无效吗?

请帮我解决这个问题,我正在尝试了解 Kafka EoS 并在生产系统中实现它。

0 投票
1 回答
75 浏览

apache-kafka - 流处理时如何在Cassandra中实现精确一次?

我有一个看起来像这样的 Cassandra 表

我有一个流管道来更新 Cassandra 中的项目。流式管道每隔一段时间设置检查点。因此,当管道失败时,它将重新处理自上次成功检查点以来的源数据。并且当它在失败后重新处理时,它会尝试覆盖Cassandra中已经成功写入的数据(即在最后一个成功的检查点之后但在失败之前)。我正在考虑利用modified_at专栏来实现这一目标。就像是

只有当 Cassandra 中的 modified_at 小于管道中的 modified_at 时,我才尝试进行更新。然而,这抛出InvalidQueryException: Slice restrictions are not supported on the clustering columns in UPDATE statements

我虽然在这种情况下 IF 条件可以提供帮助。

但这会抛出InvalidQueryException: PRIMARY KEY column 'modified_at' cannot have IF conditions

那么处理这个问题的理想方法是什么?

编辑 如果我在这个表中只有这些字段,那么重新处理事件可能不是什么大问题,因为当管道赶上实时流但说还有另一个时,它最终会变得一致使用当前价格、可用单位等更新同一张表的流式作业。在这种情况下,如果其中一项作业失败并重新启动,则该表可能处于不一致状态。