问题标签 [kafkajs]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
46 浏览

node.js - 使用 kafkajs 的 RequestReply 模式

我有一个问题,无法自己解决,所以寻求帮助。所以我要实现基于kafkajs的同步请求-回复模式。这是一个简短的原型:gist code

我在第二次迭代(嵌套承诺)时遇到了问题,eventEmitter 使用旧对象并且无法解决承诺。将不胜感激任何想法。

0 投票
0 回答
136 浏览

node.js - Kafka 消息发送失败:生产者断开连接 KafkaJSError:生产者断开连接

我正在尝试使用 NodeJS 客户端发送 Kafka 消息:

但是当我第一次发送消息时,我总是会收到这个错误:

我第二次发送消息时它工作正常。

你知道我该如何解决这个问题吗?我使用“kafkajs”:“^1.15.0”

0 投票
0 回答
83 浏览

kafka-consumer-api - KafkaJSProtocolError:协调器不知道这个成员

长期以来,我一直面临 Kafka Consumer 的这个问题,我试图用可用的解决方案来解决这个问题,但直到现在我还没有运气。我尝试增加会话时间,增加了 maxBytes 仍然无法解决。

HeartBeatInterval - 2000, sessionTimeout - 6000

0 投票
2 回答
74 浏览

node.js - 使用 KafkaJs 向 Strimzi 发布消息的 Pod 不断崩溃

这是我基于KafkaJs的发布者客户端。我创建了一个容器镜像Pod并向 Strimzi 代理提交了一个 YAML。

我的Dockerfile.

我的podYAML。

Pod不断崩溃并且kubectl logs没有显示任何内容 - 它是空的。Akubectl describe也没有透露任何东西。

我错过了什么?

0 投票
0 回答
201 浏览

apache-kafka - LibrdKafkaError: Broker: 随机运行大约 2 小时后的未知成员

现在,我想在node-rdkafka我们的服务中实现,但我多次遇到这个错误Broker: Unknown member。github 上的相同问题是https://github.com/confluentinc/confluent-kafka-dotnet/issues/1464。他们说我们的消费者使用相同的组 id 重试或延迟。但我没有发现我的代码有任何重试和延迟。或https://github.com/confluentinc/confluent-kafka-python/issues/1004,但我已经重新检查了所有消费者组 ID,它是独一无二的。

生产者的配置node-rdkafka如下:

消费者配置node-rdkafka如下:

使用这些配置,消费者能够接收事件,但它不会持续很长时间。大约 2 小时后,它会随机给出这些错误。我不确定是因为手动提交还是我们的功能需要很长时间,因为我已经尝试了两者asyncsync提交,所以 commitSync 它不依赖于我们的功能。

之所以这么说,是因为我们的函数需要很长时间,并且它使我们的消费者被踢出小组。在我发现额外的错误 Broker: Specified group generation id is not valid 之后,它可能是嫌疑人

来源:https ://github.com/confluentinc/confluent-kafka-dotnet/issues/1155

它说我需要增加会话超时,然后我尝试将其增加到"session.timeout.ms": 3000005 分钟,并且"heartbeat.interval.ms":3000我在 github 问题中发现心跳应该小于 = (timeout/3)。所以我认为 3sec 就可以了。

使用"session.timeout.ms": 300000"heartbeat.interval.ms":3000 消费者能够消费并持续很长时间,但问题是:

  • 第一次使用这些配置,它可以运行大约 0-2 秒来接收
  • 过了一会儿,它收到了,但需要 1-10 秒才能收到消息

详细错误:

0 投票
0 回答
20 浏览

node.js - use worker_thread of nodejs to do async function in the consumer kafka

Hello everyone I want to make my nodejs app run parallelism when Kafka listen to multiple consumer(multiple same topics or different) and then I will do the async function in the thread. How can I do that? I had tried worker_thread they said that I cannot use the promise function.

Could you help me?

0 投票
0 回答
19 浏览

kafkajs - Kafkajs - 正确/推荐的消费者配置

我开始使用kafkajs和阅读文档 - https://kafka.js.org/docs/sumption

并想问一些关于消费消息的问题。

  1. 使用eachMessage功能时,无需设置autoCommit: false。这是否意味着一旦eachMessage函数读取了一条消息,它就会自动提交?

  2. 如果我确实 set autoCommit: false,然后使用commitOffsets我希望提交的每条消息。我是否只需要增加offset我阅读的每条消息的值?

  3. fromBeginning当消费者重新启动时,我无法决定在场景中使用哪个值。如果我配置fromBeginning: true,这是否意味着在消费者重启的情况下,它将读取已经提交的消息?

提前致谢。

0 投票
0 回答
62 浏览

nestjs - 如何在 NestJS 中使用 ClientKafka 对 @Inject 进行单元测试

我需要使用 NestJS 对我的项目中的 kafka 实现进行一些单元测试,但我不知道该怎么做。

我有一个Service多数民众赞成注入客户端卡夫卡

模块

单元测试

该项目给我这个错误:

我不知道如何在 NestJS 中解决这个问题。例如在 Java 中,我相信这可能是因为@Mock ClientKafka clientKafka我对 NestJS 没有任何其他经验......请帮助我!:)

0 投票
1 回答
29 浏览

node.js - KafkaJs - 在后台运行消费者

我正在编写一个使用 Node 和kafkajs https://kafka.js.org/docs/sumption实现 2 个消费者的服务

我有 2 个异步函数,它们已经初始化了消费者,理论上正在等待读取消息。像这样的东西:

如果我希望消费者在服务中后台运行,我应该在初始化服务器时只导入模块文件吗?

我有一种感觉,这是行不通的,因为consumerModule必须调用函数并且因为它是async,不确定如何正确实现它。

例如:

请指教。

0 投票
1 回答
43 浏览

apache-kafka - Kafkajs - 从同一主题和分区读取多个消费者

我打算使用Kafkajs https://kafka.js.org/并在 NodeJs 服务器中实现它。我想知道如果我有 2 个(或更多)服务器实例正在运行,每个实例都有一个配置有相同组 ID 和主题的消费者,那么预期的行为是什么?

这是否意味着他们可能会阅读相同的消息?我应该为每个服务器实例指定一个唯一的消费者组吗?

我读到这个 ​​-多个消费者从同一个主题消费,但不确定它是否适用Kafkajs