问题标签 [kafkajs]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
node.js - Kafka消费者手动偏移提交
在我的一个用例中,包括使用数据、执行一些操作并将其生成到新主题。
我正在使用https://www.npmjs.com/package/kafkajs npm 库。
我想在成功操作后手动提交偏移量以避免任何数据丢失。我autoCommit: false
用来避免消费后数据自动提交。
这是手动提交偏移量的代码
正如我在某处读到的那样,如果我们故意提交每个偏移量(在消耗后立即提交偏移量),那么它将给经纪人带来负载,这样做不好。
我需要 kafka 专家建议对我的上述用例提出最佳方法以避免任何数据丢失?请指教
nestjs - NestJS:从Feed中获取时间戳和偏移量
这个想法是通过 NestJS KafkaClient 读取带有相应时间戳的 kafka 提要。Kafka 提要长期保留需要从提要中按需获取/查找/使用的数据。
寻找原生 NestJS 方法来解决这个问题。任何想法将不胜感激!
apache-kafka - Deno RangeError: Offset is outside the bounds of the DataView return new DataView(this.buffer, this.byteOffset, this.byteLength).getInt32(
我们正在为 Deno 创建一个 Kafka 客户端。我们正在尝试以与 KafkaJS 相同的方式实现 Endocindg/Decoding 我们的缓冲区。我们已经取得了一定的成功,但偶尔我们会收到以下错误:
错误:未捕获(承诺中) RangeError:偏移量超出了 DataView 的范围
return new DataView(this.buffer, this.byteOffset, this.byteLength).getInt32( ^ at DataView.getInt32 () at Buffer.readInt32BE (https://deno.land/std@0.76.0/node/buffer.ts :365:72) 在 Decoder.readInt32 (file:///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/protocol/decoder.js:60:31) 在 Decoder.readBytes (file :///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/protocol/decoder.js:146:31) 在 V0Decoder (file:///C:/Users/wesge/Desktop/ Coding/Projects/OS-Labs/kafkaSaur/protocol/message/decoder.js:13:18) 在 decodeMessage (file:///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/protocol /message/decoder.js:26:14) 默认 (file:///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/protocol/message/decoder.js:49:19)在 EntryDecoder (file:///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/protocol/messageSet/decoder.js:91:10) 默认(file:///C:/Users/wesge/Desktop/Coding/Projects/ OS-Labs/kafkaSaur/protocol/messageSet/decoder.js:22:23) 在 decodePartition (file:///C:/Users/wesge/Desktop/Coding/Projects/OS-Labs/kafkaSaur/src/testConsumer.ts :137:21) [E] [daem] 应用程序崩溃 - 在开始之前等待文件更改...
错误来自这段代码的最后一行,与我们编写解码函数的方式有关。我试图弄清楚我们需要改变什么来消除这种情况,但我对编码和解码缓冲区不太熟悉。任何帮助将不胜感激!
javascript - KafkaJS:如何在远程网站/服务器上运行消费者?
我正在尝试使用 kafka 将实时数据流式传输到我们公司的网站,以便我可以显示实时更新图。
我有一个工作的 JS 使用者,我在 kafkaJS 文档的帮助下编写了它。
现在我想从我们网站上的 HTML 文件中调用这个脚本,以便它可以读取所有传入的数据并绘制它。
在我的网站上,这是我收到的错误:
我不确定我是否朝着正确的方向前进,或者这是否可能,我在这里是一个大菜鸟,我正在努力寻找有用的资源。
任何帮助或指导将不胜感激。
非常感谢
node.js - ResponseError:Confluent_Schema_Registry - 错误,状态 400
我在 Nodejs 中使用 kafkajs 和 confluent-schema-registry。我可以记录 message.value (avro),但在尝试解码时出错。它说: ResponseError: Confluent_Schema_Registry - 错误,状态 400
javascript - 当 kafkajs 消费者或生产者超时时在客户端检测和控制
我是 kafka 和 kafkajs 的新手。
我使用kafkaJS制作了一个 Kafka 生产者和消费者。
我想知道当消费者或生产者与 kafka 服务器的连接超时时,我如何控制我的 js 代码上发生的事情。
我想配置上一定有某种功能分配,但到目前为止我在文档上找不到任何东西。
javascript - 有没有办法让消费者使用 KafkaJS 获取最后一条消息?
我是 Kafka 和 KafkaJS 的新手。
我试图找到一种方法来获取发送到某个主题的最后 n 条消息,而不是获取该主题的所有未使用的消息。
例如,我有一个生产者每秒产生 1 条消息并让它等待 5 秒。
然后,当我启动消费者时,消费者会收到所有 5 条消息。(只有 1 个消费者在运行,1 个组,1 个分区,与生产者相同的主题)。
KafkaJS 上是否有配置仅获取最后 n 条生成的消息?
events - KafkaJS / MS 事件中心 - 使用 eachBatch 时偏移损坏
描述
大约一个月前,我们从使用 eachMessage 切换到 eachBatch,最近注意到我们服务的消费者没有处理来自随机分区的事件。生产者没有发现任何问题,并且在正常工作大约一个月后才开始出现问题,而我们的配置或使用方法没有任何更改。唯一发生的变化是在我们的基础架构方面,我们在事件中心启用了捕获;但是,尚不清楚这如何导致此问题。经过进一步调查,我们注意到偏移量以某种方式被破坏并且不再处于应有的位置。实际上,消费者停止阅读消息,因为似乎没有任何消息要阅读。我们将offset重置为latest,解决了一段时间的问题;然而,它在一两个小时后返回(再次随机)。值得一提的是,使用 eachMessage 恢复处理可以防止这种偏移量的损坏。
我在下面包含了配置和示例实现。非常感谢您对此的任何帮助。
配置
示例实现
环境:
- 操作系统:Mac OS 11.2.3
- KafkaJS 版本:1.14.0
- 平台:微软活动
- 集线器 NodeJS 版本:12.x
apache-kafka - NestJs/microservices Kafka - 每批消费消息
我正在寻找通过使用装饰器@MessagePattern('topic') 或等效的东西来使用nestjs/microservices 每批消费消息。我想一起获取多条记录,处理它们,然后在处理完所有批次后发送最新记录的提交。我在 Spring Kafka 中使用过批处理侦听器来执行此操作。
我应该使用 kafkajs 还是使用 nestjs/microservices 的方法?
node.js - 无法间歇性重置 kafka 偏移量,报错“消费者组必须没有正在运行的实例,当前状态:稳定”
在连接消费者组之前,我正在尝试通过管理对象重置 kafka 偏移量。
我打电话
await admin.setOffsets({groupId: gID, topic, partitions: await admin.fetchTopicOffsetsByTimestamp(topic, 1635358089189) });
在开始 kafka 消费者之前,但我最终得到了这个错误: The consumer group must have no running instances, current state: Stable
有时,并非总是如此,但几乎在每次交替运行时。
我正在遵循的步骤
- 首先让 kafka 消费者从最新的偏移量开始消费。
- 通过 终止进程
ctrl + c
,它调用下面的方法(测试突然终止)
我立即重新启动我的服务器。
在这次运行中,kafka 管理员将首先通过上面的代码重置偏移量,然后它会连接消费者来消费消息。
重新启动服务器后的某些时候,它工作正常,偏移量被重置并且消费者运行完美,但是当我重新启动服务器时,每隔一次运行我都会收到这个错误:
The consumer group must have no running instances, current state: Stable
预期行为 当我调用 consumerGroup.stop() 和 consumerGroup.stop() 并终止进程时,服务器重新启动时不应有任何正在运行的消费者组实例
观察到的行为
即使在停止和断开消费者之后,仍然有任何消费者组处于运行状态,因此我无法重置偏移量,因为如果有任何正在运行的消费者实例,我们无法重置偏移量。参考链接:
The consumer group must have no running instances when performing the reset
https ://kafka.js.org/docs/admin#a-name-reset-offsets-by-timestamp-a-reset-consumer-group-offsets-by-timestamp
环境:
- Mac 操作系统 11.2.1
- KafkaJS 版本:1.15.0
- 卡夫卡版本:2.8.1
- NodeJS 版本:v13.8.0
附加上下文 我在这里遗漏了什么吗?我是 node.js 和 kafka 的初学者,可能在这里遗漏了一些明显的东西,所以请指出,或者随时提出任何其他解决方案来解决这个问题,谢谢。