问题标签 [node-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
462 浏览

npm - Kafka 节点使用 Node-RED 抛出“TypeError: Client is not a constructor”

全流程我已经使用安装了kafka节点

但它抛出

谁能帮我解决这个问题?

kafka节点属性设置

0 投票
1 回答
731 浏览

node.js - Kafka-node 突然从偏移量 0 开始消费

有时,kafka-node 消费者从偏移量 0 开始消费,而它的默认行为是只消费较新的消息。然后它不会切换回其默认行为。你知道如何解决这个问题,会发生什么以及它的行为突然改变吗?代码非常简单,无需更改代码即可实现。

到目前为止,我发现的唯一解决方案是更改 kafka 主题。然后一切正常。有任何想法吗 ?

0 投票
1 回答
1646 浏览

deserialization - 没有模式注册表的Nodejs avro序列化,然后在Kafka Streams中进行反序列化

我想就以下问题寻求一些指导。我正在尝试学习如何在没有模式注册表的情况下使用 nodejs 执行 Avro 数据的序列化,将其发布到 Kafka 集群,然后在 Kafka Streams (Java) 中检索它。

在 javascript 方面,我尝试使用 kafka-node 和 avsc 进行序列化。在 Kafka Streams 中,我决定实现一个自定义 Serde,因为据我所知,Streams API 提供的 Avro Serdes 旨在直接从 Schema Registry 获取模式。

这是一个简单生产者的 javascript 代码片段:

以下是我目前尝试实现反序列化器的方式:

但是,当流应用程序尝试反序列化数据时,我遇到了以下错误,特别是在 DataFileReader 被实例化的代码行:

我不知道如何进行。任何意见,将不胜感激。

0 投票
0 回答
198 浏览

node.js - Angular 组件中的 Kafka 消费者

我已经用 C# 编写了产品,并且正在尝试在 Angular 应用程序中使用。我已经安装了节点模块'no-kafka'

当在 ts 组件中导入节点模块时

我收到如下编译错误

0 投票
1 回答
846 浏览

node.js - 如何使用 nodejs 消费者在 kafka 中实现并行性?

从理论上讲,由于nodejs是单线程的,当我定义多个消费者以增加吞吐量时,如何实现并行性?

例如,如果我有一个具有4 个分区的 kafka 主题,那么在消费者端,当与 nodejs 一起使用时,我将如何能够并行消费 4 条消息。最多我可以使用单线程事件循环实现并发。

一种可能的解决方案是分叉子进程(在本例中为 3),以便假设系统有 3 个空闲核心,每个进程都可以从特定分区接收消息。但是这种方法的效率/效果如何?

实现这一目标的最佳方法是什么?

0 投票
0 回答
136 浏览

node.js - 如何避免 HighlevelProducer rdkafka 中的数据丢失

使用 npm librdkafka 我正在向 Kafka 生成大量数据。我正在使用 HighlevelProducer。对于 50K 消息,此配置工作正常。但是对于像 100K 消息这样的大容量,会有数据丢失。根据配置,数据丢失从 200-1K 不等。尝试了各种配置,但无法找到达到 100% 准确度的最佳配置。任何建议都是有帮助的。下面是我的配置。

0 投票
1 回答
75 浏览

apache-kafka - Kafka / Mock 主题,方便消费者开发

鉴于整个 kafka 管道非常繁重,并且需要大量资源。

我想模拟一个主题,以便我可以独立/单独测试和开发消费者。

我真的找不到太多,我对卡夫卡也很陌生。

我想我们可以有类似的东西:

然后,稍后在消费者代码中:

这样我们的假货就会发送一些硬编码的东西stubs/*.txt

0 投票
0 回答
123 浏览

node.js - 无法间歇性重置 kafka 偏移量,报错“消费者组必须没有正在运行的实例,当前状态:稳定”

在连接消费者组之前,我正在尝试通过管理对象重置 kafka 偏移量。

我打电话

await admin.setOffsets({groupId: gID, topic, partitions: await admin.fetchTopicOffsetsByTimestamp(topic, 1635358089189) });

在开始 kafka 消费者之前,但我最终得到了这个错误: The consumer group must have no running instances, current state: Stable有时,并非总是如此,但几乎在每次交替运行时。

我正在遵循的步骤

  1. 首先让 kafka 消费者从最新的偏移量开始消费。
  2. 通过 终止进程ctrl + c,它调用下面的方法(测试突然终止)
  1. 我立即重新启动我的服务器。

  2. 在这次运行中,kafka 管理员将首先通过上面的代码重置偏移量,然后它会连接消费者来消费消息。

  3. 重新启动服务器后的某些时候,它工作正常,偏移量被重置并且消费者运行完美,但是当我重新启动服务器时,每隔一次运行我都会收到这个错误:The consumer group must have no running instances, current state: Stable

预期行为 当我调用 consumerGroup.stop() 和 consumerGroup.stop() 并终止进程时,服务器重新启动时不应有任何正在运行的消费者组实例

观察到的行为 即使在停止和断开消费者之后,仍然有任何消费者组处于运行状态,因此我无法重置偏移量,因为如果有任何正在运行的消费者实例,我们无法重置偏移量。参考链接: The consumer group must have no running instances when performing the reset https ://kafka.js.org/docs/admin#a-name-reset-offsets-by-timestamp-a-reset-consumer-group-offsets-by-timestamp

环境:

  • Mac 操作系统 11.2.1
  • KafkaJS 版本:1.15.0
  • 卡夫卡版本:2.8.1
  • NodeJS 版本:v13.8.0

附加上下文 我在这里遗漏了什么吗?我是 node.js 和 kafka 的初学者,可能在这里遗漏了一些明显的东西,所以请指出,或者随时提出任何其他解决方案来解决这个问题,谢谢。