问题标签 [librdkafka]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
javascript - 如何通过 node-rdkafka 传递多个 groupid 和主题对来消费?
我将node-rdkafka用于我的一个应用程序。我想使用 groupid.(groupid & topic pair) 使用来自多个主题的消息但我无法找到有关node-rdkafka文档的任何详细信息。所以我的问题是,是否可以通过多个 groupid 和主题对从单个 kafka 消费者连接同时使用来自多个主题的消息?
谢谢。
node.js - 有时 node-rdkafka 消费者没有从主题中读取任何消息
下面的代码片段工作正常。但有时它不是从 Kafka 的 topic 读取消息。我没有收到任何错误。在 Kafka 方面(我们使用的是 Aiven Managed Kafka),消费者组已与主题相关联,并且消费者脚本运行良好。
我需要您的指导来解决上述问题。
卡夫卡版本 - 2.0.1
节点模块版本 - “node-rdkafka”:“^2.7.0”
c++ - 如何使用 librdkafka 为纯文本协议(无 SASL)设置用户名和密码?
我正在尝试将具有security.protocol
conf 属性的生产者连接到纯文本。在调用之前如何传递username
和传递password
给对象?Conf
RdKafka::Producer::create
set
似乎没有一个电话适合。我阅读了有关 sasl的文档以获取灵感,但没有帮助。
c - Avro C 和 librdkafka - 消费者实现;获取 NULL 以解析字段的值 - 总是
示例代码片段
我遇到的问题是“KEY”总是一个 NULL 字符串。
我确实尝试了一个解决方案来对 argRkMessage->key 的内容进行十六进制转储,这似乎具有价值。
我正在使用 librdkafka 和 avro.c 来实现它。示例代码是 C-Kafka 消费者进程的精简版本,旨在获取以 AVRO 格式编码的复杂 Kafka 消息。
apache-kafka - librdkafka partition.assignment.strategy 添加新策略
今天使用 librdkafka, partition.assignment.strategy librdkafka有两个选项:范围和循环。我正在寻找是否有办法添加自定义分配策略。
最好的
apache-kafka - 与 Group Coordinator 断开连接后,Kafka Consumer 未重新连接
这是每 1 秒进行一次高级消费者轮询。会话超时 10 秒。心跳间隔3秒。我期待消费者在会话超时后自动重新连接。这是 librdkakfka 的预期行为,其中消费者可以在循环中愚蠢地调用“consume”,并且任何像这样的网络断开都应该由库自动处理。
我注意到当集群关闭并恢复时,消费者能够自动重新连接。而在这种情况下,由于本地网络问题,心跳请求没有通过并断开连接。生产者没有这个问题,当网络问题在一分钟内解决时,他们能够毫无问题地生产到集群。
从日志
LOG-5-REQTMOUT: [thrd:GroupCoordinator]: GroupCoordinator/25: Timed out HeartbeatRequest in flight(10377 毫秒后,超时 #0) LOG-4-REQTMOUT: [thrd:GroupCoordinator]: GroupCoordinator/25: Timed out 1 in- flight, 0 retry-queued, 0 out-queue, 0 part-sent requests ERROR (Local: Timed out): GroupCoordinator: 1 request(s) timeout: disconnect (after 3498718ms in state UP) RebalanceCb: Local: Revoke partitions: LOG-4-COMMITFAIL:[thrd:main]:x/x 分区的偏移提交(取消分配)失败:本地:等待协调器:错误(本地:代理传输失败):ssl://xxxx:p:接收失败:SSL 传输错误:连接超时(在状态 UP 3514121 毫秒后)错误(本地:所有代理连接已关闭):11/11 代理已关闭 LOG-4-REQTMOUT:[thrd:GroupCoordinator]:GroupCoordinator/25:飞行中超时 0,0 个重试队列,2 个出队列,0 个部分发送的请求
c++ - 如何在 librdkafka 中正确重新发送失败的消息?
我的 dr_cb 是这样的。我知道如何获取发送失败消息的内容,但如何获取主题?最优雅的方法是什么?
c - 如何将 librdkafka 有效负载转换为 json 以获取参数值?
我正在使用 librdkafka 的consumer.c 示例文件,我试图弄清楚如何将 rkm 有效负载(在第 244 行打印出来)转换为 json,以便我可以从 json 中获取参数的值.
现在我正在使用jansson,但遇到了一些问题,如果需要我可以扩展。
librdkafka 或我不知道的标准 C 库中是否有此功能?
c++ - librdkafka:rd_kafka_assignment 为分配的分区返回偏移量 -1001
当我向消费者查询分配的主题分区列表时,结果中的所有分区的偏移量为 -1001。如果我打印出接收到的消息的偏移量,则偏移量设置为正确的值。
这是我用来消费消息的代码:
我知道这里有一个类似问题的答案LibRdKafka: commited_offset always at -1001
但这无济于事。我将主题分区列表分配给rebalance_cb
.
更新:
这是例如 2 条消息的输出:
node.js - 如何在node-rdkafka中一一读取消息
我正在使用 node-rdkafka ( https://github.com/Blizzard/node-rdkafka ) 来消费消息,基本设置工作正常,但每次我将某些内容推送到队列时都会触发该功能,无论是否完成以前的方法。
我希望在上一个功能完成时触发下一个数据单元。
这是我的实现