问题标签 [exactly-once]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
38 浏览

database - 是否可以使用 BASE-fashioned 数据库实现 Exacly Once Semantics?

在流处理应用程序(基于 Apache Flink 或 Apache Spark Streaming 的 fe)中,有时需要只处理一次数据。

在数据库世界中,通过使用遵循 ACID 标准的数据库可以实现相同的目标(如果我在这里错了,请纠正我)。

但是有很多(非关系)数据库不遵循 ACID 而是遵循 BASE。

现在我的问题是:如果我要将这样的 BASE 数据库集成到流处理应用程序中(仅一次),我还能保证对整个管道进行一次处理吗?如果这是可能的,在什么情况下?

0 投票
0 回答
147 浏览

java - 在具有“exactly_once”处理保证的 kafka 流拓扑中,异常时消息丢失

我有一个要求,我需要在不丢失任何消息的情况下处理来自 Kafka 的消息,并且还需要维护消息顺序。因此,我在我的 Kafka 流拓扑中使用了事务并启用了“exactly_once”处理保证。因为我假设拓扑处理将是“全有或全无”,即消息偏移仅在最后一个节点成功处理消息后提交。

但是在失败的情况下,例如当数据库关闭并且处理器无法存储消息并引发异常时。此时,拓扑按预期消失,并在重新平衡时自动重新创建。我假设拓扑应该重新使用来自 Kafka 主题的原始消息,或者在应用程序重新启动时,它应该重新使用来自 Kafka 主题的原始消息。但是,似乎原始消息消失了,并且在该拓扑死亡后从未被消费或处理。

我需要做什么来重新处理发送到 Kafka 主题的原始消息?或者 Kafka 配置需要更改什么?我是否需要手动分配状态存储并跟踪在变更日志主题上处理的消息?

拓扑:

处理器:

配置:

测试:

日志:

0 投票
0 回答
418 浏览

java - 使用 Lettuce 时重试 Redis 命令是否安全?

我正在使用用于 JVM 的 Lettuce Redis 客户端来构建由 Redis 列表支持的队列。理想情况下,它的行为类似于内存队列,但由于涉及网络交互,这是不可能的。

Lettuce docs 中有一个关于错误处理的部分,在我的情况下,我想在错误时重试失败的命令。问题是异常层次结构不是很细,我不确定如何处理以下问题:

  • 如何确定是否可以重试失败的命令?命令可能无限期失败并且重试会导致无限循环的原因有很多——当前的 Redis 版本不支持使用的命令语法,密钥已经存在并且它是不兼容的类型等。
  • 如果网络不可靠,我可以依靠生菜始终重新连接吗?是否有可能在某些情况下我不应该重试某些RedisException,或者NativeIoException重新创建 Redis 客户端实例,甚至重新启动整个应用程序?
  • 有没有办法知道 Redis 是否实际执行了失败的命令并且重试不会导致重复或数据丢失(或者至少在某些情况下这可能由异常类确定)?这可能是一次性交付的一个基本问题,据我所知,Redis 没有提供任何方法来处理这个问题(例如,与 Kafka 不同)但也许有一些既定的做法来处理这个问题?

这个问题看起来每个人都必须处理,但似乎没有很多关于此的信息。

0 投票
0 回答
93 浏览

apache-kafka - Kafka Exactly-Once 和压缩

我想我理解幂等性意味着日志中的“每条消息都一次”,但是在谈到 EOS(Exactly-Once Delivery Semantic)时,消费者也发挥了作用并进入了端到端的保证。

因此,正如此处所述(例如),幂等性和事务对于“端到端完全一次语义”来说是必要的并且足够了。

但是,关于压缩的 Kafka文档说:

由于数据以压缩格式存储在代理上,因此有效的提取偏移量是压缩的消息边界。因此,对于压缩数据,消耗的偏移量将一次提前一个压缩消息。这具有在消费者失败的情况下可能重复的副作用。

问题:

  • 即使生产者是幂等的并且消费者在 Kafka 中是事务性的(例如 Kafka Streams),我也可能会看到重复,因为偏移量在压缩消息边界上是先进的。考虑处理不在压缩消息边界的单个消息:偏移量未提前,因此如果消费者失败,我将看到重复消息。正确的?

似乎启用压缩可能会使 EOS 的努力无效。我在讨论 EOS 的文章中没有提到这一点。

0 投票
1 回答
59 浏览

apache-spark - 任务失败的RDD重新计算是否会导致重复数据处理?

当特定任务失败导致 RDD 从 lineage 重新计算(可能通过再次读取输入文件)时,Spark 如何确保没有重复处理数据?如果失败的任务已将一半数据写入 HDFS 或 Kafka 等输出怎么办?它会再次重写那部分数据吗?这与仅一次处理有关吗?

0 投票
0 回答
27 浏览

apache-kafka - 在 kafka 流中恰好有一次 - 不工作

我通过关闭多个代理在 kafka 流中只测试一次。但是,当我重新启动代理时,在出站主题上会多次生成相同的消息。我正在使用融合版本 6.1.0 将处理保证设置为恰好将 beta 确认设置为全部

任何人都可以帮助我了解我是否缺少任何配置?

0 投票
1 回答
48 浏览

google-cloud-platform - 如何在 Google Cloud Storage 中只处理一次新上传的对象?

我想将文件接收到 Google Cloud Storage 存储桶中,并为每个文件只运行一次 Python 作业。我希望同时运行许多这样的 Python 作业,以便并行处理许多文件,但每个文件应该只处理一次。

我考虑了以下几点:

发布/订阅消息

为存储桶上的 OBJECT_FINALIZE 事件生成 Pub/Sub 消息。这里的问题是 Pub/Sub可能会多次传递消息,因此侦听同一订阅的 Python 作业池可能会为同一消息运行多个作业,所以我可以...

  1. 使用 Dataflow 对消息进行重复数据删除,但在我的非流式用例中,dataflow 似乎代价高昂,而且这个答案似乎表明它不是适合这项工作的工具。

或者

  1. 使用事务数据库(例如 Cloud SQL 上的 PostgreSQL)创建锁定机制。任何收到消息的作业都可以尝试获取与文件同名的锁,任何未能获取锁的作业都可以终止并且不确认消息,并且任何具有锁的作业可以继续处理并将锁标记为已完成以防止将来获取该锁。

我认为 2 会起作用,但它也感觉过度设计。

轮询

让作业轮询存储桶中的新文件,而不是使用 Pub/Sub。

这感觉就像它只是用一个仍然需要锁定机制的不太健壮的解决方案替换 Pub/Sub。

事件弧

使用Eventarc触发保存我的代码的 Cloud Run 容器。这似乎类似于 Pub/Sub,而且更简单,但我找不到 Eventarc 如何处理重试之类的事情的解释,或者它是否带有任何一次性保证。

单个控制器产生多个工人

创建一个中央控制器进程来处理文件事件的重复数据删除(通过 Pub/Sub、轮询或 Eventarc 接收),然后生成工作作业并将每个文件准确地分配给工作作业一次。

我认为这也可行,但会产生单点故障并可能造成吞吐量瓶颈。

0 投票
0 回答
32 浏览

java - 幂等Kafka生产者回调

我在网上找不到这个问题的确切答案,所以我想我可以尝试明确地询问:

我正在用 Java 编写一个 Kafka Producer。我想异步生成到我的主题的消息,并提供一个回调来记录错误/成功消息。我需要的是至少交付,我可以很好地忍受经纪人上的一些重复。

然后我偶然发现了 enable.idempotence=true 属性以确保一次性交付。见这里:https ://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

如果我做对了,它的工作原理如下:

  1. 如果写入主题失败,Kafka 将自动重试。
  2. 默认情况下,这发生 INTEGER.MAX_VALUE 次。

现在我的问题是:我知道为 Kafka 事务提供回调是不必要的,但是如果我启用幂等性会发生什么。假设代理端出现严重故障,执行了 N 次重试尝试:我的错误处理回调是否也会触发并记录 N 次?即使启用了幂等性,在生产者端提供回调是否常见?

0 投票
1 回答
33 浏览

apache-kafka - kSqlDB Exactly Once 处理保证

我通过非常不优雅地关闭 docker 运行进程或让 docker 容器内存不足来测试 ksqldb 服务器上的恰好一次语义。在这两种情况下,我都会收到重复的内容,这绝对不是保证的行为。我觉得我可能在这里错过了明显的...

docker 容器有KSQL_KSQL_STREAMS_PROCESSING_GUARANTEE=exactly_once参数集。据我了解,这将为enable.idempotence和消费者isolation.level属性设置基础生产者设置。

由于以下查询,仍然会出现重复项:here

还有这里

你可以忽略sql的逻辑。这些只是使问题更有意义的例子。关键是在容器崩溃期间偏移量显然会丢失。

我还可以做些什么 ?我缺少任何属性吗?
我正在使用来自 confluent community v6.2.1 和 ksqldb v0.21 的 kafka 代理

0 投票
0 回答
37 浏览

apache-kafka - KStreams 状态存储和回滚

我想知道以下两种情况会发生什么。

情况1:使用DSL

关于情况 1 的问题:在第 1 点。1.未捕获的异常或没有。2. Fail to Produce n/w 问题,服务重启时,聚合维护的状态存储会恢复到之前的状态吗?

情况2:

问题情况 2:放入状态存储的值是否会在 1 或 2 的异常中恢复?


简而言之。

Q1。就像一次一样,当我们不使用一次时,推送到状态存储的更改会在失败的情况下恢复吗?https://www.confluent.io/blog/enabling-exactly-once-kafka-streams/

Q2。非常愚蠢,但我不得不问,是否曾经适用于所有类型的 KStream 状态存储,一种是创建 DSL(聚合),另一种是我创建的用于 .transform 块?