问题标签 [kafkajs]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
243 浏览

node.js - kafkajs 消费者滞后,但消息已提交

在 kafkajs消费者滞后方面需要一些帮助

一个 kubernetes pod 向 queque 发送 10 条消息,kafdrop 中的最后一个偏移量为 10,所有消息都成功传递到 3rd 方系统并成功提交

但是,kafdrop 中存在消费者延迟,有时是 1 条消息,有时是 3消息,而且是随机的

如果我们向 kafka 发送另外 10 条消息,一切正常 如果我们重新启动消费者 POD,延迟的消息将被发送并提交两次

kafkajs在第 3 方响应后设置为手动提交

请给点建议

0 投票
1 回答
1548 浏览

apache-kafka - KafkaJS:尝试生成有关主题的消息时 ECONNREFUSED

我正在使用 KafkaJS 来生成关于 Kafka 主题的消息。为此,我使用该wurstmeister图像将 Kafka 服务器放入 Docker 中。

我想要做的:Poll容器向主题生成消息Poll并使用来自responsePoll主题的消息。但是在尝试生成消息时出现错误

错误:poll | {"level":"ERROR","timestamp":"2020-10-24T15:21:27.113Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 127.0.0.1:9092","broker":"127.0.0.1:9092","clientId":"BlueOriginX","stack ":"Error: connect ECONNREFUSED 127.0.0.1:9092\n at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1145:16)"}

这是docker-compose.yml文件:

我想错误来自我的 docker 设置,但我不知道:/

编辑:这是产生消息的代码:

0 投票
0 回答
1182 浏览

apache-kafka - Kafkajs消费和生产批次问题

我想在我的 Node.js 项目中使用 kafkajs。让我展示我的代码。

制片人:

消费者:

一个实例可以处理 100 个项目(最多)。所以现在如果一个消费者拿这批有 5k 个物品,其他人也不会拿它(来自同一个 groupId,并行处理)。问题是:

  • 分批从多个消费者那里同时读取是真的吗?
  • 我可以将消费者配置为批量消费定义数量的消息吗?
  • 生产者是否应该发送具有正确批量大小的批次(100 件)?=> 生产者必须适应消费者?
  • 如果我需要定义的批次大小,如何正确地从生产者发送批次?
0 投票
1 回答
1044 浏览

apache-kafka - Kafka Protobuf 控制台消费者序列化异常

我有一个关于#protobuf #serialization 的问题,它发生在#nodejs 和#apache #kafka 中,由#confluent 平台在一个社区中运行。

我使用 google protobuf 或 protobufjs 序列化数据,然后使用 kafkajs 将其发送到 kafka。但是,当我提交数据时,kafka-protobuf-console-consumer 给了我一个序列化异常。请检查源代码并帮助我。 https://github.com/smhmayboudi/kafka-protobuf-console

0 投票
1 回答
303 浏览

apache-kafka - NestJs - Kafka 对有效负载进行过滤

我将 NestJs 与 KafkaJs 一起使用,我有两个微服务订阅相同的 EventPattern,但是其中一个微服务只需要 EventPattern 的单一类型的有效负载,例如:

事件模式::DoThing

在微服务 1 中,我只关心 DoThing -> job1,但是
需要 DoThing -> job2、job3 等。

有没有一种方法可以从嵌套中进行过滤,而不是在消费完成后进行简单的过滤。

欢迎任何建议。

0 投票
1 回答
747 浏览

apache-kafka - Kafka Producer:发送消息后断开连接与保持连接打开

我无法在kafkajs 文档或浏览官方 Apache Kafka设计文档中找到答案,但在他们的producer示例中,producer发送消息后断开连接。但是,这可能是因为它只是一个微不足道的例子,而不是一个长期运行的过程。

对于长时间运行的应用程序,如网络应用程序,我想知道在发送消息后断开与生产者的连接是否更好,或者(可能)在运行的应用程序的整个生命周期中保持连接打开更好。

保持连接打开的一个明显优点是它在发送消息时不会重新连接,一个明显的缺点是它保持 TCP 连接打开。我也不知道有多大的优势或劣势。

我的猜测是它取决于预期的数量。如果应用程序要不断发送消息,最好保持连接打开,而如果不经常发送消息,则发送消息后断开连接是合适的。

这是一个准确的评估吗?我更想知道我是否遗漏了细微差别或做出了错误的假设。

0 投票
1 回答
385 浏览

node.js - 如何在 Kubernetes 上的 Node.js 中扩展 kafka 消费者

有一些设计问题,希望根据您对 Kafka 和KafkaJS(任何此类库)的经验提供反馈:

  1. 分区是在 Kafka 中扩展的一种方式吗?如果我创建 3 个分区并且只有 1 个使用者,我是否会在这 2 个未使用的分区中丢失消息?如果我启动 2 个新消费者,KafkaJS是否会从专用分区管理新消费者的分配?在消费者中实现并行处理的唯一方法是eachBatch,可以用 eachMessage 完成并控制消息处理的速率吗?
  2. 扩大消费者规模的推荐方式是什么?分区/异步并行/增加消费者节点等?目前,我有 1 个节点每分钟消耗约 30 条消息,我的目标是扩展消费者,因为预期速率可能在约 2000 条以上。
0 投票
2 回答
2549 浏览

node.js - 如何在 KafkaJS 的 Jest 测试中等待 Kafka 响应?

考虑这个测试,其中一条消息从测试发送到主题“out”,并且测试的代码预计会使用它并通过向主题“in”发送消息来回复。为了通过,我想确保将消息发送到主题“in”。

我阅读了有关使用的信息done,但在定义测试本身时我不能拥有它async,我需要它才能使用await。有没有我不知道的不同方式?

0 投票
1 回答
457 浏览

node.js - 消费者有没有办法向使用 kafka(kafka.js) 和 node.js 实现的生产者发送确认?

我相信作为生产者属性的 3 种类型的确认仅限于领导者和生产者,我希望生产者在消费者通过 kafka 代理消费来自存储/队列的消息时接收到具体消息。如果我对生产者的“ acks ”属性有误,也请纠正我,它的默认值为“-1”,它确认所有副本是否都已接收/存储消息,但它与消费者有什么关系,或者可以当消费者提交并且kafka向生产者发送确认时,我们创建了一个桥梁?

0 投票
1 回答
617 浏览

apache-kafka - Nest.js 将 kafka 客户端注入服务

我有一个 Nest.js 微服务应用程序。对于 MS 之间的消息代理,我使用的是 kafka。

现在我想在健康服务中注入这个相同的连接来测试它的状态。我不想创建新客户端和新连接。我需要在 main.ts 中创建的那个。

理想情况下,我很想直接从 nestjs 获取 kafkajs 客户端,以创建生产者和消费者。