问题标签 [ktable]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
359 浏览

apache-kafka - Kafka 世界状态/快照 + 订阅更新

Kafka 中是否有办法让客户端在首次连接时订阅主题以获取世界“KTable”的快照/状态并订阅后续更新?

假设我们有如下记录的主题

当任何客户端(现有的或新的)连接/重新连接时,我想一次性给他们

以流的形式订阅它们以进一步更新。

0 投票
2 回答
902 浏览

apache-kafka - Kafka Ktable 还流式传输重复更新

Kafka Ktable 还流式传输重复更新。

我想处理 Ktable(使用 Kstream.reduce() 创建)更改日志流,即 Ktable 中键值的任何更改。但是,即使将相同的键值对多次发送到 Ktable,它似乎也每次都向下游发送。仅当值更改时,我才需要在键的值中发送更新。

`

`

0 投票
1 回答
180 浏览

apache-kafka - 我可以在 Kafka Stream Topology 中多次使用主题吗?

假设我们假设 kafka 流中没有 groupby 函数。我可以执行以下操作来获取字数并在其上构建 KTable 吗?请注意,我在拓扑中使用了两次“word-count-topic”。我有一个用例,我想迭代地构建一些东西,对于下一个流事件,我想查找以前的值并根据事件更新它。我想在我构建 Ktable 的同一主题中保留最新的价值。

0 投票
1 回答
58 浏览

apache-kafka - 计算 GlobalKTables 的内存占用

我有一个带有 GlobalKTables 的 Kafka Streams 应用程序。我想计算相同的内存占用。

底层 Kafka 主题中的数据使用 SNAPPY 进行压缩。我找不到有关存储在 Ktables 上的数据的信息。记录加载到 KTables 后是解压缩的还是按需解压缩的?

了解计算应用程序内存占用的最佳方法将非常有帮助。

0 投票
1 回答
1108 浏览

apache-kafka - 卡夫卡重新分区(基于键的分组)

当我们基于某个键在流上应用分组函数时,kafka 如何计算这个,因为相同的键可能存在于不同的分区中?我看到了 through() 函数,它基本上对数据进行了重新分区,但我不明白这是什么意思。它会将具有相同键的所有消息移动到单个分区中吗?另外,我们可以多久调用一次 through() 方法?如果有需求,我们可以在收到每条消息后调用它吗?请建议。谢谢

0 投票
1 回答
318 浏览

apache-kafka-streams - KafkaStreams,注册 Avro 模式时出错

免责声明:我对 KafkaStreams 的体验非常有限。当我所做的只是将主题流式传输到 KTable 中以便以后可以在该商店上使用交互式查询时,org.apache.kafka.common.errors.SerializationException: Error registering Avro schema:我不太明白为什么会出现错误。Schema being registered is incompatible with an earlier schema;

这是 SerDes 配置。

Streams 代码:请注意,此时我不希望对 Stream 进行任何过滤或分组,我只希望数据可用于将来通过 Store 查询。

每当我在序列化时streams.start()不断收到这些异常。我有一个架构,所以我使用了SpecificAvroSerdes,但问题是一样的。我想我对为什么我的 KTable 尝试向 Confluent 注册新模式缺少一些基本的理解。

编辑 1: 我现在了解架构寄存器在这里所​​扮演的角色。将 KStream 与 GenericAvroSerde 一起使用,我可以使用来自主题的数据,但仍然无法在 KTable 中实现它。我现在的问题是:

  1. 为什么我总是在同一个分区和偏移量中得到上述异常,即使我没有调用streams.cleanUp(). 为什么它不继续(承诺)。
  2. 这个异常似乎是不可恢复的。所有 Streams 线程都死了,导致应用程序关闭。有没有办法绕过这个?注意:我已经在使用LogAndContinue异常处理程序进行生产和反序列化。

编辑2:

我能够克服这个例外。我的 StateStore 包含具有不兼容架构的先前条目。在我清除主题并更改 ApplicationId 后,它开始工作。

尽管如此,这仍然不能否定捕获Schema being registered is incompatible with an earlier schema;异常的需要。这使 Streams 应用程序停止运行。我尝试使用streams.setUncaughtExceptionHandler可以记录错误的位置,但这并不能阻止 Streams 线程死亡,之后我什至无法启动它们。肯定有办法解决这个问题吗?

0 投票
2 回答
1344 浏览

apache-kafka - GlobalKTable 刷新逻辑

当对 a 的基础主题进行更新时GlobalKTable,所有应用程序实例KStream获取最新数据的逻辑是什么?以下是我的后续问题:

  1. GlobalKTable当更新发生时,是否会被锁定在记录级别或表级别?
  2. 根据这篇博客:Kafka GlobalKTable Latency Issue,延迟可以达到 0.5s 吗?!如果是这样,有没有其他方法可以减少延迟?
  3. 由于GlobalKTable默认使用 RocksDB 作为状态存储,RocksDB 的所有功能都可以使用吗?

我了解GlobalKTable不应将其用于需要频繁更新查找数据的用例。是否有任何其他键值存储可用于可能需要更新表数据的用例 - 例如 Redis?

我找不到太多关于GlobalKTable其内部结构的文档。有可用的文件吗?

0 投票
2 回答
1086 浏览

apache-kafka - 如何在 Kafka 主题中编写 KTable?就像我们在 KStreams 中使用“to()”一样,如何为 KTable 做到这一点?

由于 KTable 没有“to()”方法,所以在向主题发送任何消息之前,我们是否需要始终将其转换为 KStream?或者我们如何在我们的主题中存储一个 KTable?

0 投票
0 回答
39 浏览

java - 如何为 KTable 多对一进行 leftJoin?

我有两个 Ktable,我需要做 leftJoin。这个左连接桅杆是ManyToOne。我有 leftJoin 的下一个代码:

但它就像 OneToOne 一样工作。如何更改 ManyToOne 的代码?

0 投票
1 回答
72 浏览

apache-kafka - 维护单独的 KTable

我有一个主题,其中包含每个会话的用户连接和断开连接事件。我想使用 Kafka 流来处理这个主题并根据某些条件更新 KTable。每条记录都不能更新 KTable。所以我需要处理多条记录才能知道是否需要更新 KTable。

例如,按用户然后按 sessionid 处理流和聚合。如果该用户的至少一个 sessionid 只有 Connected 事件,则 KTable 必须作为用户在线更新(如果尚未更新)。
如果用户的所有 sessionId 都有 Disconnected 事件,KTable 必须更新为用户离线,如果还没有。

我怎样才能实现这样的逻辑?
我们是否可以在所有应用程序实例中实现这个 KTable,以便每个实例在本地都有这些数据?