问题标签 [apache-kafka-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
3 回答
3639 浏览

java - KStream batch process windows

I want to batch messages with KStream interface.

I have a Stream with Keys/values I tried to collect them in a tumbling window and then I wanted to process the complete window at once.

The thing is foreach gets called on each update to the KTable. I would like to process the whole window once it is complete. As in collect Data from 100 ms and then process at once. In for each.

at some point the new window starts with 1 entry in the map. So I don't even know when the window is full.

any hints to to batch process in kafka streams

0 投票
1 回答
3839 浏览

apache-kafka-streams - 如何在 Kafka 流中使用 HashMap 作为值创建状态存储?

我需要创建一个以字符串键 HashMap 作为值的状态存储。我尝试了以下两种方法。

代码编译正常,没有任何错误,但出现运行时错误

有人可以建议我创建国营商店的正确方法是什么?

0 投票
3 回答
9291 浏览

apache-kafka-streams - 无法在 Kafka 流中创建状态存储

Failed to lock the state directory: /tmp/kafka-streams/string-monitor/0_1在我的 kafka 流应用程序中创建状态存储时出现此错误。这是应用程序的完整堆栈跟踪

我创建了一个状态商店,如下所示

知道如何解决这个问题吗?

0 投票
1 回答
9311 浏览

java - Kafka KStreams - 处理超时

我正在尝试使用<KStream>.process()aTimeWindows.of("name", 30000)来批量处理一些KTable值并将它们发送出去。似乎 30 秒超过了消费者超时间隔,之后 Kafka 认为所述消费者已失效并释放分区。

我尝试提高轮询频率和提交间隔以避免这种情况:

不幸的是,这些错误仍在发生:

(很多)

其次是这些:

显然,我需要更频繁地将心跳发送回服务器。如何?

我的拓扑是:

KTable每 30 秒按键对值进行分组。在Processor.init()我调用context.schedule(30000).

DBProcessorSupplier提供了一个DBProcessor的实例。这是AbstractProcessor的一个实现,其中已经提供了所有覆盖。他们所做的只是记录,所以我知道每个人什么时候被击中。

这是一个非常简单的拓扑,但很明显我在某处遗漏了一步。


编辑:

我知道我可以在服务器端进行调整,但我希望有一个客户端解决方案。我喜欢当客户端退出/死亡时很快就可以使用分区的概念。


编辑:

为了简化问题,我从图中删除了聚合步骤。现在只是消费者->处理器()。(如果我将消费者直接发送给.print()它会很快工作,所以我知道没关系)。(同样,如果我通过它输出聚合(KTable).print()似乎也可以)。

我发现应该每 30 秒调用一次的.process()-实际上阻塞了可变时间长度并且输出有点随机(如果有的话)。.punctuate()

更远:

我将调试级别设置为“调试”并重新运行。我看到很多消息:

但是.punctuate()函数中的断点没有被命中。所以它做了很多工作,但没有给我使用它的机会。

0 投票
3 回答
1746 浏览

java - Kafka KStream - 使用带有窗口的 AbstractProcessor

我希望将来自 KStream 的窗口批次输出组合在一起并将它们写入辅助存储。

我期待看到.punctuate()大约每 30 秒被调用一次。我得到的反而被保存在这里

(原始文件长达数千行)

摘要 -.punctuate()看似随机然后反复调用。它似乎不遵守通过ProcessorContext.schedule()设置的值。


编辑:

.punctuate()相同代码的另一次运行大约每四分钟产生一次调用。这次我没有看到疯狂的重复值。来源没有变化 - 只是结果不同。

使用以下代码:

主要的

处理器

处理器供应商


编辑:

为了确保我的调试器不会减慢它的速度,我构建了它并在与我的 kafka 进程相同的盒子上运行它。这一次,它甚至没有尝试延迟 4 分钟或更长时间 - 在几秒钟内,它就向.punctuate(). 其中许多(大多数)没有对.process().

0 投票
2 回答
987 浏览

java - Kafka KTable - 跨机器共享聚合

假设我有一个包含许多分区的主题。我在其中写入 K/V 数据,并希望通过键在Tumbling Windows中聚合所述数据。

假设我启动了与分区一样多的工作实例,并且每个工作实例都在单独的机器上运行。

我将如何确保生成的聚合包含每个键的所有值?IE 我不希望每个工作实例都有一些值的子集。

这是StateStore的用途吗?卡夫卡自己管理这个还是我需要想出一个方法?

0 投票
1 回答
913 浏览

java - Kafka KStream 应用程序 - 临时文件清理

似乎我的基于KStream的应用程序已经堆积了许多 gB 的文件(.sst、Log.old.<stamp> 等)。

这些会自己清理吗?还是我需要注意这些?设置一些参数来剔除它们?

0 投票
1 回答
915 浏览

hadoop - Kafka Streams 在 HDFS 上查找数据

我正在使用 Kafka Streams (v0.10.0.1) 编写一个应用程序,并希望使用查找数据来丰富我正在处理的记录。该数据(带时间戳的文件)每天(或每天 2-3 次)写入 HDFS 目录。

如何在Kafka Streams应用程序中加载它并加入实际KStream
当新文件到达那里时,从 HDFS 重新读取数据的最佳做法是什么?

还是将Kafka ConnectRDBMS 表内容切换到所有 Kafka Streams 应用程序实例都可以使用的 Kafka 主题并将其写入会更好?

更新
正如建议的Kafka Connect将是要走的路。因为查找数据每天都会在 RDBMS 中更新,所以我考虑将 Kafka Connect 作为计划的一次性作业运行,而不是保持连接始终打开。是的,因为语义和保持连接始终打开并确保它不会被中断的开销......等等。对我来说,在这种情况下进行预定提取看起来更安全。

查找数据不大,可能会删除/添加/修改记录。我也不知道如何始终将完整转储到 Kafka 主题并截断以前的记录。启用日志压缩并为已删除的键发送空值可能不起作用,因为我不知道源系统中已删除什么。另外 AFAIK 当压缩发生时我没有控制权。

0 投票
1 回答
1491 浏览

java - Kafka KStream - 衡量消费者滞后

由于我的基于KStream的应用程序不遵循传统的 Kafka 消费者路线,我应该如何跟踪消费者滞后?通常我会使用ConsumerOffsetChecker (或类似的东西),但它需要一个消费者组名称。

我应该改用什么?

(我想对此进行跟踪,以便判断是否/何时推出新消费者)

0 投票
1 回答
463 浏览

java - Kafka KStream - 拓扑设计

我的流是键/值对,我想将它们作为“原始”和 60 秒聚合保存到数据库中。最初我是这样做的:

但后来我发现

一个。.aggregateby()只返回它匹配的对(我需要所有它们 - 匹配或其他)
b。我可以在阶段使用HashMap.process()来实现相同的聚合效果。然后当.punctuate()被调用时,我将所有 k/v 对写入数据库。

所以得到的拓扑变成:

kStreamBuilder.stream->foreach
kStreamBuilder.stream->process

问题:

  1. 这是获得写入所有匹配或其他kv对的结果的“合理”方式吗?(所有值通过foreach和任何对 + 其余通过process
  2. 在将原始流发送到之前,我是否需要(以某种方式)划分原始流.foreach().process()或者是否足以执行上述操作?