问题标签 [kafka-join]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
106 浏览

java - 当 KTable 有更新时,KTable 和 KStream join 是否会发布新记录?

我在我的应用程序中实现了一个 KTable 和一个 KStream 连接,并在以下情况下期望输出消息;

  1. KStream 中有一条新消息,并且 KTable 中有一条匹配记录
  2. KStream 中有一条更新的消息,并且 KTable 中有一条匹配的记录
  3. 当 KTable 记录有更新时

我一直在观察我的应用程序按预期执行 1 和 2,但不是 3。

有什么建议可以帮助我实现第 3 点吗?

谢谢!

0 投票
0 回答
49 浏览

apache-kafka-streams - 卡夫卡流自我加入更新流时间

我有每分钟聚合的站点状态(连接/断开连接)的流,其中聚合被抑制,并且对于每个分钟键,我们只有一个记录。现在,当我们的状态从一分钟变为另一分钟时,我想写入新的“警报”主题消息。

例如 17/03/21 12:00 - 已连接,17/09/21 12:01 - 17/09/21 断开连接,我们要在 12:01 写入断开连接警报。

我尝试做的是获取stream1,并将其重用于时间延长1分钟的stream2。然后与窗口 0 进行连接,以便将每个聚合与前一分钟的结果连接起来,并过滤具有不同状态的聚合

例如 stream1 17/03/21 12:00 - 已连接,17/09/21 12:01 - 已断开 stream2 将是 17/03/21 12:01 - 已连接,17/09/21 12:02 - 已断开零窗口将仅加入 17/09/21 12:01 状态不同,因此记录将保留在流中,我会将其写入警报主题。

问题是加入不是我所期望的记录,从 stream1 到 12:01 从 stream2 用了 12:00,从 12:02 用了 12:01(没有宽限的零窗口)。好像我对时间所做的更改并没有影响,并且加入仍然认为12:01是12:00。我错过了什么?

0 投票
0 回答
70 浏览

apache-kafka - Kafka-在复合键上加入 KStream 和 KTable

我有两个主题-plansupplier

plantopic 有一个基于两列planCode+的复合键 (avro) memberAge
suppliertopic 以 column 为键supplierId,它包含 columnplanCode但不包含memberAge.

SUPPLIER (KStream) : supplierId -> ...., planCode ,..
PLAN (KTable/ GlobalKTable) : {planCode, memberAge} -> value

我想做一个供应商的左连接计划plan_code。我怎样才能做到这一点?

0 投票
0 回答
25 浏览

apache-kafka - KTable-Ktable Join 导致空指针异常

加入两个 KTable 时,我面临空指针异常。

当主题 A 中存在记录但主题 B 中没有匹配记录时,会出现空指针。

使用跟踪日志,我可以看到以下日志打印了一个 null

这可能表明连接没有被触发,但它仍然会随着拓扑的执行而继续。因此,在加入之后,它开始执行“selectKey”节点并运行到 NPE。

据我了解,如果未触发联接,则应在拓扑中等待。

请任何人帮助了解可能出现的问题

0 投票
1 回答
30 浏览

scala - Kstream-Kstream join based on common field

We want to do Kstream-Kstream join based on the common Field(primary key). Currently with the below code we are getting result as just merging 2 Streams without any primary key constraint.

Could you please suggest how to join 2 Streams based on common field/Column.

0 投票
0 回答
18 浏览

java - 在 groupby 键和减少 java 之后,Kafka 加入未显示 2 个流的结果

我有来自 2 个主题的 2 个流。假设流 1 和流 2。

我正在做

这是高级别的代码。没有为此逻辑生成输出。我在这里做错了什么?有人可以请指导。我无法找到我们在这里绞尽脑汁的内容。主题也是共同分区的,就像两个主题都有 4 个分区一样。

0 投票
0 回答
37 浏览

apache-kafka - Kafka 流加入:如何在发出记录之前等待一段时间?

我们目前有 2 个 Kafka 流主题,这些主题有连续的记录。我们正在考虑在等待 5 分钟的窗口后基于键加入 2 个流,但使用我当前的代码,我看到记录立即发出,而无需“等待”以查看匹配记录是否到达另一个流。我目前的实现:

虽然我有JoinWindows.of(Duration.ofMinutes(5)),但我看到一些记录立即发出。我如何确保它们不是?

此外,这是加入 2 个 Kafka 流的最有效方式,还是提出我们自己的从 2 个流读取的消费者实现等更好?