问题标签 [apache-kafka-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
5192 浏览

apache-kafka-streams - 如何使用 Kafka Stream DSL 使用处理器过滤键和值

我有一个处理器,它与 StateStore 交互以过滤并对消息执行复杂的逻辑。在process(key,value)context.forward(key,value)用来发送我需要的键和值的方法中。出于调试目的,我还打印了这些。

我有一个 KStream mergedStream,它是由其他两个流的连接产生的。我想将处理器应用于该流的记录。我通过以下方式实现了这一目标:mergedStream.process(myprocessor,"stateStoreName")

当我启动这个程序时,我可以看到要打印到控制台的正确值。但是,如果我使用主题上的值将 mergeStream 发送到mergedStream.to("topic")主题,则不是我在处理器中转发的值,而是原始值。

我使用 kafka-streams 0.10.1.0。

将我在处理器中转发到另一个流的值的最佳方法是什么?

是否可以将处理器 APIKStream DSL创建的流混合?

0 投票
1 回答
811 浏览

apache-kafka-streams - StateStoreSupplier 在 KafkaStreams 中存储序列

我需要重新排序来自两个主题的数据(使用外连接合并)。StateStore使用 a保持最新序列并使用重新排序的消息修改下游流值是否是一种好习惯。

简化问题:

(来自主题 A 的序列,来自主题 B 的序列)-> 要输出的新序列(将当前序列保留在 中StateStore

(10,100) -> 1

(11,101) -> 2

(12,102) -> 3

(...,...) -> ...

新序列将存储为 stateStore 中键“currentSeq”的值。该序列将在每条消息上递增并存储回 stateStore。

0 投票
1 回答
294 浏览

apache-kafka - Kafka Streams 在 countByKey 之后没有写入预期结果

使用 Kafka Streams(版本 0.10.0.1)和 Kafka Broker(0.10.0.1)我试图根据消息键生成计数。我使用以下命令生成消息:

当我运行上面的命令时,我可以像这样发送一个键和值:

这将向 kafka 发送一条消息,其中 key = 1 且 value = {"value":10}。

我的目标是然后计算有多少消息具有 key=1。鉴于上述命令,计数将为 1。

这是我正在使用的代码:

当我运行 counts.print(stringSerde,longSerde) 我得到:

这意味着我有一个 key=1 并且它们是具有该密钥的 1 条消息。这就是我所期望的。

但是,当以下行运行时:

名为 message-counts-topic 的主题收到一条发送给它的消息,但是当我尝试使用此命令读取消息时:

我得到以下输出:

其中 1 是键,值不显示任何内容。我希望看到消息 1 , 1。但由于某种原因,计数值丢失了,即使它在调用 print 方法时显示。

0 投票
1 回答
343 浏览

apache-kafka - Kafka Streams - kafka-streams-application-reset.sh sending wrong API version

Kafka 0.10.0.1 adds the ability to reset a Kafka Streams app using a script called kafka-streams-application-reset.sh

Confluent has some good documentation on this script here.

Unfortunately, running the script locally results in an error:

Also, the following line appears in the broker logs:

From what I can tell, it looks like the client is making an invalid request, but I'm unsure why this happening. Our brokers are still on 0.9.0 so I'm not sure if that's the issue, but it appears to be a meta data request based on the apiKey value in the broker log.

Could someone please let me know why this might be happening and how it can be resolved?

0 投票
1 回答
2760 浏览

java - Kafka - How to use filter and filternot at the same time?

I have a Kafka stream that takes data from a topic, and needs to filter that information to two different topics.

However, when I do it like this, it reads the data from the topic twice -- not sure if that has any impact on performance as the data gets larger. Is there a way to just filter it once and push it to two topics?

0 投票
1 回答
823 浏览

java - 如何将 KStream 聚合到固定大小的列表?

与此问题类似但略有不同:KStream 批处理窗口,我想在将消息KStream推送给消费者之前对来自 a 的消息进行批处理。

但是,此下推不应安排在固定的时间窗口上,而应安排在每个键的固定消息计数阈值上。

首先想到两个问题:

1)这是AbstractProcessor应该处理的方式吗?类似于以下内容:

2)由于StateStore可能会爆炸(如果条目值永远不会达到阈值以便被转发),“垃圾收集”的最佳方法是什么?我可以制定一个基于时间的计划并删除太旧的密钥……但这看起来非常 DIY 并且容易出错。

0 投票
1 回答
1148 浏览

apache-kafka - Kafka Streams:在流中找到最小值的适当方法

我正在使用 Kafka Streams 版本 0.10.0.1,并试图在流中找到最小值。

传入的消息来自一个名为 kafka-streams-topic 的主题,并且有一个键,值是一个 JSON 有效负载,如下所示:

这是一个简单的有效负载,但我想找到这个 JSON 的最小值。

传出消息只是一个数字:2334

并且密钥也是消息的一部分。

因此,如果传入的主题得到:

名为 min-topic 的传出主题将得到

另一条消息传来:

因为这是同一个键,所以我现在想生成一条带有 key=1 value=100 的消息,因为它现在小于第一条消息

现在让我们说我们得到了:

将产生一条新消息,其中:

此外,如果我们收到消息:

不应产生任何消息,因为此消息大于当前值 100

这可行,但我想知道这是否符合 API 的意图:

这是实际运行处理器的代码:

0 投票
3 回答
5393 浏览

apache-kafka-streams - 使用查找数据丰富 KStream 的理想方法

我的流有一个名为“类别”的列,我在不同的商店中为每个“类别”提供了额外的静态元数据,它每隔几天更新一次。进行此查找的正确方法是什么?Kafka 流有两种选择

  1. 在 Kafka Streams 之外加载静态数据并仅用于KStreams#map()添加元数据。这是可能的,因为 Kafka Streams 只是一个库。

  2. 将元数据加载到 Kafka 主题,将其加载到 aKTable并执行KStreams#leftJoin(),这似乎更自然,并将分区等留给 Kafka Streams。但是,这要求我们保持KTable加载所有值。请注意,我们必须加载整个查找数据,而不仅仅是更改。

    • 例如,假设最初只有一个类别“c1”。Kafka 流应用程序已正常停止,然后再次重新启动。重新启动后,添加了一个新类别“c2”。我的假设是,table = KStreamBuilder().table('metadataTopic') 将只有值'c2',因为这是自应用程序第二次启动以来唯一发生变化的事情。我希望它有'c1'和'c2'。
    • 如果它也有'c1',是否会从 KTable 中删除数据(可能通过设置发送 key = null 消息?)?

以上哪一项是查找元数据的正确方法?

是否可以始终强制在重新启动时从头开始读取一个流,这样所有元数据都可以加载到KTable.

还有其他使用商店的方法吗?

0 投票
1 回答
10411 浏览

apache-kafka - Apache Samza 和 Apache Kafka Streams 的区别(关注并行和通信)

在 Samza 和 Kafka Streams 中,数据流处理在处理步骤(在 Samza 中称为“作业”,在 Kafka Streams 中称为“处理器”)的序列/图(在 Samza 中称为“数据流图”,在 Kafka Streams 中称为“拓扑”)中执行).在本问题的其余部分中,我将这两个术语称为工作流工作人员。

假设我们有一个非常简单的工作流程,由一个工人 A 使用传感器测量并过滤所有低于 50 的值,然后是一个工人 B 接收剩余的测量并过滤所有高于 80 的值。

输入(Kakfa 主题 X)-->(工人 A)-->(工人 B)--> 输出(Kafka 主题 Y)

如果我明白了

正确地,Samza 和 Kafka Streams 都使用主题分区概念来复制工作流/工作人员,从而为可扩展性目的并行处理。

但:

  • Samza 将每个工作人员(即作业)分别复制到多个任务(输入流中的每个分区一个)。也就是说,任务是工作流的工作人员的副本。

  • Kafka Streams 将整个工作流程(即拓扑)一次复制到多个任务(输入流中的每个分区一个)。也就是说,任务是整个工作流程的复制品。

这让我想到了我的问题:

  1. 假设只有一个分区:这是否正确,不能在 Kafka Streams 中的两台不同机器上部署工作程序(A)和(B),而这在 Samza 中是可能的?(或者换句话说:在Kafka Streams中是否不可能将单个任务(即拓扑副本)拆分到两台机器上,无论是否有多个分区。)

  2. Kafka Streams 拓扑中的两个后续处理器(在同一个任务中)如何通信?(我知道在 Samza 中,两个后续工作人员(即工作)之间的所有通信都是通过 Kafka 主题完成的,但是由于必须在代码中明确地在 Kafka Streams 中“标记”哪些流必须作为 Kafka 主题发布,所以这不能就是这里的情况。)

  3. Samza 还自动将所有中间流作为 Kafka 主题发布(从而使它们可供潜在客户使用),而 Kafka Streams 只发布那些明确标记的中间和最终流(addSink在低级 API 和to/或throughDSL 中),这是否正确? )?

(我知道 Samza 还可以使用 Kafka 以外的其他消息队列,但这与我的问题并不真正相关。)

0 投票
4 回答
6131 浏览

apache-spark - KStreams + Spark Streaming + 机器学习

我正在做一个 POC,用于在数据流上运行机器学习算法。
我最初的想法是获取数据,使用

Spark Streaming --> 从多个表中聚合数据 --> 在数据流上运行 MLLib --> 生成输出。

但我遇到了 KStreams。现在我很困惑!

问题:
1. Spark Streaming 和 Kafka Streaming 有什么区别?
2. 我怎样才能结合 KStreams + Spark Streaming + 机器学习?
3.我的想法是连续训练测试数据而不是批量训练。