问题标签 [apache-kafka-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
777 浏览

java - 如何重新处理批处理的 Kafka 流

  • 我想根据创建消息的时间戳来批处理消息。
  • 此外,我想在固定时间窗口(1 分钟)中批量处理这些消息。
  • 只有在窗口通过后,才应该将批次推到下游。

为此,处理器 API 似乎或多或少适合(la KStream 批处理窗口):

但是,这种方法有一个主要缺点:我们不使用 Kafka Streams 窗口。

  • 不考虑乱序消息。
  • 实时操作时,标点计划应等于所需的批处理时间窗口。如果我们将其设置为短,批处理将被转发并且下游计算将开始快速。如果设置为长,并且当批处理窗口尚未完成时触发标点符号,同样的问题。
  • 此外,在保持标点时间表(1 分钟)的同时重放历史数据将仅在 1 分钟后触发第一次计算。如果是这样,那将炸毁 statestore 并且感觉不对。

考虑到这些点,我应该使用 Kafka Streams 窗口。但这只有在 Kafka Streams DSL 中才有可能......

在这方面的任何困难都会很棒。

0 投票
1 回答
1685 浏览

apache-kafka - StreamsException:提取的时间戳值为负,这是不允许的

这可能是使用 kafka-node -negative timestamp 的 Kafka Streams 中的 Error的重复,但肯定不是。我的 Kafka Streams 应用程序对每条消息执行一些转换逻辑并将其转发到新主题。应用程序中没有基于时间的聚合/处理,因此不需要使用任何自定义时间戳提取器。这个应用程序运行了好几天,但突然之间应用程序抛出了一个负时间戳异常。

从所有 StreamThreads(总共 10 个)抛出此异常后,该应用程序有点冻结,因为几个小时内流没有进一步的进展。之后没有抛出异常。当我重新启动应用程序时,它开始只处理新来的消息。

现在的问题是,介于两者之间的那些消息发生了什么(在引发异常之后和重新启动应用程序之前)。万一,那些丢失的消息没有嵌入时间戳(极不可能,因为代理和生产者没有发生任何变化),是不是应用程序应该为每条这样的消息抛出异常?或者是不是像应用程序在第一次检测到消息中的负时间戳时停止流进度?有没有办法处理这种情况,以便应用程序可以处理流,即使在检测到任何负时间戳之后?我的应用程序使用 Kafka Streams 库版本 0.10.0.1-cp1。

注意:我可以轻松地建立一个自定义时间戳提取器,它可以检查每条消息中的负时间戳,但这对我的应用程序来说是很多不必要的开销。我只想了解为什么在检测到带有负时间戳的消息后流没有进行。

0 投票
1 回答
4832 浏览

scala - Kafka Streams 0.10.1“无法刷新状态存储”

我正在尝试使用 Kafka Streams 0.10.1 在 Scala 中创建一个简单的聚合示例,尽管我似乎因简单的“计数”聚合而失败(使用 Kafka 控制台生产者)。使用这样的代码:

它因“无法刷新状态存储 count-test-1”而失败,我在帖子末尾包含了完整的堆栈跟踪。另一方面,如果我使用 print() 而不是 to() 它就像一个魅力,将结果打印到控制台/终端:

有谁知道这种行为的原因可能是什么?

仅供参考,我使用的操作系统是作为主机的 Windows 10(也通过 IntelliJ 运行 Scala 应用程序)和用于 Kafka(在 Docker 容器中)和生产者/消费者应用程序的 Ubuntu 16.04 VM。但是,我可以确认在 Ubuntu VM 上运行应用程序时也会遇到问题。

非常感谢您的帮助,感谢您的任何见解:-)

完整的堆栈跟踪:

0 投票
0 回答
354 浏览

java - KStreams 应用程序 - 内存使用过多

我正在运行一个(相对)简单的 KStreams 应用程序:

流->按键聚合->过滤器->foreach

它在具有 32Gb / 8CPU 的 AWS EC2 上每分钟处理约 20 万条记录

在启动它的 10 分钟内,内存使用量超过 40%。不久之后(通常少于 15 分钟),操作系统将 OOM 杀死它。

配置:

聚合步骤:

使用 Kafka 0.10.1.1

关于在哪里寻找罪魁祸首的建议?


边注:

我尝试使用NewRelic javaagent 来检测这个应用程序。当我用它运行它时-XX:+useG1GC,标准“使用大量内存然后被杀死”但是当我删除 G1GC 参数时,进程将系统负载运行到 > 21。我不得不自己杀死那个。

NewRelic 的输出没有显示出任何令人发指的内存管理。

0 投票
1 回答
1341 浏览

kafka-consumer-api - 如何使用 Kafka-Stream 按间隔从 Kafka 读取记录

我想在 Kafka-Stream 消费者中使用从 Kafka 读取记录,有一个选项可以在每个给定的时间间隔读取记录吗?例如每 1 分钟?

0 投票
0 回答
4322 浏览

java - 使用 Kafka Streams DSL 窗口在列表中聚合 Java 对象

我有最直接的 Kafka Streams DSL 用例:读取 CSV 传感器数据,按时间戳分组和输出。以下代码无法编译:

由于

错误:(90、45)java:找不到符号符号:方法添加(java.lang.Object)位置:java.lang.Object类型的变量列表

诡异的。

按注释进行转换会导致以下运行时异常(在输出窗口累积之前)。

调试的add方法SensorDataAccumulator应该给出一个线索:

在此处输入图像描述

所以,如果我理解正确,我会保留 aArrayList list = new ArrayList<SensorData>();但实际上,在过程中的某个地方,它的成员被更改为LinkedTreeMap. 打字员把我弄丢了……

好的,这LinkedTreeMap是 GSON 用于我JsonDeserializerJsonSerializer类的底层数据结构。因此,为了完整起见,我将在下面添加这些内容。

目前我不确定我做错了什么以及在哪里修复它。我应该使用不同的序列化程序、不同的数据结构吗?不同的语言;)?

欢迎任何意见。

0 投票
2 回答
3031 浏览

apache-kafka - 卡夫卡流 - 加入两个ktables调用连接函数两次

我正在尝试加入 2 个 KTables。

合并功能非常简单,我只是将值从一个 bean 复制到另一个。

但是由于某些原因,连接函数在单个生成的记录上调用了两次。请参阅下面的流/生产者配置

生产者配置 -

接下来我要为每个流提交一条记录。两条记录具有相同的键。我期待收到单条记录作为输出。

但是 ValueJoiner 触发了 2 次,我得到了 2 个相同的输出记录而不是一个。在触发期间 - 两个流中的两个值都存在 - 我无法获得触发第二次执行的内容。

不加入 - 我无法重现这种行为。我找不到 2 ktable join 的任何工作示例 - 所以无法理解我的方法有什么问题。

添加演示相同行为的简单代码

0 投票
1 回答
2162 浏览

apache-kafka - 在 Apache Kafka 中构建和查询状态:Kafka Stream?

我正在为我的需要构建一个 apache 集群,其中大部分是无状态的。但是有一种情况我真的需要状态。

为了解释,假设我正在存储每家开业的药店以及在该店和每家商店发生的交易。所以商店以药物数量的初始状态打开。随着药品的销售和更多的药品库存,状态不断变化。

虽然 Kafka 满足了我实时跟踪实时交易的需求,但我需要能够建立药店状态和查询,并在任何给定点找出商店中给定药物的数量。是否可以?这就是 Kafka Stream 的用途吗?

0 投票
1 回答
3243 浏览

apache-kafka-streams - Kafka 流聚合是否有任何排序保证?

我的 Kafka 主题包含由 deviceId 键入的状态。我想KStreamBuilder.stream().groupByKey().aggregate(...)用来只保留状态的最新值TimeWindow。我猜想,只要对主题进行 key 分区,聚合函数总能以这种方式返回最新的值:

(key, value, older_value) -> value

这是我可以从 Kafka Streams 获得的保证吗?我应该推出自己的处理方法来检查时间戳吗?

0 投票
1 回答
6614 浏览

apache-kafka - KafkaStreams - InconsistentGroupProtocolException

我有一个 Kafka Streams 应用程序,它使用 Kafka Streams DSL 连接到我们的 Kafka 集群,如下所示:

我的代码库的另一部分直接使用消费者客户端建立到我们的集群的连接。

我这样做的原因是在有条件地启动应用程序的其他部分(包括 Kafka Streams 拓扑)之前收集有关消费者组的元数据。可能还有其他方法可以做到这一点(例如通过各种钩子或其他方法),但我更好奇为什么这些方法的混合有时会(间歇性地)导致InconsistentGroupProtocolException被抛出。

有人可以解释一下为什么要抛出这个吗?我很难从源代码本身确定到底发生了什么,但我猜 Kafka Streams 构建的底层消费者正在指定与KafkaConsumer客户端不同的分区协议。无论如何,将不胜感激任何有助于理解此异常的帮助