问题标签 [librdkafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
948 浏览

polly - 如何实现等待重试瞬态故障处理策略?

我对 Kafka 和Polly 还很陌生。我正在寻求有关如何在将 Admin Client 与 Kakfa Confluent .NET 客户端一起使用时实现故障恢复的建议。如果在启动 Blazor 服务器 Web 应用程序期间主题不存在,我正在使用管理客户端创建主题。

最初,我尝试使用polly来实现一个简单的等待和重试策略,如下所示。我希望这会重试创建主题操作以进行可配置的尝试次数。在每次重试尝试之间有一个短暂的可配置等待延迟。如果所有重试尝试都已用尽,则发出致命错误信号并且应用程序正常退出。

等待和重试策略

使用管理客户端实例的等待和重试策略来创建主题

日志和分析

当我尝试运行它时,我可以从下面的日志中看到尝试了一次重试。但是,不会尝试后续重试。日志突出显示rdkafka库检测到并抑制了相同的错误。

我认为这就是日志中没有显示后续重试尝试的原因,即底层rdkafka库轮询线程隐藏了后续本地超时错误并继续尝试连接到代理。随后,.NET 客户端无法引发异常,因为它没有收到失败通知。这意味着 polly 卡在等待并运行第二次尝试?

Confluent Kafka .NET 库在第一次尝试时抛出的错误是ErrorCode.Local_TimedOut 。我认为这对应于rdkafka:error local timeout ?? 经过调查,我发现以下与瞬态故障处理相关的AdminClient配置属性:

这让我尝试了 Confluent Kafka 内置的瞬态故障处理:

在启动过程中稍等片刻后,我启动了 Kafka 代理。下面的调试日志显示rdkafka线程检测到主代理已启动并正在运行。但是,未执行创建主题管理操作。

调试日志

由于空间限制,可在pastebin上查看。

问题

是否可以配置rdkafka,以便在尝试将AdminClient连接到代理时不会抑制相同的错误?

在上面的例子中,一旦rdkafka poll 线程最终检测到 broker 启动并运行,为什么AdminClient创建主题操作没有完成?

如何使用带/不带 Polly 的 Confluent Kafka .NET 为请求在代理上创建主题的 Kafka AdminClient实施重试和等待失败策略?

更新

创建一个小型控制台应用程序,可在pastebin获得。

如果在每次重试尝试时创建一个新的 AdminClient,并且在检测到故障后最终启动代理,那么它就可以工作。

但是,如果每次重试尝试都重复使用相同的 AdminClient 实例,并且在检测到故障后最终启动代理,则程序会阻塞。我认为这是因为 rdkafka 库正在为客户端抑制多个本地超时错误。它只通知 Confluent Kafka .NET 初始故障检测。

有没有更好的方法,而不是在每次重试时创建一个新的 AdminClient 实例?

0 投票
0 回答
264 浏览

apache-kafka - 在 librdkafka 中设置 OUTHBEARER 令牌

我正在尝试编译一个简单的客户端以使用 SSL 和 SASL/OAUTHBEARER 连接到 kafka 服务器。

到目前为止,我尝试过的代码是consumer.c基于librdkafka. 整个修改后的代码如下:

特别是,这是我添加到consumer.c示例中的代码librdkafka

我已经设法在这里看到:https : //github.com/edenhill/librdkafka/issues/3017oauthbearer_set_token必须设置(甚至在设置任何回调之前,因为令牌至少会持续一段时间,以便在我的理解中第一次连接是不需要回调-我正在寻找一个基本的“hello world”客户端首先工作-)。

现在,如果我的令牌是这样的:

我的理解是,我将使用“ZzdfZmUybHdBc...fje77fx8A”作为令牌,并且在从 OAUTH2 基础设施收到令牌后大约 604800 秒后过期。

我应该如何将这些值具体转换为oauthbearer_set_token

0 投票
1 回答
268 浏览

apache-kafka - USER1:初始化端口 0 EAL:错误 - 退出代码:1 原因:无法设置 TX 队列;port=0, err=无效参数 (fastcapa)

我正在尝试在安装和构建 dpdk 和 librdkafka 的 VM(ubuntu20.04)上运行 fastcapa(metron 项目)。但是当我运行这个命令时,我得到了这个错误:cmnd:sudo ./fastca pa -c 0x0f -n 1 --huge-dir /mnt/huge_1GB -- -p 0x00 -t pcap -c /etc/project/metr on/metron-sensors/fastcapa/conf/fastcapa.conf 错误:

有人可以帮助解决这个问题。谢谢你。

0 投票
1 回答
39 浏览

apache-kafka - 如何在使用 node-rdkafka 创建新主题时设置主题级别配置?

我一直在使用 node-rdkafka npm 包来处理 node 和 kafka。为了创建一个新主题,我一直在使用以下代码: client.createTopic({ topic: topic.name, num_partitions: _.get(topic, "partitions", 1), replication_factor: _.get(topic, "replicas", 3) } 我需要添加主题级别的retention.ms以覆盖在代理级别设置的默认 7 天。有没有办法使用 node-rdkafka 来做到这一点

0 投票
0 回答
274 浏览

docker - 带有 Docker 的 Librdkafka - Kerberos 支持

尝试将 librdkafka 与 Docker (Alpine) 一起使用。基于此链接 - https://github.com/confluentinc/confluent-kafka-go,我尝试创建我的 Dockerfile。

运行时出现此错误:standard_init_linux.go:219: exec 用户进程导致:exec 格式错误

我的 docker 架构和主机架构是 AMD64。

0 投票
0 回答
127 浏览

c - rd_kafka_consumer_poll() 大多数时候返回 null

我正在使用来自https://github.com/edenhill/librdkafka/blob/master/examples/consumer.c的 consumer.c没有任何更改并执行它./consumer broker groupid topicnamerd_kafka_consumer_poll在大多数情况下返回 null (某些 groupid 值除外) 即使 kafka 集群中存在数据。我会理解是否有一些调用返回 null,但我不明白为什么它总是为 null。

我尝试将轮询超时从100msto增加1000ms,它仍然没有任何区别,并且一直返回 null 。但在某些随机groupid值下,它确实会从集群中获取所有数据。在重复运行时,我必须不断将groupid值增加到随机值,以便消费者获取数据。

  1. 这是预期的行为rd_kafka_consumer_poll吗?或者我在实施中有什么需要改变的。
0 投票
0 回答
118 浏览

c - 如何使用 librdkafka 发送 json 数据?

我正在尝试使用 librdkafka c api 发送 json 有效负载。我现在想做的是

使用上面的代码,我设法将字节发送给代理。但是 json 有效负载似乎格式不正确。消费者(消费者是使用 Newtonsoft json lib 进行反序列化的 C# 客户端)抛出以下错误:

我不太清楚我的错误是在我构造 json 对象、将其编码为字符串的方式上,还是在我使用 librdkafka 发布 json 字符串的方式上。

0 投票
0 回答
484 浏览

node.js - 无法在 Apple M1 air 上构建节点 rdkafka

我试图在我的 m1 air 上运行 npm install node-rdkafka 但我收到了这个构建错误。

我安装了与 M1 芯片兼容的最新版本节点。我需要的所有其他节点包都安装得很好。我什至尝试使用自制软件安装 librdkafka 库,但它也没有帮助。我在这里做错了什么?

0 投票
1 回答
143 浏览

apache-kafka - 当代理列表中的第一个代理关闭时创建生产者

我有一个用于消费和生产的多节点 Kafka 集群。

在我的应用程序中,我使用 confluent-kafka-go(1.6.1) 创建生产者和消费者。当我生产和消费消息时,一切都很好。这就是我配置引导服务器列表的方式

但是当我开始给出经纪人的IP地址时bootstrap.servers,如果第一个经纪人出现故障,似乎生产者反复失败创建告诉

如果我删除故障节点的 IP,则生成和使用消息会起作用。如果在我创建生产者/消费者之后代理关闭,它们可以通过切换到其他节点继续使用。

我应该如何配置bootstrap.servers以使用可用节点创建生产者?

0 投票
1 回答
50 浏览

c++ - 如何使用 librdkafka C++ 客户端添加拦截器?

我正在尝试使用 librdkafka C++ 客户端编写一个 Kafka 生产者,我发现在 C++ 版本中没有添加自定义拦截器的 API,而我发现一些在 C 中添加拦截器的函数在 C++ 客户端中没有公开