问题标签 [apache-kafka-streams]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
elasticsearch - 是否有适用于 DC/OS、ElasticSearch、Kafka Connect 和 Kafka Streams 的 CloudFormation 模板?
有很多SMACK 堆栈的示例,但在我的基础架构中,我想使用 ElasticSearch 和 Confluent Kafka Connect 和 Kafka Streams。
有一个很棒的教程介绍了如何部署基于 CloudFormation 的 SMACK 堆栈环境,还有一个教程介绍了如何使用 SMACK 创建 IoT 管道。
由于我正在研究Lambda 架构,因此我首先使用 ElasticSearch(不是 Cassandra)来处理我的批处理数据,并且想知道是否有使用 Kafka Connect、ElasticSearch 的 CloudFormation 模板。最终我们想将 Kafka Streams 与 InfluxDB 一起使用?
apache-kafka-streams - 由于 Kafka Streams 现在可用,SMACK 堆栈中是否需要 Spark 和 Akka?
即将深入了解具有 IoT 数据摄取架构的 Kafka Streams,并查看大量SMACK 堆栈示例。但是,现在可以使用Kafka Streams,是否真的需要 Spark 和 Akka(也许只是没有 Spark)?
apache-kafka - Kafka Streams 动态路由(ProducerInterceptor 可能是一个解决方案?)
我正在使用 Apache Kafka,并且一直在尝试使用 Kafka Streams 功能。我要实现的目标非常简单,至少在文字上是这样,并且可以通过常规的普通消费者/生产者方法轻松实现:
- 从主题的动态列表中读取
- 对消息做一些处理
- 将消息推送到另一个主题,该主题的名称是根据消息内容计算出来的
最初我以为我可以创建一个自定义接收器或注入某种端点解析器,以便以编程方式为每条消息定义主题名称,尽管最终找不到任何方法来做到这一点。因此,我深入研究了代码并找到了ProducerInterceptor类(引用自JavaDoc):
一个插件接口,允许您在生产者收到的记录发布到 Kafka 集群之前拦截(并可能改变)这些记录。
它是onSend方法:
这是从 KafkaProducer.send(ProducerRecord) 和 KafkaProducer.send(ProducerRecord, Callback) 方法调用的,在键和值被序列化并分配分区之前(如果在 ProducerRecord 中未指定分区)。
这对我来说似乎是一个完美的解决方案,因为我可以有效地返回一个带有我想要的主题名称的新ProducerRecord 。虽然显然有一个错误(我在他们的 JIRA 上打开了一个问题:KAFKA-4691)并且当键和值已经被序列化时调用该方法。太糟糕了,因为我认为此时进行额外的反序列化是可以接受的。
我对您更有经验和知识渊博的用户的问题将是您的意见和想法以及任何关于如何以高效和优雅的方式实现它的建议。
提前感谢您的帮助/意见/建议/想法。
以下是我尝试过的一些代码片段:
OutputTopicRouterInterceptor onSend 实现:
apache-kafka - 跨多个用户扩展 Kafka 流应用程序
我有一个设置,我将事件推送到kafka
,然后在同一个集群上运行Kafka Streams应用程序。公平地说,扩展Kafka Streams
应用程序的唯一方法是kafka
通过添加节点或增加来扩展集群本身Partitions
吗?
在这种情况下,我如何确保我的消费者不会关闭集群并确保关键管道始终是"on"
. 有什么概念Topology Priority
可以避免可能的停机时间吗?我希望能够在不损害核心管道的情况下为任何人公开流以构建应用程序。Apache storm
如果解决方案是设置另一个 kafka 集群,那么对于所有即席查询是否更有意义?(我知道很多消费者仍然可能导致kafka
集群出现问题,但至少topology
现在处理是隔离的)
apache-kafka - Kafka Streams - 指向同一主题的所有实例本地存储
我们有以下问题:
我们想监听某个 Kafka 主题并构建它的“历史” - 因此对于指定的密钥提取一些数据,将其添加到该密钥的现有列表中(或者如果它不存在则创建一个新列表)并将其放入另一个主题,它只有一个分区并且高度压缩。另一个应用程序可以只收听该主题并更新其历史列表。
我在想它如何适合 Kafka 流库。我们当然可以使用聚合:
它创建了一个由 Kafka 支持的本地存储并将其用作历史表。
我担心的是,如果我们决定扩展这样的应用程序,每个正在运行的实例都会创建一个新的支持主题${applicationId}-${storeName}-changelog
(我假设每个应用程序都有不同的applicationId
)。每个实例开始使用输入主题,获取一组不同的键并构建不同的状态子集。如果 Kafka 决定重新平衡,一些实例将开始错过本地存储中的一些历史状态,因为它们会获得一组全新的分区来消费。
问题是,如果我只是为每个正在运行的实例设置相同的 applicationId,它最终是否应该重放来自每个正在运行的实例具有相同本地状态的同一个 kafka 主题的所有数据?
apache-kafka - 哪个消费者 API 用于 kafka 0.10.1?
我是 Kafka 新手,我有一个 Kafka 领导者version 0.10.0
和一个 zookeeper version 3.4.6
。我遇到了两种类型的 Kafka 消费者 API:
我无法找到这两者之间的显着差异。Kafka 轮询和 Kafka 流式消费者有什么区别?两者都适用的用例有哪些?
任何帮助表示赞赏。
groovy - Kafka-Streams 在消费时抛出 NullPointerException
我有这个问题:
当我使用处理器 API 从主题中消费时,当在处理器内部使用方法时context().forward(K, V)
,Kafka Streams 会引发空指针异常。
这是它的堆栈跟踪:
我的 Gradle 依赖项如下所示:
更新:我尝试使用 0.10.0.1 版本,但仍然抛出相同的错误。
这是我正在构建的拓扑的代码......
我的处理器看起来像这样:
scala - 如何为 Kafka Streams 中的流添加冷却时间/速率限制?
我是流数据处理的新手,我觉得必须是一个非常基本的用例。
假设我有一个(User, Alert)
元组流。我想要的是对每个用户的流进行速率限制。即我想要一个只为用户输出一次警报的流。在接下来的 60 分钟内,用户的任何传入警报都应该被吞下。在这 60 分钟之后,应再次触发传入警报。
我尝试了什么:
用作aggregate
有状态转换,但聚合状态与时间相关。然而,即使结果KTable
的聚合值没有变化,KTable(作为变更日志)将继续向下发送元素,因此无法达到“限速”流的预期效果
这提供了以下输出:
我通常不清楚如何/何时aggregate
决定在下游发布元素。我最初的理解是它是立竿见影的,但似乎并非如此。据我所知,窗口在这里不应该有帮助。
Kafka Streams DSL 目前是否可能不考虑这种有状态转换的用例,类似于 Spark 的updateStateByKey或 Akka 的statefulMapConcat?我是否必须使用较低级别的处理器/变压器 API?
编辑:
可能的重复确实涉及记录缓存如何导致聚合何时决定向下游发布元素的问题。然而,主要问题是如何在 DSL 中实现“速率限制”。正如@miguno 指出的那样,必须恢复到较低级别的处理器 API。下面我粘贴了非常冗长的方法:
apache-kafka - 使用 Kafka Streams DSL 时如何处理错误和不提交
对于 Kafka Streams,如果我们使用较低级别的处理器 API,我们可以控制是否提交。因此,如果我们的代码中出现问题,并且我们不想提交此消息。在这种情况下,Kafka 将多次重新传递此消息,直到问题得到解决。
但是在使用其更高级别的流 DSL API 时如何控制是否提交消息呢?
资源:
http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html
apache-kafka - Kafka Stream 处理期间的外部系统查询
我正在尝试为流式分析设计流式架构。要求:
- RT 和 NRT 流数据输入
- 实现一些财务分析的流处理器
- RT 和 NRT 分析输出流
- 流处理时引用数据请求
我正在探索用于流处理和 RT/NRT 实时消息传递的 Kafka 和 Kafka Streams。我的问题是:我需要在流处理期间对外部系统(信息提供者、MongoDB 等)执行一些查询。根据外部系统特征,这些查询可以是同步和异步请求响应。
我读过这篇文章,解释如何在处理过程中加入 KStream 和 KTable,这很有趣,但在这种情况下,KTable 不依赖于来自 KStream 的输入参数,它只是表的流表示。
我需要为每个 KStream 消息查询外部系统,将一些消息字段作为查询参数传递,并用查询结果丰富流式消息,然后将丰富的消息发布到输出主题。是否有任何统一的范式来设计这种流处理?有什么我最好使用的特定技术吗?请记住,查询可以是同步和异步的。
我还想为这些外部系统设计包装器,实现一种分布式 RPC,可从 Kafka 流处理中调用。你能推荐任何技术/框架吗?我正在考虑使用 Akka 演员来分发查询响应者,但我不明白 Akka 是否适合请求-响应范式。
谢谢