问题标签 [ktable]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java-8 - 如何将 KStream 中的记录对减少到 1 到 KTable
这是输入的 Kafka 主题,其中包含ConnectionEvent
:
ConnectionEvent("John", "123", "CONNECTED")
ConnectionEvent("John", "123", "DISCONNECTED")
ConnectionEvent("Anna", "222", "CONNECTED")
ConnectionEvent("Rohan", "334", "CONNECTED")
ConnectionEvent("Anna", "199", "CONNECTED")
ConnectionEvent("Anna", "255", "CONNECTED")
ConnectionEvent("Anna", "255", "DISCONNECTED")
ConnectionEvent("Anna", "222", "DISCONNECTED")
流和缩减逻辑
主题中的每个项目都使用消息键作为用户 ID发送。例如,“安娜”。
流必须按以下方式处理:
- John 只有 1 个会话 123 连接和断开连接。所以他下线了
- Rohan 只有 1 个未断开的会话 334。所以他上线了
- Anna 有 3 个会话(222、199、255),其中 2 个会话断开连接。所以她上线了
KTable 必须有以下数据:
John Offline
Rohan Online
Anna Online
我尝试的是这样的:
但是现在,我将如何将复合键(用户,sessionId)取消分组为仅用户,然后根据最新状态为 CONNECTED 的 sessionId 的数量将用户标记为在线/离线?
apache-kafka - KStream - KTable Join 未触发
我使用 Streams DSL 加入了 2 个主题(实际上更多,但在这里保持简单),一旦加入,就会将数据发布到下游。
我正在主题 1 之上创建一个 KTable 并将其存储到一个命名的状态存储中。Topic1 的键如下所示:
我按预期看到了变更日志主题中的数据。
在主题 2 之上有一个 KStream。主题 2 的密钥如下所示:
我正在重新键入和聚合来自主题 2 的数据并将其放入另一个命名状态存储中,因为 topic1 和 topic2 中的数据之间存在 1-Many 关系。重新键入数据后,主题 2 中的键看起来与主题 1 的键相同。我可以看到重新分区主题中重新键入的数据以及变更日志主题中的聚合数据。但是,连接不会被触发。
其他关键细节——</p>
- 所有主题中的数据都是 Avro 格式。
- 我正在使用 Java/Spring Boot。
- 我在commit.interval.ms和cache.max.bytes.buffering上保留了默认设置
任何指向我在这里可能做错的事情?
编辑 1:我查看了数据分区,看起来一个在 14 上,另一个在 20 上。我还发现了一个类似的问题。
编辑 2:topic1 和 topic2 的生产者是一个 golang 应用程序。流恢复消费者具有以下配置:
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
流消费者具有以下配置:
partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
apache-kafka - Kafka 日志压缩频繁更新的密钥永远不会被消耗
基于Kafka文档https://kafka.apache.org/documentation/#compaction
基于offset的client消费顺序:
compaction前为K1,K2,K3,K4,K5,K6,
compaction后为K1,K3, K4,K5,K2,K6
我说得对吗(如果我们有相对较小的压缩间隔)如果我们有 K2 继续进入主题(与其他键一起,比如不重复,K100,K101,K102 等),它会一直被推回,如果我们有一个消费者比生产者慢,那么消费者在赶上生产者的速度之前不会消费K2?
如果上面的说法是真的,我们有没有办法配置这个行为,比如消费的顺序和压缩前一样,但是 K2 的值是最新的。
apache-kafka-streams - 在创建它的同一个应用程序中查询 KTable
我有一个 Kafka 流应用程序,我在其中读取一个主题,进行聚合并在 KTable 中实现。然后我创建一个 Stream 并在流上运行一些逻辑。现在在流处理中,我想使用前面提到的 KTable 中的一些数据。启动流应用程序后,如何再次访问 KTable 流?我不想将 KTable 推送到新主题。
在这里,我想使用 kTable 中的数据。
apache-kafka - Kafka 拓扑设计:如何在超时时加入滑动窗口并发出事件?[难的]
我有一组要求如下:
- 消息'T'到达,必须等待5秒'A'中的相应消息到达(使用相同的密钥)。如果它在 5 秒内到达,则发送连接值并发送到下游。如果它没有在 5 秒内到达,则仅向下游发送“T”消息。
- 消息 'A' 到达,必须等待 5 秒才能使 'T' 中的相应消息到达(使用相同的密钥)。如果它在 5 秒内到达,则发送连接值并发送到下游。如果它没有在 5 秒内到达,则仅向下游发送“A”消息。
我目前的想法是做一个 KStream-KStream Sliding Window OUTER join。但是,在向下游发送 (T, null) 或 (null, T) 消息之前不会等待 5 秒(即立即完成)。
我需要等待超时发生,如果没有发生加入,则发送未加入的消息。
我附上了一张图表来帮助理解这些案例。我正在尝试尽可能多地使用 DSL。
任何帮助表示赞赏。
java - 如何使用标点符号从状态存储中删除旧记录?(卡夫卡)
我Ktable
使用 为主题创建了一个streamsBuilder.table("myTopic")
,我将其具体化到一个状态存储,以便我可以使用交互式查询。
每小时,我想从这个状态存储(以及相关的变更日志主题)中删除其值在过去一小时内没有更新的记录。
我相信这可能使用punctuator,但到目前为止我只使用了 DSL,所以不确定如何进行。如果有人能给我提供一个例子,我将不胜感激。
谢谢,
杰克
apache-kafka - 删除重复项(将键视为航班号)并仅获取最新记录 wrt 时间戳
我可以插入数据,这是 Kafka 中的 avro 模式吗?
我想从主题中选择记录,然后过滤航班(例如:考虑两个记录具有相同的航班号。我们只需要通过考虑 Avro 模式中提到的时间戳来选择最新的一个
我该怎么做我想删除相同航班号的重复项
输出流应该是这样的,
阿夫罗:
apache-kafka-streams - 知道 Kafka 事件在 K-Table 中可见的最有效方法是什么?
我们使用 Kafka 主题作为事件和存储库。使用 kafka-streams API,我们定义了一个简单的 K-Table 来表示主题中的所有事件。
在我们的用例中,我们将事件发布到主题,然后引用 K-Table 作为后备存储库。主要问题是发布的事件不会立即在 K-Table 上可见。
我们尝试了此处描述的事务和恰好一次语义(https://kafka.apache.org/26/documentation/streams/core-concepts#streams_processing_guarantee),但总是存在我们无法控制的延迟。
- 发布事件
- 时间不定
- 已发布事件在 K-Table 中可见
有没有办法消除延迟或以其他方式知道 K-Table 已使用特定事件。
注意:我们尝试了分区表和全局表,结果相似。
谢谢
apache-kafka - 在 KTable 的插入/更新时重新处理丰富的 kstream 数据
这是我尝试使用 Kafka Streams 实现的假设但类似的场景。
我有流数据、销售数据,我想通过不经常更改的查找数据来丰富这些数据,比如用户和项目,我计划为其创建 KTable。我计划将这些丰富的数据推送到主题并使用连接接收器推送到搜索引擎。
如何确保用户/项目数据的更新也触发对过去销售数据的丰富,而不仅仅是在流中摄取的新数据。据我了解,KTable 插入/更新不会触发对流的过去数据的任何重新处理。
我相信这可能是一个常见的用例,至少我可能不是第一个有这种需求的人。有关解决方案或解决方法的任何指导吗?
scala - 如何在 scala 中为 kafka 打印全局 Ktable?
我正在尝试了解 GlobalKtable 在 kafka 中的工作方式,为此我正在尝试编写示例代码。我已经创建了 globalKtable 但我也想看看我已经尝试过 peek 功能但它不可用,现在我正在尝试通过视图但它给出了编译时错误。
在scala中查看globalktable的写法是什么?
我试过的是