问题标签 [apache-kafka-streams]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - 消费者组的 Kafka 流偏移量重置为零
我编写了 Kafka Streaming 应用程序,它只根据某些条件过滤行并将其加载到 MongoDB。
流式处理工作正常,但由于我的代码存在一些缺陷,我想再次重新处理整个数据。
一种方法是杀死流媒体应用程序,更改消费者组 id,从 mongo 中删除数据并重新运行应用程序。
如何在不更改消费者组 ID 的情况下实现此场景。
<<我使用的是Kafka 0.10版本>>
非常感谢帕里
apache-kafka - KStreamBuilder 无法将数据从 1 个主题流式传输到另一个主题
我试图使用 KStreamBuilder 将数据从一个主题移动到另一个主题。我尝试了以下代码,但有异常
输出 :
然后我尝试使用数据。
任何想法?我需要任何额外的配置吗?我正在使用 kafka 0.10.0.0 集群和客户端。
使用的依赖项。
java - 无法让加入的 Kafka 流运行或输出任何内容
对于下面的代码,stream1 和 stream2 都单独运行良好,我可以看到输出,但连接的流根本不记录任何内容。我感觉它与连接窗口有关,但是来自两个流的数据几乎同时进入。
apache-kafka - 如何发送时间窗 KTable 的最终 kafka-streams 聚合结果?
我想做的是:
- 使用数字主题(Long's)中的记录
- 聚合(计数)每个 5 秒窗口的值
- 将 FINAL 聚合结果发送到另一个主题
我的代码如下所示:
看起来一切都按预期工作,但聚合被发送到每个传入记录的目标主题。我的问题是如何只发送每个窗口的最终聚合结果?
apache-kafka-streams - Apache Kafka Streams 中特定分区上的聚合
假设我有一个名为 Kafka 的主题SensorData
,两个传感器 S1 和 S2 将数据(时间戳和值)发送到两个不同的分区,例如 S1 -> P1 和 S2 -> P2。现在我需要分别汇总这两个传感器的值,假设计算 1 小时时间窗口内的平均传感器值并将其写入新主题SensorData1Hour
。有了这个场景
- 如何使用该
KStreamBuilder#stream
方法选择特定主题分区? - 是否可以在来自同一主题的两个(多个)不同分区上应用一些聚合函数?
java - kafkastreams - 增加更多处理能力
我正在研究将现有Flink应用程序/拓扑转换为使用KafkaStreams的 POC 。我的问题是关于部署的。
具体来说——在 Flink 中,将“工作节点”添加到 flink 安装中,然后在拓扑中添加更多并行化以跟上不断增长的数据速率。
随着数据速率的增加,如何增加 KStreams 的容量?KStreams 会自动处理这个吗?我是否启动更多进程(ala 微服务)?
还是我在这里错过了大局?
java - 停止 Kafka Streams 应用程序
是否有可能拥有一个运行主题中所有数据然后退出的 Kafka Streams 应用程序?
示例我正在根据日期将数据生成到主题中。消费者被 cron 启动,遍历所有可用数据,然后......做什么?我不希望它坐下来等待更多数据。假设一切都在那里,然后优雅地退出。
可能的?
apache-kafka - Kafka 流处理器上下文中的周期性 NPE
使用 kafka-streams 0.10.0.0,我在转发消息时会定期在 StreamTask 中看到空指针异常。它在调用的 10% 到 50% 之间变化。NPE发生在这种方法中:
似乎在某些情况下,thisNode字段为空。知道可能是什么原因造成的吗?堆栈跟踪如下。
java - 如何注册无状态处理器(这似乎也需要 StateStore)?
我正在构建拓扑并希望使用KStream.process()将一些中间值写入数据库。此步骤不会改变数据的性质,并且是完全无状态的。
添加处理器需要创建一个ProcessorSupplier并将此实例KStream.process()
与状态存储的名称一起传递给函数。这是我不明白的。
由于需要StateStoreSupplier ,如何将StateStore对象添加到拓扑?
在应用程序启动时未能添加 say 会导致StateStore
此错误:
线程“主”org.apache.kafka.streams.errors.TopologyBuilderException 中的异常:无效的拓扑构建:StateStore my-state-store 尚未添加。
为什么处理器需要有状态存储?对于无状态且不维护状态的处理器,这似乎很可能是可选的。
通过应用处理器处理此流中的所有元素,一次一个元素。
java - KStream 窗口聚合 - 分区问题
我在 AWS 上有一个简单的集群设置,有一个 kafka 实例和一个 zookeeper。我写信<String, String>
给这个并努力在 10 秒的窗口中聚合这些值。
我收到的错误消息:
cluster metadata
# 不断前进。
代码:
当有任何事情发生时,在哪里DBAggregateInit
并被DBAggregate
存根以记录到 DEBUG。没有其他功能。
这些存根函数都没有受到影响。
不知道我在这里错过了哪些步骤。如果我.foreach()
或对该主题进行简单的阅读,它似乎可以正常工作。
FWIW:
当我让 kafka 创建主题而不是使用kafka-topic --create --topic ...
.