问题标签 [sarama]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
2160 浏览

go - 使用 ConsumerGroup 的 Sarama 分区消费者

我曾尝试使用 shopify/sarama 库来使用 kafka 消息。我使用了Consumer接口和ConsumerGroup接口。我可以使用ConsumePartition(). Consumer但是当我使用ConsumerGroup接口时,我似乎没有能力从特定分区消费。

有没有办法让我将某些分区分配给消费者组中的特定消费者?还是我无法干预的事情?

0 投票
1 回答
113 浏览

go - kafka sarama lib如何知道一个cosumergroup中有多少cosumer

我在这里阅读了消费者组示例(由 Shopify/sarama 提供),我想知道我如何知道一个消费者组中有多少消费者,有什么方法可以控制消费者的数量吗?

非常感谢

0 投票
1 回答
508 浏览

go - Sarama 无法为 Amazon MSK 版本 2.3.1 生成消息

我正在使用sarama golang 库将消息推送到Amazon MSK。到目前为止,我使用的是 msk 版本 2.2.1,我的代码运行良好,但现在 msk 版本已更改为 2.3.1。现在,我无法将消息推送到主题。

错误:

分区-1

偏移量 -1

请求的主题或分区在此代理上不存在。

代码:

我也将 sarama 版本更改为 maxVersion config.Version = sarama.MaxVersion,但它不适用于 Amazon MSK 2.3.1。

请提供一些解决方案。

0 投票
1 回答
1128 浏览

go - 消息不以魔术字节开头

我正在尝试使用Go 中的/linkedin/goavro包将 avro 编码数据生成到 kafka 主题中。目标是能够使用不同的客户端来使用主题。

首先,我将架构注册如下:

然后我创建 avro 数据,使用 Go 生成和使用它。

此代码工作正常,我可以成功地在 kafka 主题中生成和使用消息。

现在我尝试使用来自 python avro-consumer 的主题:

但我收到以下错误:

我认为我在 Go 生产者部分遗漏了一些东西,如果有人能分享他/她关于如何解决这个问题的经验,我将不胜感激。

0 投票
1 回答
815 浏览

docker - docker-compose kafka - 本地机器客户端无法向 kafka 生成消息

我已经阅读了很多类似的主题,但他们无法在这里回答我的问题。

尝试运行一些简短的集成测试,我正在使用 docker-compose 3,一个单节点 kafka。在客户端,我使用 Go shopify/sarama 来消费/生产

我有另一个来自 docker-compose 的容器,它将收听

- "BROKERS_URL=kafka:9092"

消费者工作得很好:

Sarama 消费者启动并运行。{“经纪人”:[“kafka:9092”],“主题”:[“已验证”],“组”:“事件服务”}

但在生产者部分,直接从我的机器上运行:

kafka:客户端已经用完了可以与之交谈的代理(您的集群是否可达?)

这里没有什么奇怪/奢侈的,但它不起作用,我很困惑。

还: nc -vz localhost 29092

连接到 localhost 端口 29092 [tcp/*] 成功!

0 投票
2 回答
5696 浏览

go - 拨号tcp:查找ip-xx-xx.ec2.internal:没有这样的主机

我正在使用 github.com/Shopify/sarama 包与 Kafka 进行交互。在我目前的方法中,我可以连接到代理并毫无问题地获取所有主题名称(下面的消费者代码)

但是,当我尝试使用管理客户端(下面的管理代码)删除一些主题时,我收到“dial tcp:lookup ip-xx-xx.ec2.internal:no such host”错误。

我不知道为什么会收到此错误。我将不胜感激任何提示或可能的解决方案。

消费者

行政

注意我通过转发 ssh 端口将堡垒实例连接到远程。


更新

设置后sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)我得到以下信息:


更新 2

我的卡夫卡 server.properties:

0 投票
1 回答
121 浏览

go - Kafka Preferred 领导者是否会不断更新,即使经纪人都已启动?

我正在尝试制作一个 kafka 监控服务,它需要在集群中的每个代理上至少有一个主题分区。我最初分配了分区。我已将复制因子设置为 1,min.insync.replicas 也是 1,并且我正在使用带有 acks = all 的同步生产者进行生产。

分区代理分配最初看起来像这样

当我尝试杀死一个代理(例如代理 2)时,其领导者是该代理的分区应该并且正在报告“No Leader”错误。现在因为我有 1 个复制并且只有领导者是同步副本,所以 kafka 不会重新选举任何其他代理作为领导者,这是预期的,直到这里一切都很好。

此时分区代理分配看起来像,

问题:但是当该代理恢复时,分区的领导者和首选领导者都会更新为其他一些代理。我不明白这一点,也不是预期的。那么,我是否缺少任何配置,或者有没有人遇到过这个问题?

分区代理分配是这样的。为什么??

0 投票
0 回答
271 浏览

go - 使用 go sarama 库获取 kafka 指标

我一直在尝试使用 go sarama 库获取 kafka 代理指标,但我没有什么顾虑。

我使用 sarama.NewConfig() 初始化了一个 sarama 配置,版本为 sarama.V2_0_0_0 ,我使用这个配置来获取他们文档中提到的指标

  1. 这是否提供包括所有传入消息在内的总传入字节率?因为我将此值与镜头和其他工具进行了比较,但它完全不同(“传出字节率”和“请求率”相同。我想这个请求率意味着消息率)。

  2. RateMean()函数不同,Rate1(),Rate5()Rate15()只返回 0 尽管在 kafka 集群中有很多活动。你知道这是什么原因吗?

  3. 我曾经获得文档metrics.GetOrRegisterMeter(fmt.Sprintf("incoming-byte-rate-for-broker-%v", broker.ID()), config.MetricRegistry).RateMean()中提到的经纪人级别指标, 但我只收到 0。

如果有人可以在这里帮助我并启发我解决这些问题,我将非常感激。提前致谢。:)

0 投票
1 回答
262 浏览

go - 在 goroutine 中扭曲 sarama-cluster 消费动作,然后它无法消费任何东西

我正在使用sarama-clusterlib 在后端服务中创建一个 kafka 组消费者。这个来自godoc的示例代码有效:

因为它是一个死循环,所以我把它放在一个 goroutine 中以避免阻塞其他活动,那么它就不能再消费任何消息:

(服务正在运行,所以这个 goroutine 没有被终止。它只是消费失败)

知道如何解决这个问题吗?

0 投票
1 回答
1497 浏览

go - Kafka Consumer:如何以编程方式从 Go Sarama 中的特定偏移量消费

最近,我开始学习使用kafka工作。我正在从事的项目使用sarama

为了阅读我使用的消息ConsumerGroup

foo如果返回,我需要在一段时间后再次阅读该消息false。如何才能做到这一点?