问题标签 [apache-kafka-streams]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - Apache Kafka Streams 构建和启动 jar
我想为 Apache Kafka Streams 编写一个小测试程序。到目前为止我所做的是:
- 安装 Intellij。
- 从 git 复制源代码。
- 运行
gradle wrapper
并gradle idea
在解压缩的文件夹中。 - 在 Intellij 中导入文件夹。
- 在streams/examples/src/main/java/...下创建新的java类。
- 跑
gradlew jar
- 将 examples/build/libs/... 中的 .jar 文件复制到服务器。
现在我想运行它。我试过了java -cp kafka.jar *Class*
但是,我得到的只是这个例外:
有人知道我做错了什么吗?我需要一些适当的指示。
avro - 使用 Kafka Streams、RocksDB 和 Avro 模式的部分范围查询
CompositeKey
我有一个名为<userId: int, attribute: string>
. 另一个名为SimpleValue
.
使用 Kafka Streams,我使用 Avro 序列化器/反序列化器将它们放入 KeyValueStore。
稍后,使用交互式查询我可以get
userId, attribute
一对一配对。但我也想得到所有attributes
属于一个单一的userId
. 当我使用时,
它不会返回所有属性。我猜这是因为底层的 RocksDB 像字节数组一样对它们进行排序,而 avro 序列化格式与此混淆。
有没有办法在 RocksDB 中使用 Avro 进行部分范围查询?否则,我将不得不将它们与从 Avro 模式生成的字符串一起以"<userId>+<attribute>"
.
apache-kafka - Kafka 流 API 与消费者 API
我需要读取特定的 Kafka 主题,对消息进行非常简短的处理并将其传递到不同的Kafka 集群。
目前,我正在使用一个消费者,它也是另一个 kafka 服务器上的生产者。
但是,流式 API 据称提供了更轻量级的高吞吐量选项。
所以问题是:
- 假设我的处理代码不需要太多马力,流式 API 会更好吗?
- 流式 API 是否支持写入不同的 Kafka 集群?
- 与 Consumer API 相比,Streaming API 的缺点是什么?
apache-kafka - 如何管理 Kafka KStream 到 Kstream 窗口连接?
基于apache Kafka docs KStream-to-KStream Joins are always windowed joins
,我的问题是如何控制窗口的大小?保持主题数据的大小是否相同?或者例如,我们可以保留 1 个月的数据,但只加入过去一周的数据流?
有没有什么好的例子来显示一个窗口化的 KStream-to-kStream 窗口化连接?
就我而言,假设我有 2 个 KStream,kstream1
并且kstream2
我希望能够加入 10 天kstream1
到 30 天的kstream2
.
java - 如何加入两个 Kafka 流,每个流都有多个分区?
我有两个Kafka
流,request
每个流event
都在一个公共字段 requestId(最后两位数)上进行分区。我想加入两个流并写入HDFS
或本地filesystem
?如何编写一个在加入两者时consumer
只考虑相关的高效?partitions
streams
java - 将 Kafka 输入流动态连接到多个输出流
Kafka Streams 中是否内置了允许将单个输入流动态连接到多个输出流的功能?KStream.branch
允许基于真/假谓词进行分支,但这不是我想要的。我希望每个传入日志都确定它将在运行时流式传输到的主题,例如,日志{"date": "2017-01-01"}
将流式传输到主题topic-2017-01-01
,日志{"date": "2017-01-02"}
将流式传输到主题topic-2017-01-02
。
我可以调用forEach
流,然后写信给 Kafka 生产者,但这似乎不太优雅。在 Streams 框架中有没有更好的方法来做到这一点?
apache-kafka - 用 Kafka Streams 中的内存状态存储替换 RocksDB
我正在使用 Kafka Streams 0.10.1.1 版本。
状态存储的 RocksDB 实现无法处理我们的 50k/msg 速率,所以我想将状态存储更改为内存中的存储。根据文档,这应该是可能的:http: //docs.confluent.io/3.1.0/streams/architecture.html#state
但是,当我实现这个时:
我最终在运行时出现此错误:
上述方法的实现是:
为什么它试图将状态存储转换为CachedStateStore
实例?我怎样才能实现一个简单的内存状态存储,根据文档应该是可能的?
谢谢
apache-kafka - Kafka 流以特定键作为输入加入
我在模式注册表中有 3 个不同的主题和 3 个 Avro 文件,我想流式传输这些主题并将它们连接在一起并将它们写入一个主题。问题是我要加入的键与我将数据写入每个主题的键不同。
假设我们有这 3 个 Avro 文件:
Alarm:
事件:
维护:
对于这些 Avro,我的 Kafka 中有 3 个主题(比如alarm_raw、incident_raw、maintenance_raw),每当我想写入这些主题时,我都会使用 ne_id 作为键(因此主题由 ne_id 分区)。现在我想加入这3个主题并获得一条新记录并将其写入一个新主题。问题是我想根据alarm_id和alarm_source_id加入警报和事件,并根据ne_id加入警报和维护。我想避免创建新主题并重新分配新密钥。无论如何,我在加入时指定了密钥吗?
apache-kafka-streams - Kafka Stream 使用 JoinWindow 进行数据重放
我有 2 个数据流,我希望能够在 1 个月的窗口内加入它们。当我有实时数据时,使用KStream和join一切都很有趣且超级简单。我做了这样的事情;
当我想进行数据重放时出现问题。假设我想为过去 6 个月的数据重新执行这些连接,因为我正在同时运行所有数据的管道 kafkaStream 将连接所有可连接的数据并且它不考虑时间差(其中它应该只加入过去一个月的数据)。我假设 JoinWindow 时间是我们将数据插入 Kafka 主题的时间,对吗?
以及如何更改和操纵这个时间,以便我可以正确运行我的数据重播,我的意思是重新插入过去 6 个月的数据,每条记录需要一个月的时间窗口并基于该记录加入。
这个问题与How to manage Kafka KStream to Kstream windowed join不重复?,在那里我问我如何才能根据时间窗口加入。这里我说的是数据重放。根据我在加入 Kafka 期间的理解,将数据插入主题的时间作为 JoinWindow 的时间,所以如果你想进行数据重放并重新插入 6 个月前的数据,kafka 将其作为新数据今天插入,并将与一些其他数据加入它,这些数据实际上是今天不应该的。
testing - 测试 Kafka Streams 拓扑
我正在寻找一种测试 Kafka Streams 应用程序的方法。这样我就可以定义输入事件,并且测试套件会向我显示输出。
如果没有真正的 Kafka 设置,这可能吗?