问题标签 [kafka-join]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - Kafka 流以特定键作为输入加入
我在模式注册表中有 3 个不同的主题和 3 个 Avro 文件,我想流式传输这些主题并将它们连接在一起并将它们写入一个主题。问题是我要加入的键与我将数据写入每个主题的键不同。
假设我们有这 3 个 Avro 文件:
Alarm:
事件:
维护:
对于这些 Avro,我的 Kafka 中有 3 个主题(比如alarm_raw、incident_raw、maintenance_raw),每当我想写入这些主题时,我都会使用 ne_id 作为键(因此主题由 ne_id 分区)。现在我想加入这3个主题并获得一条新记录并将其写入一个新主题。问题是我想根据alarm_id和alarm_source_id加入警报和事件,并根据ne_id加入警报和维护。我想避免创建新主题并重新分配新密钥。无论如何,我在加入时指定了密钥吗?
apache-kafka-streams - Kafka Stream 使用 JoinWindow 进行数据重放
我有 2 个数据流,我希望能够在 1 个月的窗口内加入它们。当我有实时数据时,使用KStream和join一切都很有趣且超级简单。我做了这样的事情;
当我想进行数据重放时出现问题。假设我想为过去 6 个月的数据重新执行这些连接,因为我正在同时运行所有数据的管道 kafkaStream 将连接所有可连接的数据并且它不考虑时间差(其中它应该只加入过去一个月的数据)。我假设 JoinWindow 时间是我们将数据插入 Kafka 主题的时间,对吗?
以及如何更改和操纵这个时间,以便我可以正确运行我的数据重播,我的意思是重新插入过去 6 个月的数据,每条记录需要一个月的时间窗口并基于该记录加入。
这个问题与How to manage Kafka KStream to Kstream windowed join不重复?,在那里我问我如何才能根据时间窗口加入。这里我说的是数据重放。根据我在加入 Kafka 期间的理解,将数据插入主题的时间作为 JoinWindow 的时间,所以如果你想进行数据重放并重新插入 6 个月前的数据,kafka 将其作为新数据今天插入,并将与一些其他数据加入它,这些数据实际上是今天不应该的。
apache-kafka-streams - Kafka Streams API:我加入了两个 empmodel 的 KStreams
我得到错误:
无效的拓扑构建:KSTREAM-MAP-0000000003 和 KSTREAM-MAP-0000000004 不可连接
apache-kafka - 嵌入式 Kafka:KTable+KTable leftJoin 产生重复记录
我来寻求奥术知识。
首先,我有两对主题,每对中的一个主题馈入另一个主题。后面的主题正在形成两个KTable,用于KTable+KTable leftJoin。问题是,当我为任一 KTable 生成一条记录时,leftJoin 生成了三条记录。我希望表单中有两条记录(A-null,AB),但我得到的是(A-null,AB,A-null)。我已经确认 KTables 每个都收到一条记录。
我摆弄了 CACHE_MAX_BYTES_BUFFERING_CONFIG 来启用/禁用状态存储缓存。上述行为是将 CACHE_MAX_BYTES_BUFFERING_CONFIG 设置为 0。当我使用 CACHE_MAX_BYTES_BUFFERING_CONFIG 的默认值时,我看到连接输出以下记录:(AB, AB, A-null)
以下是流、消费者、生产者的配置:
遇到此行为的处理器 API 代码(已清理)如下,请注意配对的主题 [A1, A2] 和 [B1, B2]:
预先感谢您的任何帮助,哦,仁慈的人。
更新: 我正在使用一个 kafka 服务器和每个主题 1 个分区运行,并且遇到了这种行为。当我将服务器数量增加到 2 并将分区数量增加到 3 时,我的输出变为(A-null)。
在我看来,我需要花更多时间阅读 kafka 手册......
java - KStream 到 KTable 左连接返回 Null
我目前正在尝试使用 KStream 到 KTable 连接来丰富 Kafka 主题。对于我的概念证明,我目前有一个 Kafka 流,其中包含大约 600,000 条记录,它们都具有相同的键,并且从一个主题创建的 KTable 具有 1 条记录的键、值对,其中 KTable 主题中的键与 600,000 的键匹配创建 KStream 的主题中的记录。
当我使用左连接(通过下面的代码)时,所有记录在 ValueJoiner 上都返回 NULL。
这是来自源 KStream 的示例输出记录(使用 forEach 循环):
我尝试将 KTable 转换回 KStream 并在转换后的 Stream 上使用 forEach 循环,并验证记录实际上存在于 KTable 中。
上面的代码输出:
java - Kafka Streams 键加入,条件复杂
我正在尝试按键加入KStream
,GlobalKTable
但有特定的逻辑。
例如,如果 key = "ABC",那么:
- 首先,按完整键加入 - 即“ABC”=“ABC”
- 然后,如果未加入,则通过前两个符号加入(删除一个符号) - 即“AB”=“AB”
- 最后,尝试仅通过一个符号加入 - 即“A”=“A”
此外,还需要知道执行连接的条件 - 例如,按 3 个字母/按 2 个字母/按 1 个字母。
问题是,有可能还是我应该寻找解决方法?例如,使用相应的键(带有“ABC”键的表,一个带有“AB”键和一个带有“A”键的表)复制 GlobalKTable 并执行 3 个单独的连接?或者也许还有其他建议?
提前致谢!
performance - Kafka KStream 到 KStream 加入 | 重启性能
我计划在很长一段时间内(约 1 周)将两个主题作为 KStreams 加入。假设这个窗口会累积上亿条记录,加入的消费者需要多长时间才能重启?我之所以问这个问题,是因为我无法找到有关该窗口中有多少记录存储在消费者缓存中的信息。
apache-kafka - Kafka - 当数据在不同时间出现时从两个不同的流中加入数据
我有一个场景,我们有两个不同的流,我们在两个不同的时间获取它们的数据,我需要根据值中的时间戳加入它们。我将尝试通过以下示例进行解释。
输入流1->
- 键 111,价值 21:00 AAA
- 键 111,值 21:02 AAA
- 键 111,值 21:04 AAA
- 键 111,值 21:15 AAA
- 键 111,值 21:18 BBB
- 键 111,值 21:20 BBB
输入流2->
- 键 111,值 21:01 10.0.0.1
- 键 111,值 21:04 10.0.0.2
- 键 111,值 21:14 10.0.0.3
- 键 111,值 21:20 10.0.0.4
- 键 111,值 21:21 10.0.0.5
输出加入我需要的 ->
- AAA 10.0.0.1
- AAA 10.0.0.2
- AAA 10.0.0.3
- BBB 10.0.0.4
- BBB 10.0.0.5
注意:两个流在不同时间获取输入。当 inputStream1 的第一条记录到达时,inputStream2 可能有所有 5 条记录。我想在值中的时间窗口上匹配它们。
我怎样才能在kafka中实现这一点。甚至可能吗?
apache-kafka - 如何使用kafka流连接操作处理一对多关系
你能帮我如何使用Kafka流来实现这一点吗?
场景:对订单数据的所有发票进行分组。在实时流媒体中,接收发票可能会有延迟。所以我们想在加入之前等待 20 分钟对所有发票进行分组。
示例:订单“x”有 3 张发票,预计将在 20 分钟内收到。
预期输出:订单和 3 张发票应作为输出主题中的单个数据提供。
我们有以下拓扑来实现这一点。
我们分别有订单流和发票流
我们根据订单键对发票进行分组。我们设置了 20 分钟翻滚窗口
将订单数据与生成的发票组连接起来
将输出写入新主题
问题:步骤 3 不等待步骤 2 完成。收到订单后立即加入操作。所以我们没有得到预期的输出。
我们尝试使用连接窗口来实现相同的目的。但由于连接窗口是滑动窗口,我们在输出主题中得到重复数据。
对于上面的例子,如果我们使用连接窗口而不是翻转窗口,我们将得到 3 个输出数据,订单分别有 1 个发票、2 个发票和 3 个发票。
请帮助我解决此问题或建议任何替代方法
代码片段: