问题标签 [sarama]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
go - 使用 ConsumerGroup 的 Sarama 分区消费者
我曾尝试使用 shopify/sarama 库来使用 kafka 消息。我使用了Consumer
接口和ConsumerGroup
接口。我可以使用ConsumePartition()
. Consumer
但是当我使用ConsumerGroup
接口时,我似乎没有能力从特定分区消费。
有没有办法让我将某些分区分配给消费者组中的特定消费者?还是我无法干预的事情?
go - kafka sarama lib如何知道一个cosumergroup中有多少cosumer
我在这里阅读了消费者组示例(由 Shopify/sarama 提供),我想知道我如何知道一个消费者组中有多少消费者,有什么方法可以控制消费者的数量吗?
非常感谢
go - Sarama 无法为 Amazon MSK 版本 2.3.1 生成消息
我正在使用sarama golang 库将消息推送到Amazon MSK。到目前为止,我使用的是 msk 版本 2.2.1,我的代码运行良好,但现在 msk 版本已更改为 2.3.1。现在,我无法将消息推送到主题。
错误:
分区-1
偏移量 -1
请求的主题或分区在此代理上不存在。
代码:
我也将 sarama 版本更改为 maxVersion config.Version = sarama.MaxVersion
,但它不适用于 Amazon MSK 2.3.1。
请提供一些解决方案。
go - 消息不以魔术字节开头
我正在尝试使用Go 中的/linkedin/goavro包将 avro 编码数据生成到 kafka 主题中。目标是能够使用不同的客户端来使用主题。
首先,我将架构注册如下:
然后我创建 avro 数据,使用 Go 生成和使用它。
此代码工作正常,我可以成功地在 kafka 主题中生成和使用消息。
现在我尝试使用来自 python avro-consumer 的主题:
但我收到以下错误:
我认为我在 Go 生产者部分遗漏了一些东西,如果有人能分享他/她关于如何解决这个问题的经验,我将不胜感激。
docker - docker-compose kafka - 本地机器客户端无法向 kafka 生成消息
我已经阅读了很多类似的主题,但他们无法在这里回答我的问题。
尝试运行一些简短的集成测试,我正在使用 docker-compose 3,一个单节点 kafka。在客户端,我使用 Go shopify/sarama 来消费/生产
我有另一个来自 docker-compose 的容器,它将收听
- "BROKERS_URL=kafka:9092"
消费者工作得很好:
Sarama 消费者启动并运行。{“经纪人”:[“kafka:9092”],“主题”:[“已验证”],“组”:“事件服务”}
但在生产者部分,直接从我的机器上运行:
kafka:客户端已经用完了可以与之交谈的代理(您的集群是否可达?)
这里没有什么奇怪/奢侈的,但它不起作用,我很困惑。
还:
nc -vz localhost 29092
连接到 localhost 端口 29092 [tcp/*] 成功!
go - 拨号tcp:查找ip-xx-xx.ec2.internal:没有这样的主机
我正在使用 github.com/Shopify/sarama 包与 Kafka 进行交互。在我目前的方法中,我可以连接到代理并毫无问题地获取所有主题名称(下面的消费者代码)。
但是,当我尝试使用管理客户端(下面的管理代码)删除一些主题时,我收到“dial tcp:lookup ip-xx-xx.ec2.internal:no such host”错误。
我不知道为什么会收到此错误。我将不胜感激任何提示或可能的解决方案。
消费者
行政
注意我通过转发 ssh 端口将堡垒实例连接到远程。
更新
设置后sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
我得到以下信息:
更新 2
我的卡夫卡 server.properties:
go - Kafka Preferred 领导者是否会不断更新,即使经纪人都已启动?
我正在尝试制作一个 kafka 监控服务,它需要在集群中的每个代理上至少有一个主题分区。我最初分配了分区。我已将复制因子设置为 1,min.insync.replicas 也是 1,并且我正在使用带有 acks = all 的同步生产者进行生产。
分区代理分配最初看起来像这样
当我尝试杀死一个代理(例如代理 2)时,其领导者是该代理的分区应该并且正在报告“No Leader”错误。现在因为我有 1 个复制并且只有领导者是同步副本,所以 kafka 不会重新选举任何其他代理作为领导者,这是预期的,直到这里一切都很好。
此时分区代理分配看起来像,
问题:但是当该代理恢复时,分区的领导者和首选领导者都会更新为其他一些代理。我不明白这一点,也不是预期的。那么,我是否缺少任何配置,或者有没有人遇到过这个问题?
分区代理分配是这样的。为什么??
go - 使用 go sarama 库获取 kafka 指标
我一直在尝试使用 go sarama 库获取 kafka 代理指标,但我没有什么顾虑。
我使用 sarama.NewConfig() 初始化了一个 sarama 配置,版本为 sarama.V2_0_0_0 ,我使用这个配置来获取他们文档中提到的指标
这是否提供包括所有传入消息在内的总传入字节率?因为我将此值与镜头和其他工具进行了比较,但它完全不同(“传出字节率”和“请求率”相同。我想这个请求率意味着消息率)。
与
RateMean()
函数不同,Rate1()
,Rate5()
和Rate15()
只返回 0 尽管在 kafka 集群中有很多活动。你知道这是什么原因吗?我曾经获得文档
metrics.GetOrRegisterMeter(fmt.Sprintf("incoming-byte-rate-for-broker-%v", broker.ID()), config.MetricRegistry).RateMean()
中提到的经纪人级别指标, 但我只收到 0。
如果有人可以在这里帮助我并启发我解决这些问题,我将非常感激。提前致谢。:)
go - 在 goroutine 中扭曲 sarama-cluster 消费动作,然后它无法消费任何东西
我正在使用sarama-cluster
lib 在后端服务中创建一个 kafka 组消费者。这个来自godoc的示例代码有效:
因为它是一个死循环,所以我把它放在一个 goroutine 中以避免阻塞其他活动,那么它就不能再消费任何消息:
(服务正在运行,所以这个 goroutine 没有被终止。它只是消费失败)
知道如何解决这个问题吗?
go - Kafka Consumer:如何以编程方式从 Go Sarama 中的特定偏移量消费
最近,我开始学习使用kafka工作。我正在从事的项目使用sarama。
为了阅读我使用的消息ConsumerGroup
。
foo
如果返回,我需要在一段时间后再次阅读该消息false
。如何才能做到这一点?