问题标签 [ktable]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1786 浏览

apache-kafka - 使用 KSQL 查询 Kafka Streams KTable

我正在编写这个 Kafka 流应用程序,它获取在 Kafka 主题中注册的传感器读数(作为 中的消息JSON),并以每分钟、每小时和每天对这些读数的值执行一些聚合基础。然后我KTables从这些聚合中实现派生并使用默认状态存储来存储它们。我想知道是否可以使用KSQL.

0 投票
1 回答
352 浏览

apache-kafka - kafka 流 groupBy 聚合产生意外的值

我的问题是关于 Kafka 流 Ktable.groupBy.aggregate。以及由此产生的聚合值。

情况

我正在尝试每天汇总分钟事件。

我有一个分钟事件生成器(此处未显示),可为一些房屋生成事件。有时事件值错误,必须重新发布分钟事件。 分钟事件发布在“分钟”主题中

我正在使用kafka StreamsgroupByaggregate.

问题

通常,由于一天有 1440 分钟,因此不应有超过 1440 个值的聚合。此外,永远不应该有一个包含负数事件的聚合。

...但无论如何它都会发生,我们不明白我们的代码有什么问题。

示例代码

这是一个示例简化代码来说明问题。有时会抛出 IllegalStateException。

以下是此代码片段中使用的示例类:

如果有人能告诉我们我们在这里做错了什么以及为什么我们有这些意想不到的价值,那就太好了。

补充说明

  • 我们将流作业配置为使用 4 个线程运行properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
  • 我们被迫使用 a ,因为对于已发布的 Instant,可以使用不同的Ktable.groupBy().aggregate()分钟值 重新发布。sensorValue并且每日聚合相应地修改。 Stream.groupBy().aggregate()没有adderAND substractor
0 投票
1 回答
80 浏览

ksqldb - KSQL 每组选择一行对应于具有最小时间戳的行

在 KSQL 中有一种 row_number 类型的函数,它可以与 TUMBLING WINDOW 结合使用,以便进行分组,并且仅获取与所使用的组中具有最小时间戳相对应的事件。

0 投票
0 回答
217 浏览

apache-kafka - KTable-KStream LeftJoin 影响性能。是否有任何警报

我有一个用例,我在其中接收有关某个主题的推文,以及有关其他主题的用户详细信息。我需要从用户详细信息中找到用户名并将其设置为推文。使用以下代码,我可以获得预期的结果。

但是,如果 kTable 主题中有 1000 条记录,则处理 100 万条此逻辑需要 2 小时以上。之前需要 2 到 3 分钟。

早些时候,当用户详细信息在本地哈希映射中时,处理所有数据大约需要 10 分钟。有没有其他方法可以避免 LeftJoin 或提高其性能?

0 投票
1 回答
1635 浏览

json - Kafka Streams API GroupBy 行为

我是 kafka 流的新手,我正在尝试使用 groupBy 函数将一些流数据聚合到 KTable 中。问题如下:

生成的消息是一个 json msg,格式如下:

我想隔离 json 字段"after",然后用"key" = "ID"创建一个 KTable并为整个 json "after"赋值

首先,我创建了一个 KStream 来隔离“after” json,它工作正常。

KStream 代码块:(不要注意if 语句,因为“before”和“after”格式相同。)

正如预期的那样,输出如下:

之后,我实现了一个 KTable 到 groupBy json的“ID”。

KTable 代码块:

我想创建一个错误,KTable<String, String>但我正在创建GroupedStream<Object,String>.

总之,问题是 KGroupedStreams 到底是什么以及如何正确实现 KTable?

0 投票
0 回答
1036 浏览

java - Kafka Streams:无法刷新由 java.lang.ClassCastException 引起的状态存储:无法将键值转换为大小写

我们使用 Kafka 流已经有一段时间了,但从来没有编写测试来覆盖我们的拓扑。我们决定试一试并使用流库提供的拓扑测试驱动程序。不幸的是,我们遇到了无法解决的问题。这是我们生产代码的虚拟版本,具有相同的语义。

它连接了包含 2 种类型文档的 2 个主题。我们的目标是将文档汇总到每个人的“文件夹”中,其中使用来自不同文档的信息。在运行测试时,我们遇到了一个异常,这是由从 PersonKey 到 DocumentA 的错误转换引起的。在下面,您可以看到测试设置、数据结构的模式和异常的堆栈跟踪。

文件A

文件B

文件夹

个人密钥

例外

Kafka v2.3.0 Avro v1.9.1' KafkaAvroSerde v5.2.1'

更新

我尝试使用处理器 API 重写拓扑,但没有任何运气。之后我尝试使用真正的模式注册表并且测试通过了,所以看起来问题出在 MockSchemaRegistry 上。当我找到原因时将发布另一个更新。

更新 2

我设法让它与模拟模式注册表一起工作,但我必须手动注册所有模式,包括状态存储和内部状态存储更改日志主题的模式

0 投票
1 回答
271 浏览

java - Apache Kafka - 实现 KTable

我是 Kafka Streams API 的新手,我正在尝试创建一个 KTable。我有一个输入主题:s-order-topic,是一个json格式的消息,如下图。

我阅读了来自这个主题的消息,我想创建一个KTable,它具有作为key、 field"after":"ID"和 for value字段内的所有"after"字段(除了"ID")。

仅当我使用默认聚合函数(即计数)时,我才成功创建了 KTable。但是我很难创建自己的聚合函数。下面我介绍我尝试创建 KTable 的部分代码。

我怎样才能实现这个 KTable?

我是否正确地解决了这个问题?

(mapValues -> 只保留“之前”/“之后”字段。groupBy -> 使 ID 成为消息的键。聚合 -> ?)

0 投票
1 回答
338 浏览

apache-kafka - Apache Kafka - KStream 和 KTable 硬盘空间要求

我试图更好地了解创建 KStream 和 KTable 时资源级别会发生什么。下面,我将提到一些我得出的结论,据我所知(请随时纠正我)。

首先,每个主题都有多个分区,这些分区中的所有消息都以连续的顺序存储在硬盘中。

KStream 不需要将从主题读取的消息再次存储到另一个位置,因为偏移量足以从连接到的主题中检索这些消息。(这个对吗? )

问题与KTable有关。据我了解,与 KStream 相比,KTable 使用相同的密钥更新每条消息。为此,您必须在外部将来自主题的消息存储到静态表中,或者在每次新消息到达时读取所有消息队列。后者在时间性能方面似乎不是很有效。我提出的第一种方法是否正确?

0 投票
1 回答
1898 浏览

spring - 在 Kafka Streams 中创建全局状态存储(春季)

我是 Kafka 新手,并尝试创建一个小型 Kafka KTable 实现。我已经成功添加了一个 KTable 并且能够查询。我使用了当地的州立商店,它按预期工作。以下是我的本地状态商店配置

现在我想使用 RPC 来使用 Global State。我对几个问题感到困惑。要添加全局状态存储,我需要添加 RPC 端点

文件说

“唯一的要求是 RPC 层嵌入到 Kafka Streams 应用程序中”

  • 这是否意味着我们需要在 Kafka 应用程序中创建一个客户端端点,如果是这样,如果它是一个具有 Web 依赖关系的 Spring Boot 应用程序,它就像“localhost:8080”
  • 此应用程序的其他实例将如何仅通过 APPLICATION_SERVER_CONFIG (application.server) 连接并执行交互式查询或保持状态同步。我的意思是如何为同一应用程序的其他实例提供额外的配置以在全局状态下创建同步。
  • 如果创建了全局状态,无论出于何种原因,我们是否需要在 Mongodb 或其他地方保留备份。(容错)考虑到数据库永远不会像写入磁盘一样快,我们是否关心它还是应该依赖分布式架构

如果给出一些带有示例的 Kafka Global State Store 实现,那就太好了。

0 投票
1 回答
591 浏览

apache-kafka - Kafka Streams K-Table 大小监控

我有一个流拓扑,它从一个主题中消费并运行一个聚合并构建一个 KTable,该 KTable 被具体化到 RocksDB 中。

我有另一个应用程序,它每天使用来自同一主题的所有事件,并为满足某些特定条件(即不再需要它们)的事件发送墓碑消息。聚合处理此问题并从状态存储中删除,但我正在查看监控状态存储的大小或更改日志主题 - 任何真正告诉我 ktable 大小的东西。

我已经公开了 JMX 指标,但那里似乎没有任何东西可以满足我的需求。我可以看到“放入”rocksDB 的总数,但看不到键的总数。我的应用程序是 spring boot,我想通过 prometheus 公开指标。

有没有人解决了这个问题或有任何帮助的想法?