问题标签 [kafkajs]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
node.js - kafkajs - 断开连接后优雅地停止 kafkajs 实例
我在生产和集成测试中都使用了 kafkajs。
在我所有的测试之前,我正在创建一个带有生产者和消费者连接/订阅/运行(eachMeassage)的 kafkajs 实例......在我所有的测试之后,我想优雅地停止我的所有节点进程,包括 kafkajs 组件。
我实际上是这样做的:
承诺似乎有效。我可以看到我的集成测试套件在生产者和消费者都断开连接的情况下完成。但是我的节点进程仍在运行,没有做任何事情。
在此之前,我使用的是 kafka-node。当我停止消费者时,我的节点进程结束而无需指定任何process.exit(0)
有没有办法优雅地销毁节点进程中的kafkajs实例?
security - kafkajs - SASL 身份验证的错误密码未引发错误
这是我的制作人使用kafkajs
. 当我在 中提供不正确的用户名时options.username
,我会看到该消息Connected
。如何处理连接错误kafkajs
?
node.js - 使用 KafkaJS 时出现 Kafka 连接错误
我已经根据此处的说明在本地开发机器上安装了 Kafka: https ://kafka.apache.org/documentation/#quickstart
如上面链接中所述,我能够在运行 .sh 文件时生成和使用消息。
但是,当尝试使用 KafkaJS 从 nodejs 应用程序连接到 Kafka 时,Kafka 服务器中会记录以下错误:
生产者代码如下:
任何想法为什么它不能从 Nodejs 代码工作,但在运行 .sh 文件时工作?
apache-kafka - 如何在kafkajs中使自动提交错误工作
场景:
我将 autocommit 设置为 false,
产生 12 条消息
消耗它们..(比如从偏移量 100)
关闭消费者
启动一个新消费者
在那个阶段,我希望第二个消费者从偏移量 100 开始再次读取所有消息(因为没有完成提交)
但是当产生新消息时,我看到第二个消费者从新的偏移量(113)开始,即提交仍然以某种方式发生..
我怎么了?
那是我的消费者代码
error-handling - 如何暂停KafkaJS消费者并在一段时间后重试
我正在使用 KafkaJS ( https://kafka.js.org ) 连接到 Kafka 集群。我的消费者负责处理可能在某个时间失败并在等待一段时间(例如,1 小时)后成功的任务。在此持续时间之前,任何处理任何任务的尝试都将导致另一个失败。
我尝试使用https://kafka.js.org/docs/sumption#a-name-pause-resume-a-pause-resume中所述的暂停和恢复方法。但是,消费者立即重新启动并继续再次消费失败的任务。
如何在不提交失败消息并立即重新启动消费者的情况下暂停消费消息?
apache-kafka - KafkaJS - 找到偏移最有效的方法
(当涉及到 kafka 和 kafkajs 时,完全是初学者,所以如果这是一个愚蠢的问题,我很抱歉)
我有一个问题,我们有一个保留 48 小时数据(数百万条记录)的主题;我想知道从这个主题中获取最后“20 分钟”数据然后还流式传输新消息的最佳方式。
此主题中的每条消息都是 JSON,并且具有自纪元 (UTC) 以来以 UNIX 毫秒为单位的时间戳。
性能显然是这里的一个问题
node.js - 等待KafkaJS中的领导选举
情况
我正在使用kafkajs写入一些动态生成的 kafka 主题。
我发现在注册制作人后立即写这些主题会经常导致错误:There is no leader for this topic-partition as we are in the middle of a leadership election
.
完整的错误是:
编码
这是导致问题的代码:
问题
两个问题:
- 在连接到生产者(或发送)时我应该做些什么特别的事情,以确保逻辑阻塞,直到生产者真正准备好将数据发送到 kafka 主题?
- 在向生产者发送数据以确保消息不被丢弃时,我应该做些什么特别的事情?
node.js - 创建 KafkaJS 中已经存在的主题是否可以?
情况
我正在使用KafkaJS动态创建一系列主题,并发现为了在不产生选举问题的情况下这样做,我应该使用createTopics
管理功能。
问题是,如果createTopics
在已经存在的主题上调用,它不仅会返回false
(如文档所述),而且还会发出一个错误,指出“具有此名称的主题已经存在”。
问题
我意识到这个错误直接来自 Kafka 协议,但我很担心,因为错误就是错误。
createTopics
即使可能会冒着创建已经存在的主题的风险,我运行是否安全,或者我是否还需要进行某种类型的错误处理?
如果它是安全的,是否可以消除该错误,因为它最终是噪音?
node.js - NestJs EventBus 在 EventHandler 重复事件
我正在尝试使用 Kafka 作为事件存储的 NestJs 的 Event-Sourcing 和 CQRS。
该应用程序是一个小而简单的应用程序,具有 2 个部分,客户和订单。您首先创建一个具有一些初始余额的客户,然后使用您创建订单的客户 ID,如果订单金额小于余额,则批准否则拒绝。
这是有问题的代码:https ://github.com/Ashniu123/nestjs-customer-order-eventsourcing-cqrs
我使用 KafkaJs 作为 EventBus(在 下创建了我自己的KafkaModulelibs/
)
当我使用 Kafka 和 MongoDB 运行它时,应用程序启动得很好。当我也创建客户时,事件CreateCustomerEvent
按预期发布并由 CommandHandler 推送到 Kafka。(使用landoop UI检查)
当从 Kafka 读取 Event 并将其推送到 EventBus 以由EventHandler
. 像CreateCustomerEventHandler。
我对 EventBus 使用 Kafka 的配置位于每个服务的 AppModule 中。例如,客户。
并且为 KafkaServicesubject$
中的事件配置了EventBus observable 。
这是应用程序日志(为我的评论添加//)。
customer-svc(命令端)
customer-view-svc(查询/查看端)
我可以从日志中推断出,Kafka 事件按预期仅被消费者接收一次,但被移动subject$.next
到EventHandler
两次。
此外,为了澄清,根据创建时customer._id的不同值所建议的,事件被两次推送到 EventHandler 。
使用调试器,我可以看到subject.observers
class 的数组中有 2 个值FilterSubscriber
。我不知道这是否有用,但只是想安排我自己解决这个问题的努力,经过 6 个小时的无所事事,我来这里寻求帮助:)。
如果你们更好地使用它,我已经在 repo 中添加了 launch.json 以与 VSCode 一起使用。只需使用正在运行的应用程序的 processId 进行附加。
PS我已经以类似的方式配置了两者的EventBus,customer-view-svc
并且两者都存在问题(即重复事件)。order-view-svc
我希望你们能帮助我解决这个问题。
谢谢。
node.js - 如何让消费者在nodejs中自动订阅动态创建的主题
stackoverflow 本身有几个类似的问题,但我仍然要再次询问,因为我想要NodeJS中的解决方案。
情景是——
(我正在使用 kafkaJS 库)
我有两个主题 topic-A 和 topic-B,所以我订阅/topic-.*/,然后创建 topic-C,现在我希望我的消费者也自动订阅 topic-c。但这并没有发生。
我只得到一个解决方案,即我必须在每个新主题创建后停止我的消费者并重新启动它。(但是当多个主题将动态创建时,这种方法似乎很难维护)。
任何人都可以通过示例向我建议任何其他解决方案。先感谢您。