问题标签 [apache-kafka-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
19070 浏览

java - 将 Kafka 流输入打印到控制台?

我一直在查看我正在开发的 Java 应用程序的大量 Kafka 文档。我已经尝试过使用 Java 8 中引入的 lambda 语法,但我在这方面有点粗略,并且对它应该是我目前使用的不太有信心。

我有一个 Kafka/Zookeeper 服务运行没有任何问题,我想做的是编写一个基于输入的小示例程序将其写出来,但不做字数统计,因为已经有很多例子了。

至于示例数据,我将得到以下结构的字符串:

示例数据

问题

我希望能够提取 3 个字母的关键字并用System.out.println. 如何获取包含输入的字符串变量?我知道如何应用正则表达式,甚至只是通过字符串搜索来获取关键字。

代码

我有 Zookeeper、kafka、生产者和消费者运行都连接到同一个主题,所以我希望基本上看到String所有实例(生产者、消费者和流)上都出现相同的内容。

0 投票
1 回答
3577 浏览

java - Kafka KStreams - 如何添加线程/使用 StreamsConfig.NUM_STREAM_THREADS_CONFIG

我在搞乱这个参数并遇到了一些奇怪的事情。没有它我的应用程序运行正常,但是当我将此行添加到配置时:

CPU 使用率不会超过零。应用似乎没有做任何事情。没有错误。

是否有一些建议的方法来增加 KStreams 应用程序的线程使用率?或者只是“相信力量”,让它一起运行?


编辑:

  1. 我有两个分区
  2. 已通过kafka-consumer-groups- 大量可用记录检查了消费者滞后
  3. 即使只有 1 个分区 - 为什么有多个线程什么都不做?0% CPU。
0 投票
2 回答
1255 浏览

apache-kafka - Kafka Streams reduceByKey 与 leftJoin

乍一看,在我看来,使用KStream#reduceByKeyone 可以实现与使用KStream to KTable leftJoin. 即用相同的键组合记录。在性能方面,两者之间有什么区别?

0 投票
1 回答
538 浏览

d3.js - Bluemix Kafka 流

最新版本的 IBM Message Bus 是否支持 Kafka Streams(此处描述:http: //www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/)。有关如何执行此操作的任何示例/示例?如果是,这可以用 node.js 完成吗?

我想做的是

  1. 将数据从源数据库 (Cloudant) 流式传输到 Kafka 流(Bluemix 中存在连接器服务 - BETA 服务)

  2. 使用 Kafka Streams 聚合数据(摘要、计数等)

  3. 一个 Web 仪表板应用程序 - 前端(Node.JS、D3.JS)使用流并实时更新仪表板。

谢谢

0 投票
1 回答
476 浏览

java - kafka KStream - 采用 n 秒计数的拓扑

我有一个 JSON 对象流,我键入几个值的散列。我希望在 n 秒(10?60?)间隔内按键计数,并使用这些值进行一些模式分析。

我的拓扑:K->aggregateByKey(n seconds)->process()

process - init()我调用的步骤ProcessorContent.schedule(60 * 1000L)中,希望.punctuate()得到调用。从这里我将遍历内部散列中的值并采取相应的行动。

我看到值来自聚合步骤并命中process()函数,但从.punctuate()未被调用。


代码:

AggregateInit()返回空值。

我想我可以.punctuate()用一个简单的计时器来做同样的事情,但我想知道为什么这段代码不能像我希望的那样工作。

0 投票
3 回答
3256 浏览

java - Kafka - TimestampExtractor 的问题

我用org.apache.kafka:kafka-streams:0.10.0.1

我正在尝试使用基于时间序列的流,该流似乎不会触发KStream.Process()触发(“标点符号”)。(请参阅此处以供参考)

KafkaStreams配置中,我传递了这个参数(等等):

这里,EventTimeExtractor是一个自定义时间戳提取器(实现org.apache.kafka.streams.processor.TimestampExtractor),用于从 JSON 数据中提取时间戳信息。

当每条新记录被拉入时,我希望这会调用我的对象(派生自TimestampExtractor)。有问题的流是 2 * 10^6 记录/分钟。我已punctuate()设置为 60 秒,但它永远不会触发。我知道数据非常频繁地通过这个跨度,因为它会拉动旧值来迎头赶上。

事实上,它根本不会被调用。

  • 这是在 KStream 记录上设置时间戳的错误方法吗?
  • 这是声明此配置的错误方法吗?
0 投票
2 回答
6966 浏览

apache-kafka - Kafka多分区排序

我知道不可能在 Kafka 中对多个分区进行排序,并且分区排序只能保证组内的单个消费者(对于单个分区)。但是,使用 Kafka Streams 0.10 现在可以实现这一点吗?如果我们使用时间戳功能,以便每个分区中的每条消息都保持顺序,那么在消费者端,假设使用 Kafka Streams 0.10,现在这可能吗?假设我们收到所有消息,我们是否不能根据使用的时间戳对所有分区进行排序,并可能将它们转发到单独的主题以供使用?

目前我需要保持排序,但这意味着有一个分区和一个消费者线程。我想将其更改为多个分区以增加并行度,但以某种方式“让它们按顺序排列”。

有什么想法吗?谢谢你。

0 投票
1 回答
491 浏览

apache-kafka - AdminUtils.createTopic API 抛出 kafka.admin.AdminOperationException

我在 Windows 上使用 Confluent 3.0.1 平台。我按照安装指南和开发人员指南进行所有安装和开发我的拓扑。

我启动了 Zookeeper,然后启动了 Kafka 服务器并尝试运行我的拓扑。但是在 Kafka 服务器上出现以下错误。即使我手动创建主题并运行拓扑,我也会看到相同的错误。

我的拓扑代码如下:

以下是我正在使用的属性,它是不同 java 源文件的一部分。

0 投票
1 回答
883 浏览

apache-kafka - Kafka Streams“地图端”像字典查找一样加入

这个问题是Kafka Streams 在 HDFS 上查找数据的后续问题。我需要将小字典数据加入(如“地图端”加入)到主要的 Kafka 流

AFAIK,Kafka Stream 实例始终适用于主题的给定分区。如果我想进行查找,我需要为连接键重新分区两个流,以将相关记录放在一起。

如果需要检查多个查找数据,那么来回重新分区几次的成本是多少?不可能将整个查找数据集发送到每个分区,因此当我KTable从查找主题构建一个时,我将在所有 Kafka Stream 应用程序实例中看到整个数据集。因此,我可以在KStream#transform()将本地 RocksDB 存储与我拥有的所有查找数据一起使用的方法中进行查找。

我想知道哪个选项更合适:

  • 将相同的数据(整个数据集)插入主题的每个分区并在KStream#transform. 当主题被过度分区时,我们会有很多重复的数据,但对于一个小数据集,这应该不是问题。

  • 使用 DSL API 对两个流进行重新分区,以便能够执行查找(连接)。就性能而言,这意味着什么?

0 投票
1 回答
197 浏览

apache-kafka - kafka new api 0.10 不提供每个主题的流和消费者对象列表

以前我一直在使用 0.8 API。当您将主题列表传递给它时,它会返回一个流映射(每个主题一个条目)。这允许我生成一个单独的线程并将每个主题的流分配给它。每个主题中的数据太多,产生一个单独的线程有助于多任务处理。

我想升级到 0.10。我检查KafkaStreamsKafkaConsumer上课。KafkaConsumerobject 接受配置属性并提供接受主题列表的订阅方法,其返回类型为 void。我找不到可以处理每个主题的方法。

KafkaStreams另一方面似乎有同样的问题。

source.foreach()可用的方法,但它是所有主题的流。任何人,任何想法?