问题标签 [sarama]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - Kafka-sarama:生产者是否应该在空闲超时后自动重新连接?
我有一个关于 sarama Go 语言 Kafka 库的看似简单的问题:当 Kafka 代理由于空闲超时(由connections.max.idle.ms
配置控制)而关闭时,生产者是否应该在/通过发送新消息时自动重新建立与代理的连接消息与否?
我无法在我检查的任何资源中找到答案:
我问是因为它似乎没有自动重新连接 - 现在我应该尝试避免超时(通过增加connections.max.idle.ms
配置或发送心跳消息)还是应该提出 sarama 的问题?
我还查看了 sarama 日志以获取有关此处实际发生的情况的线索(测试场景,每 10 分钟发送一次消息)但是,对于 Kafka 和 Go 来说都是新手,我无法从中获得太多收益:
apache-kafka - 卡夫卡消费者群体滞后:整个案例问题的初始和低
用例:
- 生产者:在主题上写给 Kafka
db.inventory.customers
- ConsumerGroup1 (cg1):读取
db.inventory.customers
和写入loader-b.inventory.customers
- ConsumerGroup2 (cg2):读取
loader-b.inventory.customers
和写入 Github。
监控滞后并做一些工作
我们监控 cg1 滞后和 cg2 滞后。当延迟在0 <= lag <= 100
两个消费者组的范围内时,我们执行一些任务。
问题
问题是对于吞吐量低的加载器主题,cg2 消失了,所以我们不知道它的滞后并将其视为 -1。我们的条件从未得到满足,我们被困住了。
现在,如果我们考虑
0 <= lag <= 100
cg1 和-1 <= lag <= 100
cg2的条件
然后,在没有创建 cg2 的第一次运行中,它会考虑满足条件。但我们不希望这样。我们希望它做一些工作,然后滞后应该达到条件。
我应该怎么办?
代码
go - Sarama 并未订阅所有使处理陷入困境的主题。是不是因为某些配置?
我们的消费者组正在处理 100 多个主题(所有主题只有一个分区,所有 100 个主题的分区 0)
例如,在加载程序处理程序中批量处理。它根据消息计数和时间进行批处理和处理。循环继续进入股票行情,并且由于没有插入,批次大小 = 0,因此没有任何内容得到处理。我们陷入了这个循环。
在 100 多个主题中,只有 57 个主题收到了消息added subscription to
,其余 43 个主题从未被订阅,因此它们陷入了无限循环,等待消息进入阅读频道。
请建议这是否是预期的行为,是否由于某些并发限制? https://github.com/Shopify/sarama/issues/1897
go - 在 Go 中使用 Uber-Zap 记录器将指定的日志发送到 Kafka 接收器
我正在尝试使用zap logger包来创建带有文件、控制台和 Kafka 接收器的核心。我有一些非常特定INFO
级别的日志,我想将它们发送到 Kafka 主题以供下游消费者处理。但是,通过当前的实现,我INFO
在 Kafka 主题中获得了所有级别的日志,甚至是我不想要的日志。
有没有办法使用一个通用的 zap 记录器对象来防止同一级别的不需要的日志不去任何一个特定的接收器?
下面是我用来创建单个记录器对象的函数。
我正在使用Sarama包来实现 Kafka 生产者。我还考虑过使用自定义日志记录级别。但是,zap v1.0 不支持它。
go - 使用 Shopify/sarama 重新启动程序时从 kafka 获取最新偏移量
对不起,我有一个关于使用 Shopify/sarama 的 kafka 的问题。
- 我必须在内部实现 func
ConsumeClaim
和循环才能获取消息。ConsumeClaim
在 goroutine 中调用,每个分区都有自己的 goroutine。如果我使用的主题有 5 个部分,那么它将生成 5 个 goroutine 。-> 我可以只做 3 而不是 5 goroutine 而不改变分区吗?(使用缓冲区等...) - 当我重新启动程序时,它会在我停止程序之前从第一个偏移量而不是从当前偏移量获取。那么,如何在停止之前从最新的偏移量中获取消息?
go - 如何在 kubernetes 部署中扩展 sarama 消费者组?
我正在尝试让一些消费者处理来自 kafka 的消息,并且我想实现 kubernetes 部署可扩展性以实现弹性消息处理能力。
我从 sarama 官方指南https://pkg.go.dev/github.com/Shopify/sarama#NewConsumerGroup中找到了这段代码:
我有一些问题:
- 如何设置消费组中的消费者数量?
- 如果我在 Pod 中部署这个程序,我可以安全地扩展它吗?我的意思是,假设一个程序正在运行,并且我将副本从 1 扩展到 2,
NewConsumerGroup
那么具有相同组 id 的另一个调用是否可以完美地工作而不会发生冲突?
先感谢您。
注意:我使用的是 Kafka 2.8,听说 sarama_cluster 包已弃用。
go - 有没有其他方法可以在没有 sarama 的代理地址的情况下访问 kafka 集群?
如果不知道代理地址,有没有办法开始与 Sarama 中的 Kafka 集群交互?如果连接集群的broker宕机了,如何重新连接?
go - Kafka:Sarama、幂等性和 transactional.id
Shopify /sarama是否提供类似于transactional.id
JVM API 的选项?
该库支持幂等性(Config.Producer.Idemponent
,类似于enable.idempotence
),但我不明白如何在没有transactional.id
.
如果我错了,请纠正我,Sarama 中缺少关于这些选项的文档。但是根据 JVM 文档,没有标识符的幂等性将受到单个生产者会话的限制。换句话说,当生产者失败并重新启动时,我们将失去保证。
我在源代码和一些测试(例如)中找到了相关属性,但不明白如何在外部使用它们。
go - 我如何使用 sarama 获取 kafka 生产者/消费者指标
我正在尝试使用 sarama 获取 kafka 生产者/消费者指标。但我找不到任何关于如何做同样事情的例子。有人可以提供一个示例实现示例吗?
我正在使用以下代码来获取经纪人的指标。但是获取生产者/消费者指标的配置应该是什么。我假设它不会一样。如我错了请纠正我
go - Golang 从 Kafka 的一个主题中删除所有记录
嗨,我正在 Go 和 Kafka 中编写服务,我需要实现一个删除所有端点,该端点将从特定主题中删除所有记录。但是我找不到合适的方法来做到这一点。我正在为 Kafka 使用Sarama 库。
到目前为止,我能找到实现全部删除的唯一两种方法是删除似乎不是处理此问题的有效方法的主题,第二种方法是使用DeleteRecords
Sarama 库中的函数,但是此函数删除其记录偏移量小于对应分区的给定偏移量。这意味着我必须首先获得最新的偏移量。
基本上我正在寻找做这件事的最佳方法。有人可以帮我吗?最佳实践是什么?也许我错过了一些东西。我真的很感激一些例子。谢谢!