问题标签 [apache-kafka-streams]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - 将 Kafka 流输入打印到控制台?
我一直在查看我正在开发的 Java 应用程序的大量 Kafka 文档。我已经尝试过使用 Java 8 中引入的 lambda 语法,但我在这方面有点粗略,并且对它应该是我目前使用的不太有信心。
我有一个 Kafka/Zookeeper 服务运行没有任何问题,我想做的是编写一个基于输入的小示例程序将其写出来,但不做字数统计,因为已经有很多例子了。
至于示例数据,我将得到以下结构的字符串:
示例数据
问题
我希望能够提取 3 个字母的关键字并用System.out.println
. 如何获取包含输入的字符串变量?我知道如何应用正则表达式,甚至只是通过字符串搜索来获取关键字。
代码
我有 Zookeeper、kafka、生产者和消费者运行都连接到同一个主题,所以我希望基本上看到String
所有实例(生产者、消费者和流)上都出现相同的内容。
java - Kafka KStreams - 如何添加线程/使用 StreamsConfig.NUM_STREAM_THREADS_CONFIG
我在搞乱这个参数并遇到了一些奇怪的事情。没有它我的应用程序运行正常,但是当我将此行添加到配置时:
CPU 使用率不会超过零。应用似乎没有做任何事情。没有错误。
是否有一些建议的方法来增加 KStreams 应用程序的线程使用率?或者只是“相信力量”,让它一起运行?
编辑:
- 我有两个分区
- 已通过
kafka-consumer-groups
- 大量可用记录检查了消费者滞后 - 即使只有 1 个分区 - 为什么有多个线程什么都不做?0% CPU。
apache-kafka - Kafka Streams reduceByKey 与 leftJoin
乍一看,在我看来,使用KStream#reduceByKey
one 可以实现与使用KStream to KTable leftJoin
. 即用相同的键组合记录。在性能方面,两者之间有什么区别?
d3.js - Bluemix Kafka 流
最新版本的 IBM Message Bus 是否支持 Kafka Streams(此处描述:http: //www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/)。有关如何执行此操作的任何示例/示例?如果是,这可以用 node.js 完成吗?
我想做的是
将数据从源数据库 (Cloudant) 流式传输到 Kafka 流(Bluemix 中存在连接器服务 - BETA 服务)
使用 Kafka Streams 聚合数据(摘要、计数等)
一个 Web 仪表板应用程序 - 前端(Node.JS、D3.JS)使用流并实时更新仪表板。
谢谢
java - kafka KStream - 采用 n 秒计数的拓扑
我有一个 JSON 对象流,我键入几个值的散列。我希望在 n 秒(10?60?)间隔内按键计数,并使用这些值进行一些模式分析。
我的拓扑:K->aggregateByKey(n seconds)->process()
在process - init()
我调用的步骤ProcessorContent.schedule(60 * 1000L)
中,希望.punctuate()
得到调用。从这里我将遍历内部散列中的值并采取相应的行动。
我看到值来自聚合步骤并命中process()
函数,但从.punctuate()
未被调用。
代码:
AggregateInit()返回空值。
我想我可以.punctuate()
用一个简单的计时器来做同样的事情,但我想知道为什么这段代码不能像我希望的那样工作。
java - Kafka - TimestampExtractor 的问题
我用org.apache.kafka:kafka-streams:0.10.0.1
我正在尝试使用基于时间序列的流,该流似乎不会触发KStream.Process()
触发(“标点符号”)。(请参阅此处以供参考)
在KafkaStreams
配置中,我传递了这个参数(等等):
这里,EventTimeExtractor
是一个自定义时间戳提取器(实现org.apache.kafka.streams.processor.TimestampExtractor
),用于从 JSON 数据中提取时间戳信息。
当每条新记录被拉入时,我希望这会调用我的对象(派生自TimestampExtractor
)。有问题的流是 2 * 10^6 记录/分钟。我已punctuate()
设置为 60 秒,但它永远不会触发。我知道数据非常频繁地通过这个跨度,因为它会拉动旧值来迎头赶上。
事实上,它根本不会被调用。
- 这是在 KStream 记录上设置时间戳的错误方法吗?
- 这是声明此配置的错误方法吗?
apache-kafka - Kafka多分区排序
我知道不可能在 Kafka 中对多个分区进行排序,并且分区排序只能保证组内的单个消费者(对于单个分区)。但是,使用 Kafka Streams 0.10 现在可以实现这一点吗?如果我们使用时间戳功能,以便每个分区中的每条消息都保持顺序,那么在消费者端,假设使用 Kafka Streams 0.10,现在这可能吗?假设我们收到所有消息,我们是否不能根据使用的时间戳对所有分区进行排序,并可能将它们转发到单独的主题以供使用?
目前我需要保持排序,但这意味着有一个分区和一个消费者线程。我想将其更改为多个分区以增加并行度,但以某种方式“让它们按顺序排列”。
有什么想法吗?谢谢你。
apache-kafka - AdminUtils.createTopic API 抛出 kafka.admin.AdminOperationException
我在 Windows 上使用 Confluent 3.0.1 平台。我按照安装指南和开发人员指南进行所有安装和开发我的拓扑。
我启动了 Zookeeper,然后启动了 Kafka 服务器并尝试运行我的拓扑。但是在 Kafka 服务器上出现以下错误。即使我手动创建主题并运行拓扑,我也会看到相同的错误。
我的拓扑代码如下:
以下是我正在使用的属性,它是不同 java 源文件的一部分。
apache-kafka - Kafka Streams“地图端”像字典查找一样加入
这个问题是Kafka Streams 在 HDFS 上查找数据的后续问题。我需要将小字典数据加入(如“地图端”加入)到主要的 Kafka 流
AFAIK,Kafka Stream 实例始终适用于主题的给定分区。如果我想进行查找,我需要为连接键重新分区两个流,以将相关记录放在一起。
如果需要检查多个查找数据,那么来回重新分区几次的成本是多少?不可能将整个查找数据集发送到每个分区,因此当我KTable
从查找主题构建一个时,我将在所有 Kafka Stream 应用程序实例中看到整个数据集。因此,我可以在KStream#transform()
将本地 RocksDB 存储与我拥有的所有查找数据一起使用的方法中进行查找。
我想知道哪个选项更合适:
将相同的数据(整个数据集)插入主题的每个分区并在
KStream#transform
. 当主题被过度分区时,我们会有很多重复的数据,但对于一个小数据集,这应该不是问题。使用 DSL API 对两个流进行重新分区,以便能够执行查找(连接)。就性能而言,这意味着什么?
apache-kafka - kafka new api 0.10 不提供每个主题的流和消费者对象列表
以前我一直在使用 0.8 API。当您将主题列表传递给它时,它会返回一个流映射(每个主题一个条目)。这允许我生成一个单独的线程并将每个主题的流分配给它。每个主题中的数据太多,产生一个单独的线程有助于多任务处理。
我想升级到 0.10。我检查KafkaStreams
并KafkaConsumer
上课。KafkaConsumer
object 接受配置属性并提供接受主题列表的订阅方法,其返回类型为 void。我找不到可以处理每个主题的方法。
KafkaStreams
另一方面似乎有同样的问题。
有source.foreach()
可用的方法,但它是所有主题的流。任何人,任何想法?