问题标签 [apache-kafka-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
4880 浏览

json - 从 JSON 到 Avro 的 Kafka 流

我尝试使用 Kafka Stream 将带有 String/JSON 消息的主题转换为另一个主题作为 Avro 消息。

流主方法:

转型:

并得到这样的异常:

这是正确的方法吗?我是 Kafka Streams 和 Avro 的新手

0 投票
1 回答
1578 浏览

elasticsearch - Kafka 到 Elasticsearch、带有 Logstash 的 HDFS 或 Kafka Streams/Connect

我使用 Kafka 进行消息队列/处理。我的问题是关于性能/最佳实践。我会做自己的性能测试,但也许有人已经有了结果/经验。

数据是 Kafka (0.10) 主题中的原始数据,我想将其结构化传输到 ES 和 HDFS。

现在我看到了两种可能性:

  • Logstash(Kafka 输入插件、grok 过滤器(解析)、ES/webhdfs 输出插件)
  • Kafka Streams(解析)、Kafka Connect(ES sink、HDFS sink)

如果没有任何测试,我会说第二种选择更好/更清洁,更可靠?

0 投票
1 回答
298 浏览

apache-kafka - Kafka 将单个日志事件行聚合到一个组合日志事件中

我正在使用 Kafka 来处理日志事件。我对简单的连接器和流转换有 Kafka Connect 和 Kafka Streams 的基本知识。

现在我有一个具有以下结构的日志文件:

日志事件有多个由 event_id 连接的日志行(例如邮件日志)

例子:

通常有多个事件:

例子:

时间窗口(在 START 和 END 之间)最长可达 5 分钟。

结果我想要一个像

例子:

实现这一目标的正确工具是什么?我试图用 Kafka Streams 解决它,但我可以弄清楚如何..

0 投票
1 回答
3311 浏览

java - 为什么我看不到 Kafka Streams reduce 方法的任何输出?

给定以下代码:

println在 Reducer 的 apply 方法中有一个语句,当我预计会发生缩减时,它会成功打印出来。但是,上面显示的最终打印语句什么也不显示。同样,如果我使用to方法而不是print,我在目标主题中看不到任何消息。

在reduce语句之后我需要什么才能看到减少的结果?如果将一个值推送到输入,我不希望看到任何东西。如果按下具有相同键的第二个值,我希望减速器应用(它确实如此),并且我还希望减少的结果继续到处理管道的下一步。如前所述,我在管道的后续步骤中没有看到任何内容,我不明白为什么。

0 投票
2 回答
3258 浏览

apache-kafka - Kafka Streams - 从指标注册表访问数据

我很难找到有关如何访问 Kafka Streams 指标注册表中数据的文档,我想我可能正试图在圆孔中安装一个方形钉。我希望在以下方面得到一些建议:

目标

收集Kafka Streams 指标注册表中记录的指标并将这些值发送到任意端点

工作流程

这是我认为需要完成的事情,并且我已经完成了除最后一个步骤之外的所有步骤(因为度量注册表是私有的,所以这个步骤有问题)。但我可能会以错误的方式解决这个问题:

  • 定义一个实现MetricReporter接口的类。构建 Kafka 在metricChange方法中创建的指标列表(例如,每当调用此方法时,使用当前注册的指标更新哈希图)。
  • metric.reporters在配置属性中指定此类
  • 设置一个进程,轮询 Kafka Streams 指标注册表以获取当前数据,并将值发送到任意端点

无论如何,最后一步在 Kafka 0.10.0.1中似乎是不可能的,因为指标注册表没有公开。如果是正确的工作流程(听起来不是..),或者我误解了提取 Kafka Streams 指标的过程,有人可以告诉我吗?

0 投票
1 回答
313 浏览

apache-spark - DSMS、Storm 和 Flink 的区别

DSMS 对应于数据流管理系统。这些系统允许用户提交将持续执行的查询,直到被用户删除。

像 Storm 和 Flink 这样的系统可以被视为 DSMS 还是更通用的东西?

谢谢

0 投票
1 回答
413 浏览

apache-spark - 流处理中的非确定性函数

像 StreamScope 这样的一些系统要求函数是确定性的(以及它们的处理顺序),这是因为每个消息在流中都有其序列号。在失败的情况下,此序列号用于确定是否应该重新计算事件(因为它是由流持续存在的),因此下游节点不会两次计算相同的事件。

Flink、Spark Streaming、Kafka-Streams 和 Storm 是否也要求函数具有确定性?

0 投票
0 回答
785 浏览

java - Kafka 流:结合 KStreams 和处理器 API?

我有一个使用高级 Kafka Streams KStreamAPI 实现的流,并希望将两个连续的处理器附加到该流。

KStreamAPI 提供了一个process方法,但是该方法不返回任何内容,因此我假设它会改变拓扑。

在第一个处理器之后,我想附加第二个处理器,但我并不完全清楚我可以将下游处理器添加到哪个对象。

有任何想法吗?

0 投票
1 回答
6694 浏览

java - 合并多个相同的 Kafka Streams 主题

我有 2 个 Kafka 主题流式传输来自不同来源的完全相同的内容,因此如果其中一个来源出现故障,我可以获得高可用性。我正在尝试使用 Kafka Streams 0.10.1.0 将 2 个主题合并为 1 个输出主题,这样我就不会错过任何有关失败的消息,并且在所有源都启动时没有重复。

使用leftJoinKStream 的方法时,其中一个主题可以毫无问题地关闭(次要主题),但是当主要主题关闭时,没有任何内容发送到输出主题。这似乎是因为,根据Kafka Streams 开发者指南

KStream-KStream leftJoin 总是由来自主流的记录驱动

因此,如果没有来自主流的记录,它不会使用来自辅助流的记录,即使它们存在。一旦主流重新联机,输出就会正常恢复。

我也尝试过使用outerJoin(添加重复记录),然后转换为 KTable 和 groupByKey 以消除重复项,

但我仍然偶尔会得到重复。我也commit.interval.ms=200经常使用让 KTable 发送到输出流。

处理此合并以从多个相同的输入主题中获得一次输出的最佳方法是什么?

0 投票
1 回答
6102 浏览

java - 如何在 KStream 中获取偏移值

我正在使用 Kafka Streams 开发 PoC。现在我需要获取流消费者中的偏移值,并使用它(topic-offset)->hash为每条消息生成一个唯一键。原因是:生产者是 syslog,只有少数几个有 ID。我无法在消费者中生成 UUID,因为在重新处理的情况下我需要重新生成相同的密钥。

我的问题是:org.apache.kafka.streams.processor.ProcessorContext该类公开了一个.offset()返回值的方法,但是我使用的是 KStream 而不是处理器,并且找不到返回相同内容的方法。

有人知道如何从 KStream 中提取每一行的消费者价值吗?提前致谢。