问题标签 [ktable]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
257 浏览

apache-kafka - Kafka Stream - 按 client_id 过滤

我正在使用 Kafka Stream 创建一个仅包含特定于 client_id 的数据的 ktable,这不是主题键。我是 Kafka Streams 的新手,这看起来很简单,但我对社区中可用的多个示例感到有些困惑,这些示例非常好。

我正在尝试获取具有 client_id=0123456 的 inputTopic 数据。在下面的 KSQL 中将类似于命令:

下面我试图重现相同的行为。有人可以告诉我在下面做错了什么吗?它没有像我预期的那样过滤。

0 投票
0 回答
558 浏览

apache-kafka - 与 spring cloud kafka stream 加入一对多关系

我正在尝试加入来自两个主题人员和地址的数据,其中一个人可以有多个地址。发布到主题中的数据如下所示:

加入后,我希望有一个聚合输出(在 elasticsearch 中被索引),它应该看起来像这样:

每当人员或地址主题得到更新时,也应更新聚合的人员。目前,我实现了仅在发布地址而不是在更改人员本身时才获取聚合人员的更新。任何想法这段代码有什么问题?

0 投票
0 回答
118 浏览

apache-kafka - Kafka Ktable-Ktable 通过自定义序列化程序加入 3 个 Ktables

我希望为 3 个 ktable 执行 Ktable-Ktable 连接,类似这样

虽然 jsonSerde 在使用 customAvroServde 实现主题时有效,但最终连接失败并出现序列化异常。

在流连接中执行此操作时,我能够使用 StreamJoined.with 功能执行它,该功能有助于为内部流设置序列化程序,但找不到任何类似 ktables 的东西。

我应该如何使用 Ktables 执行连接?

0 投票
1 回答
357 浏览

apache-kafka - Kafka KTable Materialized-State-Store 控制

我们将 KTable 实现为 Internal-State-Store。

a.) 我如何以及在哪里可以指定,这个 Internal-State-Store 应该是 Persistent 并自动备份到另一个 kafka 主题?

b.) 我们如何指定这个 Internal-State-Store 应该是全局的,这样我的任何流任务都应该能够引用它?

c.) 是否存在将传入的 messageRecords 写入 Internal-State-Store 的频率?会不会发生这样的情况,一个特定的 MessageRecord 被流处理器处理,存储在 KTable 中,然后我的流处理器死了,它无法进入 Internal-State-Store !

下面是我们现在使用的片段:-

任何回应都将受到高度赞赏!

0 投票
1 回答
103 浏览

apache-kafka - 覆盖 KStreams 默认序列化器 (ByteArraySerializer)

我似乎无法将主题的序列化程序覆盖为Serdes.String(). 我正在尝试一个从主题(流)读取并写入 KTable 的简单用例。到目前为止我所拥有的:

我得到的例外是:

根据我收集的信息,它理解消息是 aString但它正在使用默认的反序列化器ByteArraySerializer。我在上面的代码中哪里出错了?

0 投票
0 回答
23 浏览

join - Ktable 存储具有相同键的多条记录

我正在创建一个关于两个主题的 ktable-ktable 内部连接,employee并且department,其中employee主题有重复的键。当我在这个主题上构建一个 ktable 时,它​​应该只取每个键对应的最新值(根据我从文档中理解的内容)。

但是员工上的ktable也在拾取所有重复的键。你能帮我理解这是否是 ktable 的预期行为,还是我误解了什么?

代码:

员工话题:

部门主题(键->值):

加入主题(键->值):

0 投票
0 回答
43 浏览

apache-kafka - Ktable 到 Kstream:更改日志何时发布

我们正在使用windowedByaggregate操作KStream来生成一个KTable. 然后我们将其转换KTableKStreamusingtoStream方法 on KTable。我们在这个新KStream版本中所期待的是对KTable.

然而,我们注意到该Aggregation done for语句只在窗口期间打印一次,即使我们在给定窗口中针对同一个键输入了多条消息。请注意,我们没有使用suppress运算符仅发布最后一次更新。有人可以帮我理解为什么我们没有看到每个更新都发布到结果KStream吗?是否有任何最小时间间隔(或缓冲区大小)来缓冲更新然后发布?我们注意到的另一件事是,如果窗口更大(比如 5 分钟或更长时间),那么我们确实会看到中间更新正在发布。然而,这些并不是每一个更新。任何帮助/指针都非常感谢。

谢谢!

0 投票
0 回答
71 浏览

apache-kafka - 设计系统以使用 kafka 流进行每日/每月报告

我正在从事数据集成项目,我们需要使用 kafka 业务事件流,但要生成每日和每月报告。我们需要某种用于流的状态存储。到目前为止,我们头脑风暴的方法是:使用 ktable 存储事件并让(一对多)消费者查询数据以进行进一步的 ETL 处理或使用基于键值的(如 dynamoDB)转储事件并让消费者使用它。

我们当然不想拥有事件,并且存储应该在报告完成后消失。我有点担心每月处理存储的数据量,因为当我查看 kafka 主题一周的事件时,它们在 GB 的范围内。

我对这个问题空间相对较新,因此需要帮助来考虑效率和可扩展性。此外,对于未来的用例来说,这不会成为反模式。

0 投票
1 回答
214 浏览

apache-kafka - 可能由于与 ktable 相关的状态存储而导致内存不足异常

我们有一个kstreams应用程序进行kstream-kstable内部连接。这两个主题都是高容量的,每个主题有 256 个分区。kstreams应用程序现在部署在 8 个节点上,每个节点有 8 GB 堆。我们看到堆内存不断增长,最终发生OOM。我无法获得堆转储,因为它在容器中运行,当这种情况发生时会被杀死。但是,我尝试了一些方法来获得信心,它与状态存储/ktable 相关的东西有关。没有下面RocksDBConfigSetter的内存会很快用完,但是下面的内存会在一定程度上减慢。需要一些指导以进一步进行,谢谢

我添加了以下3个属性,

0 投票
1 回答
82 浏览

apache-kafka - KTable和KStream Space注意事项理解

我们正在使用 KSQLDB 执行 POC,但有一些疑问:-

我有一个名为 Kafka 的主题USERPROFILE,它有大约 1 亿条唯一记录和 10 天的保留策略。此 Kafka 主题继续从其底层 RDBMS 表实时接收 INSERT/UPDATE 类型的事件。

以下是此 kafka 主题中收到的记录的简单结构:-

1.)我们已经在上述主题上打开了一个 Kafka 流:-

2.)因为,给定的 userId 可以更新,我们只想要唯一的记录(对于每个 userId),我们还在上述主题上打开了另一个 Kafka 表:-

问题是: -

  • 打开 KTable 是否需要额外的磁盘空间?例如,Kafka 主题有 1 亿条记录,相同的记录是否也会出现在 KTable 中,或者它只是底层 kafka 主题的一些虚拟视图?

  • 对于我们打开的流,同样的问题。打开 KStream 是否需要磁盘(经纪人服务器的)额外空间?例如,Kafka 主题有 1 亿条记录,相同的记录是否也会出现在 KStream 中,或者它只是底层 kafka 主题的一些虚拟视图?

  • 比如说,我们在 5 月 1 日收到了 id 为 1001 的记录,然后在 5 月 11 日,该记录在 Kafka 主题上将不再可用,但是该记录是否仍会出现在 kstream / Ktable 上?KStream / KTable 是否有一些保留政策,就像我们对主题一样?

答案将不胜感激。

-- 最好的阿迪亚