问题标签 [ktable]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
35 浏览

apache-kafka - 如何在不耗尽内存/磁盘空间的情况下将大量主题作为 KTABLE 使用?

我们有一个 kstreams 应用程序做 kstream-kstable 内连接。这两个主题都是高容量的,每个主题有 256 个分区。kstreams 应用程序现在部署在 8 个节点上,每个节点有 8 GB 堆。状态存储(rocksdb)保存在磁盘上,容器上的磁盘空间不足。有哪些选项可以将来自某个主题的数据用作 KTABLE,但限制磁盘上的数据量(例如,如果我们只想保存一天的键/数据或某个时间范围)并具有先前的状态/文件被删除?

0 投票
0 回答
75 浏览

java - 从 KTable 中删除数据

我正在使用 Kafka Streams 进行一些基于键的聚合。聚合数据会占用 KTable 中的内存。我进一步计划将聚合保存到某个数据库。由于 KTable 的大小会迅速增加,一段时间后从 KTable 中删除条目的最佳方法是什么?

0 投票
1 回答
113 浏览

apache-kafka - 当具有相同密钥的多条消息同时到达时,Kafka Ktable 更改日志(使用 toStream())缺少一些 ktable 更新

我有一个输入流,我用它来创建一个 ktable。然后我使用 toStream() 方法使用 ktable 更改日志创建一个输出流。问题是 toStream() 方法创建的流不包含来自已更新我的 KTable 的输入流的所有消息。这是我的代码:

我想为 inputStream 中的每条消息在 outputStream 中获取一条消息。对于大多数消息,它运行良好,但在特定情况下我会丢失一些事件:如果我在很短的时间间隔内(少于 5 秒)收到 2 条具有相同密钥的消息。在这种情况下,我只收到 outputStream 中的第二个事件。

我认为这是因为 Ktable 更新是通过一些批处理操作进行的,但我找不到任何与之相关的配置或文档。是这些丢失事件的原因吗?您知道如何更改配置以便我不会丢失任何消息吗?

0 投票
1 回答
69 浏览

apache-kafka - 使用 Kafka 实现 SQL 更新

如何更新存储在 Kafka 主题/Ktable 中的对象?

我的意思是,如果我不需要替换整个值(压缩的 Ktable 会这样做),而是单个字段更新。我应该从主题/Ktable 中读取、反序列化、更新对象,然后将新值存储在同一个主题/KTable 中吗?

或者我应该加入/合并 2 个主题:一个具有原始值,第二个具有字段更新?

你会怎么做?

0 投票
0 回答
14 浏览

apache-kafka - 我可以在kafka的v中有自定义类吗

我是大数据的新手。我有些怀疑。如果我能弄明白,那将非常有帮助。我需要对窗口流进行聚合。我有一个 kafka 主题,其中包含嵌套 json 中的数据。我需要在ktable中转换json并聚合6小时、12小时和24小时。我有一些疑问

  1. 如何展平多嵌套的 json。我需要采用表格格式
  2. 如何以 k,v 格式的 kafka 从 v 访问每个元素。
  3. 如何将 v 中的一列作为新键。
  4. 我应该如何聚合。每 6 小时、12 小时和 24 小时的 Ktable,或者我应该有 1 个 ktable 并且可以在查询时聚合。
0 投票
1 回答
76 浏览

apache-kafka - Kafka 流:如何在聚合时生成主题?

我目前有一些使用聚合构建 KTable 的代码:

一旦为单个键接收并聚合了给定数量的消息,我想将最新的聚合状态推送到另一个主题,然后删除表中的键。

我显然可以使用一个普通的 Kafka 生产者,并且有类似的东西:

但我正在寻找一种更“流”的方法。

有什么提示吗?

0 投票
1 回答
128 浏览

java - 打印 Kafka KTable 数据

我有这个方法,它输入一个 JSON 有效负载。我想用这个有效载荷创建一个 K 表,从中读取数据并打印出来。

到目前为止,我要创建 KTable,但是当我对其进行迭代时,控件会跳过它。

请任何人都可以帮助我/指导我哪里出错或我错过了什么?

谢谢。

0 投票
2 回答
28 浏览

java - KTable 数据 - 字节打印

我在 Kafka 中有一个 KTable,当我调试时,我以字节为单位获取数据。

如果我想要它在字符串中,我该怎么办?我也附上了一个片段。

0 投票
0 回答
191 浏览

apache-kafka - 将新消息放入输入流时,KTable 不会(立即)更新

我有一个带有任意键的字符串的 kafka 主题。我想创建一个characters in string : value配对主题,例如:

input("key","value") -> outputs (["v","value"],["a","value"],...)

为简单起见,我的输入主题有一个分区,因此 KTable 代码应该将所有消息接收到单个实例。

我创建了以下沙箱代码,它可以很好地构建新表,但在将新项目放入原始主题时不会更新:

我得到的输出如下:

存储 sandbox_store 创建成功。
-------------打印开始----
u:uvw
v:uvw
w:uvw
---------- ---打印结束----
-------------添加新值------------- ---
-------------添加新值----
-------------打印开始----------------
u:uvw
v:uvw
w:uvw
-------------打印结束---------- ------

请注意,我添加的 xyz 不见了!

(ps我知道我可以使用reduce代替aggregate,但实际上新值将是不同的类型,而不是字符串,因此它不适用于我的实际用例)

现在,如果我在第二次打印之前添加 10 秒的暂停;或者,如果我在不清除主题的情况下重新启动 Sandbox 类,则会出现第一个 xyz。所以很明显,系统中的某个地方存在时间延迟。在实践中,我正在处理 300mb+ 的消息,所有消息都进入输入主题,每小时一次;所以延迟甚至比几秒钟还要长。

我怎样才能帮助加快速度?

0 投票
1 回答
141 浏览

apache-kafka-streams - 使用 Faust 的滑动窗口

有谁知道如何使用 Faust 实现滑动窗口?

这个想法是计算一个键在 10、30、60 和 300 秒窗口中的出现次数,但我们需要在 1 秒或每次更新的基础上进行计数。

我有一个狡猾的解决方法,这似乎非常低效,我有一个翻滚的 1 秒窗口,到期时间为 300 秒,然后我使用该delta()方法将表中的所有旧值与当前值相加。它似乎可以处理来自 6 个源的消息,每个源以 10 条消息/秒的速度运行,但这大约是我们看到滞后之前的限制。这显然是一种无法扩展的缓慢方法,所以问题是如何在不需要 KSQL 或设置 Spark 集群以及 Kafka 集群的情况下实现这一点。如果可以,我们会尽量保持简单。

更复杂的是,我们非常希望在过去 24 小时、1 周、1 个月和过去 3 个月内拥有相同的统计数据……所有这些都在运行中。但也许我们只是要求太多,而没有为每个输入提供专门的流程。

这是我的狡猾代码: