问题标签 [apache-kafka-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
2044 浏览

java - Apache Kafka Streams 构建和启动 jar

我想为 Apache Kafka Streams 编写一个小测试程序。到目前为止我所做的是:

  • 安装 Intellij。
  • 从 git 复制源代码。
  • 运行gradle wrappergradle idea在解压缩的文件夹中。
  • 在 Intellij 中导入文件夹。
  • 在streams/examples/src/main/java/...下创建新的java类。
  • gradlew jar
  • 将 examples/build/libs/... 中的 .jar 文件复制到服务器。

现在我想运行它。我试过了java -cp kafka.jar *Class*

但是,我得到的只是这个例外:

有人知道我做错了什么吗?我需要一些适当的指示。

0 投票
0 回答
964 浏览

avro - 使用 Kafka Streams、RocksDB 和 Avro 模式的部分范围查询

CompositeKey我有一个名为<userId: int, attribute: string>. 另一个名为SimpleValue.

使用 Kafka Streams,我使用 Avro 序列化器/反序列化器将它们放入 KeyValueStore。

稍后,使用交互式查询我可以get userId, attribute一对一配对。但我也想得到所有attributes属于一个单一的userId. 当我使用时,

它不会返回所有属性。我猜这是因为底层的 RocksDB 像字节数组一样对它们进行排序,而 avro 序列化格式与此混淆。

有没有办法在 RocksDB 中使用 Avro 进行部分范围查询?否则,我将不得不将它们与从 Avro 模式生成的字符串一起以"<userId>+<attribute>".

0 投票
1 回答
1923 浏览

apache-kafka - Kafka 流 API 与消费者 API

我需要读取特定的 Kafka 主题,对消息进行非常简短的处理并将其传递到不同的Kafka 集群。

目前,我正在使用一个消费者,它也是另一个 kafka 服务器上的生产者。

但是,流式 API 据称提供了更轻量级的高吞吐量选项。

所以问题是:

  • 假设我的处理代码不需要太多马力,流式 API 会更好吗?
  • 流式 API 是否支持写入不同的 Kafka 集群?
  • 与 Consumer API 相比,Streaming API 的缺点是什么?
0 投票
2 回答
9548 浏览

apache-kafka - 如何管理 Kafka KStream 到 Kstream 窗口连接?

基于apache Kafka docs KStream-to-KStream Joins are always windowed joins,我的问题是如何控制窗口的大小?保持主题数据的大小是否相同?或者例如,我们可以保留 1 个月的数据,但只加入过去一周的数据流?

有没有什么好的例子来显示一个窗口化的 KStream-to-kStream 窗口化连接?

就我而言,假设我有 2 个 KStream,kstream1并且kstream2我希望能够加入 10 天kstream1到 30 天的kstream2.

0 投票
2 回答
3453 浏览

java - 如何加入两个 Kafka 流,每个流都有多个分区?

我有两个Kafka流,request每个流event都在一个公共字段 requestId(最后两位数)上进行分区。我想加入两个流并写入HDFS或本地filesystem?如何编写一个在加入两者时consumer只考虑相关的高效?partitionsstreams

0 投票
1 回答
2921 浏览

java - 将 Kafka 输入流动态连接到多个输出流

Kafka Streams 中是否内置了允许将单个输入流动态连接到多个输出流的功能?KStream.branch允许基于真/假谓词进行分支,但这不是我想要的。我希望每个传入日志都确定它将在运行时流式传输到的主题,例如,日志{"date": "2017-01-01"}将流式传输到主题topic-2017-01-01,日志{"date": "2017-01-02"}将流式传输到主题topic-2017-01-02

我可以调用forEach流,然后写信给 Kafka 生产者,但这似乎不太优雅。在 Streams 框架中有没有更好的方法来做到这一点?

0 投票
1 回答
4199 浏览

apache-kafka - 用 Kafka Streams 中的内存状态存储替换 RocksDB

我正在使用 Kafka Streams 0.10.1.1 版本。

状态存储的 RocksDB 实现无法处理我们的 50k/msg 速率,所以我想将状态存储更改为内存中的存储。根据文档,这应该是可能的:http: //docs.confluent.io/3.1.0/streams/architecture.html#state

但是,当我实现这个时:

我最终在运行时出现此错误:

上述方法的实现是:

为什么它试图将状态存储转换为CachedStateStore实例?我怎样才能实现一个简单的内存状态存储,根据文档应该是可能的?

谢谢

0 投票
2 回答
5270 浏览

apache-kafka - Kafka 流以特定键作为输入加入

我在模式注册表中有 3 个不同的主题和 3 个 Avro 文件,我想流式传输这些主题并将它们连接在一起并将它们写入一个主题。问题是我要加入的键与我将数据写入每个主题的键不同。

假设我们有这 3 个 Avro 文件:
Alarm

事件:

维护:

对于这些 Avro,我的 Kafka 中有 3 个主题(比如alarm_raw、incident_raw、maintenance_raw),每当我想写入这些主题时,我都会使用 ne_id 作为键(因此主题由 ne_id 分区)。现在我想加入这3个主题并获得一条新记录并将其写入一个新主题。问题是我想根据alarm_idalarm_source_id加入警报事件,并根据ne_id加入警报和维护。我想避免创建新主题并重新分配新密钥。无论如何,我在加入时指定了密钥吗?

0 投票
1 回答
1340 浏览

apache-kafka-streams - Kafka Stream 使用 JoinWindow 进行数据重放

我有 2 个数据流,我希望能够在 1 个月的窗口内加入它们。当我有实时数据时,使用KStreamjoin一切都很有趣且超级简单。我做了这样的事情;

当我想进行数据重放时出现问题。假设我想为过去 6 个月的数据重新执行这些连接,因为我正在同时运行所有数据的管道 kafkaStream 将连接所有可连接的数据并且它不考虑时间差(其中它应该只加入过去一个月的数据)。我假设 JoinWindow 时间是我们将数据插入 Kafka 主题的时间,对吗?
以及如何更改和操纵这个时间,以便我可以正确运行我的数据重播,我的意思是重新插入过去 6 个月的数据,每条记录需要一个月的时间窗口并基于该记录加入。

这个问题与How to manage Kafka KStream to Kstream windowed join不重复?,在那里我问我如何才能根据时间窗口加入。这里我说的是数据重放。根据我在加入 Kafka 期间的理解,将数据插入主题的时间作为 JoinWindow 的时间,所以如果你想进行数据重放并重新插入 6 个月前的数据,kafka 将其作为新数据今天插入,并将与一些其他数据加入它,这些数据实际上是今天不应该的。

0 投票
7 回答
10023 浏览

testing - 测试 Kafka Streams 拓扑

我正在寻找一种测试 Kafka Streams 应用程序的方法。这样我就可以定义输入事件,并且测试套件会向我显示输出。

如果没有真正的 Kafka 设置,这可能吗?