问题标签 [kafkajs]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
124 浏览

csv - 使用 Kafka 摄取 CSV 数据,然后让多个消费者订阅并创建新的 CSV 文件,这些文件是原始文件的子集

我正在观看 Robin Moffatt 的一个视频 ( https://rmoff.net/2020/06/17/loading-csv-data-into-kafka/ ),并相信 Apache Kafka 可能会帮助我自动化我拥有的工作流程。

我有一个要求,我需要从客户那里获取 CSV,将原始信息的子集以各种格式(文本或 csv)发送给 2 个供应商,从这些供应商接收数据,然后合并所有数据。

我有点不喜欢卡夫卡,但我想我会有一个如下的过程:

将客户的数据提取到 kafka 并保存到 SQL Server 或 Postgres 数据库。然后我将发布 2 个“我们有数据”流。每个流基本上都有一行,代表我们从客户那里收到的批次。这些作为主题的流将由 kafkaJS 消费者使用。使用消息中的信息,这些消费者基本上将根据该供应商所需的输出从数据库中选择数据。

在此过程中,我们期待 2 个响应。当每个响应进入 (SFTP) 时,我们会将响应文件(JSON 或 CSV)摄取到数据库中,就像我们对原始客户信息所做的那样。如果我们已收到所有数据,我们将发布另一条消息,该消息将由合并所有数据的消费者使用。

像罗宾这样的卡夫卡忍者有什么建议吗?非常感激。

广东

0 投票
0 回答
274 浏览

typescript - 无法通过`@kafkajs/confluent-schema-registry`检索kafka融合模式注册表

@kafkajs/confluent-schema-registry 在打字稿节点应用程序中使用,试图下拉模式。我已经在 J​​ava 中使用类似的融合模式注册表 java 库 w\o 问题完成了此操作。

创建模式注册表客户端时,我看不到指定任何有关严格性的选项的方法。

我的 JSON 模式在块中有一个名为“version”的属性schema {}。我能够检索主题的最新架构 ID,但检索架构本身失败。似乎正在使用一些严格性设置,该设置不允许其不期望的模式中存在属性。从融合模式注册表 Java 库访问模式时,我没有看到这个问题。

有没有办法关闭这种严格模式?

0 投票
0 回答
129 浏览

apache-kafka - 设置 Kafka 集群时遇到问题

我是 Kafka 的新手,我正在尝试设置一个 localhost 集群。

问题:完成设置后,我无法确认我的消息是否已生成或使用它们。

我所做的步骤:

第 1 步 - 动物园管理员

我确实zookeeper使用本地 telnet 连接和ruok命令(在默认端口 2181 上)安装并验证它已启动并运行。

第 2 步 - 配置

我有 3 个文件夹,每个文件夹都包含我从kafka 官方快速入门下载的 kafka 。在每个文件夹中,我对本地进行了config/server.properties如下修改:

我已经设置了broker.id=1,然后23其他人

我已经设置了listeners=PLAINTEXT://:9093,然后90949095其他人

我设置logs.dir=了一个本地日志目录,其中每个代理都有一个子目录

第 3 步 - 启动

我已经使用bin/kafka-server-start.sh config/server.properties3 个 kafka 文件夹中的命令启动了所有 3 台服务器,每台服务器都使用它自己的本地配置

第 4 步 - 主题

我使用以下命令从三个 kafka 文件夹之一创建了一个共享主题:

bin/kafka-topics.sh --create --topic topic-1 --zookeeper localhost:2181 --partitions 3 --replication-factor 3

第 5 步 - 测试

好的,现在要测试一切是否正常运行,我尝试了两种方法第一种是使用 CLI 并手动添加消息,从任何 kafkabin文件夹中,我使用了这两个命令:

kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 –topic topic-1

kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic topic-1 --from-beginning

但是,当添加消息时,消费者端似乎什么都没有发生。关闭它时,我有Processed a total of 0 messages消息。

我还尝试使用该kafkajs库与消费者和生产者一起制作“hello world”类型脚本,但出现此错误:

{"level":"ERROR","timestamp":"2021-05-10T09:09:57.464Z","logger":"kafkajs","message":"[Connection] Response GroupCoordinator(key: 10, version: 2)","broker":"kevin-UX390UAK:9093","clientId":"my-app","error":"The group coordinator is not available","correlationId":0,"size":55}

任何想法我做错了什么?非常感谢 !凯夫!

0 投票
0 回答
153 浏览

javascript - Testcafe 异步函数不在 kafkajs 消费者的 eachMessage 处理程序中运行

我正在尝试做这样的事情,但 testcafe 不会等到测试运行。一旦它点击 t.navigate(url),异步 runtest() 函数就会返回 undefined,并且“eachMessage”代码继续以未定义的值。如何确保 runtest 在返回之前成功完成。显然 await 不起作用,我没有从 testcafe 或 nodejs 收到任何错误消息

先感谢您

0 投票
0 回答
336 浏览

node.js - kafkajs 库无法在远程 kafka 集群中运行

我正在使用 Kafkajs 库编写一个 Kafka 生产者,并尝试将下面的代码与 Kafka 远程服务器连接起来。

下面是我的节点 js 代码。

但我收到以下错误。

{"level":"ERROR","timestamp":"2021-05-12T13:33:23.179Z","logger":"kafkajs","message":"[BrokerPool] 连接种子代理失败,正在尝试列表中的另一个代理:连接失败:端口应 >= 0 且 < 65536。收到 NaN。","re​​tryCount":4,"retryTime":3592} {"level":"ERROR","timestamp": "2021-05-12T13:33:26.789Z","logger":"kafkajs","message":"[BrokerPool] 连接种子代理失败,尝试列表中的另一个代理:连接失败:端口应该是>= 0 且 < 65536。收到 NaN。","re​​tryCount":5,"retryTime":7822}

我也更改了 server.properties 代码,请在下面找到代码详细信息以供参考。

listeners=PLAINTEXT://0.0.0.0:9092 advertised.listeners=PLAINTEXT://192.168.31.11:9092 listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL num.network.threads=3

当我在同一台服务器上的节点和 Kafka 等一台服务器上运行所有代码时,我没有收到任何错误,即使我尝试使用这样的 ngrok -> ngrok HTTP 9092但我得到了与上面分享的相同的错误。

我怀疑有没有办法从互联网上的任何地方连接我们的 Kafka 代理。

请求您帮助我解决此问题。

提前致谢。

0 投票
1 回答
155 浏览

node.js - 为什么 Promise.race 不能在 kafkajs eachMessage 回调中解析

我已经定义了这样的承诺......

我可以解决,但它最后没有打印结果,似乎超时和实际承诺都没有按预期工作?这是为什么?我怀疑它与在 kafka js 回调中使用 resolve 有关?


更新:似乎它Promise.race()没有解决,但为什么呢?

0 投票
1 回答
69 浏览

node.js - 如何停止接收有关主题的 kafka-metadata 消息

出于某种原因,有时,我们的 Confluent Kafka 消费者会收到关于特定主题的奇怪消息:

即使我吞下了所有错误并在失败后提交了偏移量,我也多次使用了完全相同的消息。

它只发生在一个特定的主题上。我们从 Scarth 删除并重新创建了这个主题,但它没有解决这个问题。

在以下屏幕截图中,我们看到一条消息为空,而其他消息不为空。这是什么意思?

在此处输入图像描述


生产者 - C#:


消费者 NodeJ:


我们无法使用ccloud使用有问题的消息:错误 - “恐慌:运行时错误:索引超出范围 [0],长度为 0”

我们迷路了。任何帮助都会很棒!


NodeJS:v14.15.4 KafkaJS:1.16.0-beta.18

0 投票
3 回答
753 浏览

apache-kafka - 创建 AWS amazon-MSK JavaScript 客户端连接

是否有人使用 JavaScript 成功建立了与 Amazon MSK Kafka 集群的客户端连接?没有 YouTube 视频或在线示例 AFAIK。尝试使用 KafkaJs npm 模块对我不起作用,因为如果没有在无法 ssh 进入的代理上安装 IamAWSLogin 插件,则不支持 SASL AWS 我是角色。

尝试使用普通 SASL 方法在 KafkaJs 上不起作用,因为 aws 不使用用户名和密码。

我也没有发现 kafka-node 有用。

有什么线索吗?

0 投票
0 回答
175 浏览

apache-kafka - NestJS CQRS 使用 Kafka 作为 Eventbroker

我一直在尝试设置 2 个 CQRS 微服务并让它们与 Kafka 一起通信。我启动了一个带有 kafka 和 zookeeper 的 Docker 容器,并连接了两者。据我在终端中看到的,连接也有效

我的问题是:我 在哪里以及如何将 Kafka 连接到两者,以便我可以将事件从一个微服务传递到另一个微服务

我尝试遵循 NestJS Kafka 文档并最终在 Producer-Microservice 的 CommandHandler 中执行此操作

我将消费者设置为 NestJS 微服务,如下所示:

现在在消费者微服务的控制器中,我尝试使用@MessagePattern()@EventPattern()装饰器,两者都没有工作。我没有得到控制台日志,也没有任何可用的回报。这就是我在消费者微服务中的控制器的样子:

0 投票
0 回答
69 浏览

node.js - 在 KafkaJS 中添加 keystore 和 truststore 配置

我正在尝试访问我们的 DevOps 团队创建的 Kafka 集群,但我无法访问它,因为我不知道如何在KafkaJS / Kafka-node npm 中设置配置,我已经在 GitHub 上提出了这个问题但是不想在这个测试部分等待太多。我收到了来自 devops 团队的keyStore& JKS 文件和密码。trustStore

即使我尝试在没有此配置的情况下访问集群,我也会收到以下错误。

提前致谢!


12345678910