问题标签 [apache-kafka-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
8702 浏览

apache-kafka - 消费者组的 Kafka 流偏移量重置为零

我编写了 Kafka Streaming 应用程序,它只根据某些条件过滤行并将其加载到 MongoDB。

流式处理工作正常,但由于我的代码存在一些缺陷,我想再次重新处理整个数据。

一种方法是杀死流媒体应用程序,更改消费者组 id,从 mongo 中删除数据并重新运行应用程序。

如何在不更改消费者组 ID 的情况下实现此场景。

<<我使用的是Kafka 0.10版本>>

非常感谢帕里

0 投票
2 回答
1547 浏览

apache-kafka - KStreamBuilder 无法将数据从 1 个主题流式传输到另一个主题

我试图使用 KStreamBuilder 将数据从一个主题移动到另一个主题。我尝试了以下代码,但有异常

输出 :

然后我尝试使用数据。

任何想法?我需要任何额外的配置吗?我正在使用 kafka 0.10.0.0 集群和客户端。

使用的依赖项。

0 投票
1 回答
484 浏览

java - 无法让加入的 Kafka 流运行或输出任何内容

对于下面的代码,stream1 和 stream2 都单独运行良好,我可以看到输出,但连接的流根本不记录任何内容。我感觉它与连接窗口有关,但是来自两个流的数据几乎同时进入。

0 投票
3 回答
21431 浏览

apache-kafka - 如何发送时间窗 KTable 的最终 kafka-streams 聚合结果?

我想做的是:

  1. 使用数字主题(Long's)中的记录
  2. 聚合(计数)每个 5 秒窗口的值
  3. 将 FINAL 聚合结果发送到另一个主题

我的代码如下所示:

看起来一切都按预期工作,但聚合被发送到每个传入记录的目标主题。我的问题是如何只发送每个窗口的最终聚合结果?

0 投票
1 回答
1541 浏览

apache-kafka-streams - Apache Kafka Streams 中特定分区上的聚合

假设我有一个名为 Kafka 的主题SensorData,两个传感器 S1 和 S2 将数据(时间戳和值)发送到两个不同的分区,例如 S1 -> P1 和 S2 -> P2。现在我需要分别汇总这两个传感器的值,假设计算 1 小时时间窗口内的平均传感器值并将其写入新主题SensorData1Hour。有了这个场景

  1. 如何使用该KStreamBuilder#stream方法选择特定主题分区?
  2. 是否可以在来自同一主题的两个(多个)不同分区上应用一些聚合函数?
0 投票
1 回答
93 浏览

java - kafkastreams - 增加更多处理能力

我正在研究将现有Flink应用程序/拓扑转换为使用KafkaStreams的 POC 。我的问题是关于部署的。

具体来说——在 Flink 中,将“工作节点”添加到 flink 安装中,然后在拓扑中添加更多并行化以跟上不断增长的数据速率。

随着数据速率的增加,如何增加 KStreams 的容量?KStreams 会自动处理这个吗?我是否启动更多进程(ala 微服务)?

还是我在这里错过了大局?

0 投票
2 回答
3676 浏览

java - 停止 Kafka Streams 应用程序

是否有可能拥有一个运行主题中所有数据然后退出的 Kafka Streams 应用程序?

示例我正在根据日期将数据生成到主题中。消费者被 cron 启动,遍历所有可用数据,然后......做什么?我不希望它坐下来等待更多数据。假设一切都在那里,然后优雅地退出。

可能的?

0 投票
1 回答
837 浏览

apache-kafka - Kafka 流处理器上下文中的周期性 NPE

使用 kafka-streams 0.10.0.0,我在转发消息时会定期在 StreamTask 中看到空指针异常。它在调用的 10% 到 50% 之间变化。NPE发生在这种方法中:

似乎在某些情况下,thisNode字段为空。知道可能是什么原因造成的吗?堆栈跟踪如下。

0 投票
1 回答
7180 浏览

java - 如何注册无状态处理器(这似乎也需要 StateStore)?

我正在构建拓扑并希望使用KStream.process()将一些中间值写入数据库。此步骤不会改变数据的性质,并且是完全无状态的。

添加处理器需要创建一个ProcessorSupplier并将此实例KStream.process()与状态存储的名称一起传递给函数。这是我不明白的。

由于需要StateStoreSupplier ,如何将StateStore对象添加到拓扑?

在应用程序启动时未能添加 say 会导致StateStore此错误:

线程“主”org.apache.kafka.streams.errors.TopologyBuilderException 中的异常:无效的拓扑构建:StateStore my-state-store 尚未添加。

为什么处理器需要有状态存储?对于无状态且不维护状态的处理器,这似乎很可能是可选的。

通过应用处理器处理此流中的所有元素,一次一个元素。

0 投票
1 回答
137 浏览

java - KStream 窗口聚合 - 分区问题

我在 AWS 上有一个简单的集群设置,有一个 kafka 实例和一个 zookeeper。我写信<String, String>给这个并努力在 10 秒的窗口中聚合这些值。

我收到的错误消息:

cluster metadata# 不断前进。

代码:

当有任何事情发生时,在哪里DBAggregateInit并被DBAggregate存根以记录到 DEBUG。没有其他功能。

这些存根函数都没有受到影响。

不知道我在这里错过了哪些步骤。如果我.foreach()或对该主题进行简单的阅读,它似乎可以正常工作。

FWIW:

当我让 kafka 创建主题而不是使用kafka-topic --create --topic ....