问题标签 [exactly-once]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
400 浏览

apache-flink - Flink 两阶段提交 map 函数实现完全一次语义

背景:

我们有一个 Flink 管道,它由多个源、多个接收器和管道沿线的多个运算符组成,这些运算符还更新数据库。

为了这个问题并使其更简单,让我们假设我们有一个看起来像这样的管道:

该管道应该允许我们收听有关某些数据更改的通知。(每个通知都包含一个 ID)对于每个通知,我们从数据库中读取数据,运行算法并更新相同的数据库行。之后,我们还发出数据变化的幅度。只有当数据变化幅度足够大时,我们才会向另一个 Kafka 主题发出通知。

  • Source 订阅 Kafka 主题以侦听更改数据 ID 的通知。
  • KeyBy 是通过 ID 键入的,以确保同一 ID 不会被 2 个操作员实例同时处理。
  • 给定 ID,FlatMap 从 DB 读取数据,运行算法并更新相同的 DB 行。它发出变化幅度。它是 FlatMap 而不是 Map,因为在某些情况下,我们不想发出任何变化幅度,例如,如果我们有一些特定的错误。
  • 过滤器过滤流的幅度小于某个阈值
  • Sink 正在将过滤后的通知发送到另一个 Kafka 主题。

问题:

我们希望以一次性语义运行管道。从我们看到的情况来看,Flink 支持 Kafka 源、Kafka 接收器以及中间的有状态或有状态操作符的一次性语义。我们找不到任何地方解释如何使用您在管道中更新的资源执行一次。有一个TwoPhaseCommitSinkFunction允许创建一个允许完全一次语义的接收器函数。

我们不能使用它,因为我们想更新数据库,然后向 Kafka 发出更改通知。在 2 个单独的接收器中执行此操作将产生竞争条件,我们可以在数据库实际更新之前收到幅度通知。

我们错过了什么吗?有没有办法在 Map/FlatMap 运算符中实现 2 阶段提交?还有其他解决方案吗?

谢谢!

0 投票
1 回答
572 浏览

java - 当 PROCESSING_GUARANTEE_CONFIG 设置为 EXACTLY_ONCE 时,Kafka 无法重新平衡

我有一个运行良好的 Kafka 流应用程序。但是,当我添加属性时:

properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

然后我收到以下错误:

任何人都知道什么可以解决这个问题?

0 投票
1 回答
184 浏览

apache-kafka - Kafka - 即使应用程序仅配置一次且持久性最高,也会丢失消息

在某些情况下(非常罕见,但确实有)我收到了重复文件,即使所有内容都配置为具有高耐用性并且我们只使用一次配置。

请检查下面导致此问题的应用程序上下文和测试场景。

卡夫卡集群设置

3 个 Kafka 代理(host1 上 1 个,host2 上 2 个,host3 上 3 个)

3 个 Zookeeper 实例(host1 上 1 个,host2 上 2 个,host3 上 3 个)

卡夫卡配置

ZooKeeper 配置

Kafka 内部主题描述

应用主题

应用程序消费者属性

应用程序生产者属性

应用程序处理提交

使用 KafkaTransactionManager,我们启动事务,使用 KafkaTemplate 将消息写入输出主题,并发送消费者偏移量(spring-kafka 2.2.8.RELEASE)。

测试预期/实际

  • 向输入主题写入 32,000 条消息

  • 启动 3 个应用实例

  • 开始一一处理消息(max.poll.records = 1)

  • 在处理过程中,并行发送 SIGKILL (kill -9) 到 host1 和 host2 Kafka Brokers 50 次。

  • 等待 60 秒

  • 在处理过程中,向 host1 和 host3 Kafka Brokers 并行发送 SIGKILL (kill -9) 50 次。

  • 等待 60 秒

  • 在处理过程中,向 host2 和 host3 Kafka Brokers 并行发送 SIGKILL (kill -9) 50 次。

期望有 32,000 条消息到输出主题,但是,有时我们实际上最终会得到重复(至少一条)。

有时我们最终会收到 32,000 条消息,并且一切正常。

0 投票
1 回答
55 浏览

apache-kafka - 无法使用 Alpakka 中的 Transactional.Sink 向 Kafka 主题生成消息,但我看到启用了幂等生产者

嗨,我正在尝试使用 Alpakka 文档中所示的 Producer api。我可以使用事务源来使用记录,并且创建了生产者,但无法将消息发送到主题无法使用 Alpakka 中的 Transactional.Sink 生产到主题,但我看到启用了幂等生产者。我看到它正在进入逻辑的日志但它没有向 myTopic 产生事件

[info] OakcpKafkaProducer - [Producer clientId=producer-7fe8789c-3171-429e-afbf-d8a8ba12700c, transactionalId=7fe8789c-3171-429e-afbf-d8a8ba12700c] 幂等生产者已启用。

你能帮我理解为什么它可能不会产生主题信息吗

我正在使用 docker 在本地运行我的代码

下面是我的代码

0 投票
0 回答
123 浏览

apache-spark - 在本地环境中使用 Apache Beam + SparkRunner 进行 Exactly-Once 测试

我在从本地 Kafka INPUT_TOPIC 读取并写入另一个本地 Kafka OUTPUT_TOPIC 的光束管道下方运行。我创建了一个发布者来提供 INPUT_TOPIC (手动)和一个消费者来检查我在 OUTPUT_TOPIC 上得到了什么,但想知道它是否是测试精确一次语义的正确设置。

Beam 和 Kafka 相对较新,因此正在寻找有关如何以更好的方式测试此管道的建议,并确认恰好一次语义在本地环境中有效。

注意: 我已经在我的机器中安装了 Apache Spark 并使用-Pspark-runner选项运行管道。

光束管线示例

谢谢

0 投票
1 回答
208 浏览

apache-flink - Exactly-once:谁在存储历史数据,flink 还是数据源

我知道 Apache Flink 有 Exactly once 的能力,它依赖于 checkpoint 机制和可重发的数据源。

按照我的理解,如果 Flink 的一个算子出现错误,它需要重新运行它的最后一个操作,所以它必须需要获取历史数据。在这种情况下,历史数据应该/可以存储在哪里?

说数据源是Apache Kafka,那我可以让Kafka存储历史数据吗?我可以让 Flink 存储历史数据吗?或者我可以让他们两个都这样做吗?如果他们两个可以一起做这件事,是不是我可以让Kafka存储一部分历史数据,让Flink存储另一部分历史数据,这样我就可以保存更多的历史数据?

0 投票
1 回答
803 浏览

apache-kafka - 使用 spring kafka 的恰好一次语义

我试图测试我的一次性配置,以确保我设置的所有配置都是正确的,并且行为符合我的预期

我似乎遇到了重复发送的问题

这是我的“测试”,您可以假设构建器放置了正确的配置。

ConsumerMessageLogic 是一个类,它处理恰好一次语义所支持的读取-过程-写入的“过程”部分

在生产者类中,我有一个发送消息方法,如下所示:

我像这样创建我的消费者:

当我创建我的生产者时,我使用 UUID 作为事务前缀创建它,就像这样

现在一切都设置好后,我在一个主题上启动了 2 个实例,每个实例有 2 个分区,每个实例从使用的主题中获得 1 个分区。

我发送一条消息并在调试中等待实例 A 中的事务超时(模拟连接丢失),一旦超时通过另一个实例(实例 B)自动处理记录并将其发送到目标主题导致重新平衡发生了

到目前为止,一切都很好。现在,当我在实例 A 上释放断点时,它说它正在重新平衡并且无法提交,但我仍然在我的目标主题中看到另一个输出记录。

我的期望是,一旦我释放断点,实例 A 将不会继续其工作,因为记录已被处理。

难道我做错了什么?这个场景可以实现吗?

编辑2:

在加里关于事务中执行的评论之后,如果我冻结其中一个实例直到超时并在另一个实例处理记录后释放它,然后冻结的实例处理并为输出主题生成相同的记录,我会得到重复的记录...

生产者中没有 executeInTransaction 的新 sendMessage 方法

以及我将消费者容器创建更改为具有与建议的相同生产者工厂的事务管理器

我的期望是代理一旦从冻结的实例接收到处理过的记录,它就会知道该记录已经由另一个实例处理,因为它包含完全相同的元数据(或者是吗?我的意思是,PID 会不同,但它应该不同吗?)

也许我正在寻找的场景在当前甚至不支持 kafka 和 spring 提供的支持......

如果我有 2 个读进程写实例 - 这意味着我有 2 个具有 2 个不同 PID 的生产者。

现在,当我冻结其中一个实例时,当未冻结的实例由于重新平衡而获得记录进程责任时,它将使用自己的 PID 和元数据中的序列发送记录。

现在,当我释放冻结的实例时,他发送了相同的记录,但有自己的 PID,所以经纪人不可能知道它是重复的......

我错了吗?我怎样才能避免这种情况?我虽然重新平衡停止了实例并且不让它完成它的过程(他产生重复记录的地方)因为他不再对该记录负责

添加日志:冻结实例:您可以在 10:53:34 看到冻结时间,我在 10:54:02 发布它(重新平衡时间为 10 秒)

接管分区并在重新平衡后产生记录的常规实例

我注意到他们都注意到完全相同的提交偏移量

我想当他们试图提交完全相同的事情时,经纪人会中止其中一项交易......

我还注意到,如果我将 transaction.timeout.ms 减少到仅 2 秒,无论我在调试时冻结实例多长时间,它都不会中止事务......

也许 transaction.timeout.ms 的计时器仅在我发送消息后才开始?

0 投票
1 回答
1077 浏览

java - Flink Kafka EXACTLY_ONCE 导致 KafkaException ByteArraySerializer 不是 Serializer 的实例

所以,我试图在我的 Flink Kafka 流作业中启用 EXACTLY_ONCE 语义以及检查点。

但是我没有让它工作,所以我尝试从 Github 下载测试示例代码: https ://github.com/apache/flink/blob/c025407e8a11dff344b587324ed73bdba2024dff/flink-end-to-end-tests/flink-streaming- kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java

所以运行这个工作正常。但是,启用检查点时出现错误。或者,如果我将 EXACTLY_ONCE 更改为 AT_LEAST_ONCE 语义并启用检查点,它工作正常。但是当将其更改为 EXACTLY_ONCE 时,我再次收到此错误。

我得到的例外:

我对代码进行了细微的更改,以便在我的环境中工作。我在 docker 内的 flink 操作操场内运行它。(这个https://ci.apache.org/projects/flink/flink-docs-stable/getting-started/docker-playgrounds/flink-operations-playground.html)。最新版本,1.10 和里面提供的 kafka 版本是 2.2.1

可以找到示例中的其他类: https ://github.com/apache/flink/tree/c025407e8a11dff344b587324ed73bdba2024dff/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java /org/apache/flink/streaming/kafka/test/base

我确实尝试将序列化更改为使用 KafkaSerializationSchema 而不是使用 SerializationSchema 的示例代码。但是,下面的代码也没有帮助。同样的错误。

所有帮助表示赞赏。我一直无法在 flink 和 kafka 之间找到任何 EXACTLY_ONCE garantuee 的在线工作代码。只加载谈论它的文章,但不加载实际的实质性工作代码。这就是我想要在这里实现的。

0 投票
1 回答
333 浏览

apache-spark - 如何在 Spark Structured Streaming 中使用 foreachBatch 接收器实现一次性写入保证

文档

默认情况下,foreachBatch 仅提供至少一次写入保证。但是,您可以使用提供给函数的 batchId 作为对输出进行重复数据删除并获得完全一次保证的方式。

这是否意味着除非我们加倍努力,否则即使我们在主 writeStream 操作中使用检查点,我们也无法实现一次性写入保证?

如果是,应该怎么做才能实现一次性写入保证?文档中的含义是:

使用 batchId ?

PS:最初的问题是针对 Kafka 的,但我将其概括为解决方案将适用于 foreachBatch 块中的任何内容。

0 投票
1 回答
237 浏览

apache-kafka - 我可以依靠 Kafka 流中的内存中 Java 集合通过微调标点和提交间隔来缓冲事件吗?

在一个简单java.util.List的输入中缓冲事件的自定义处理器process()- 此缓冲区不是状态存储。

每 30 秒 WALL_CLOCK_TIMEpunctuate()对该列表进行排序并刷新到接收器。假设只有单个分区源和接收器。需要EOS处理保证。

我知道在任何给定时间要么process()被执行要么punctuate()被执行。

我担心这个缓冲区不受变更日志主题的支持。理想情况下,我认为这应该是支持 EOS 的国有商店。

但是有一个论点是设置commit.interval为超过 30 秒 - 即 40 秒,将确保缓冲区中的事件永远不会丢失。而且由于我们使用WALL_CLOCK_TIMEpunctuate()是 ,无论我们是否有事件,都将始终每 30 秒调用一次。

这是一个有效的论点吗?这里有哪些情况会使缓冲区中的事件永远丢失?