问题标签 [sarama]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
3258 浏览

go - 如何设置消费者从 Golang Kafka 10 中的特定偏移量开始

我的需要是让生产者从它崩溃前处理的最后一条消息开始。幸运的是,我只有一个主题,一个分区和一个消费者。

为此,我尝试了https://github.com/Shopify/sarama,但它似乎还不可用。我现在使用https://godoc.org/github.com/bsm/sarama-cluster,它允许我提交每个消息偏移量。

我无法检索最后提交的偏移量我无法弄清楚如何让 sarama消费者从所述偏移量开始。到目前为止,我发现的唯一参数是Config.Producer.Offsets.Initial.

  1. 如何检索最后提交的偏移量?
  2. 如何让消费者从最后一条提交了偏移量的消息开始?OffsetNewest将使它从产生的最后一条消息开始,而不是消费者最后处理的消息。
  3. 是否可以仅使用 Shopify/sarama 而不是 bsm/sarama-cluster 这样做?

预先感谢

PS我使用的是Kafka 10.0,所以偏移量是存储在kafka而不是zookeeper中。

EDIT1: 部分解决方案:获取自 sarama.OffsetOldest 以来的所有消息并跳过所有消息,直到我们找到未处理的消息。

0 投票
2 回答
983 浏览

go - 如何在 Golang Kafka 10 中获取 GroupID?

我正在使用 Kafka 10.0 和https://github.com/Shopify/sarama。我正在尝试获取消费者处理的最新消息的偏移量。

为此,我找到了需要组名的方法NewOffsetManagerFromClient(group string, client Client)

如何获取消费者组名称?

我创建了一个消费者

但我不知道如何创建消费者组并获取其组名。

0 投票
2 回答
12900 浏览

go - 如何在 Golang 中创建一个 kafka 消费组?

一个可用的库是sarama(或其扩展sarama-cluster),但是没有提供消费者组示例,既不在sarama也不在sarama-cluster中。

我不明白 API。我可以举一个为主题创建消费者组的示例吗?

0 投票
3 回答
5235 浏览

sarama - 在 sarama 中创建 Kafka 主题

是否可以在 sarama 中创建 kafka 主题?我知道 java API 可以让你创建主题,但我找不到任何关于如何在 sarama 中创建主题的信息。如果可能的话,我应该使用哪个api的示例或解释将非常感谢

0 投票
2 回答
2072 浏览

go - 为什么 Shopify Sarama 消费者需要分区来消费消息

很抱歉发布与 Kafka 图书馆相关的问题,因为没有多少人对图书馆的特定问题感兴趣。但是这个库是 golang-Kafka 实现中最常用的库之一。

我想使用监听主题的 Sarama 库创建一个简单的消费者。现在据我所知,在高级 Kafka API 中,默认情况下,如果未指定特定分区,消费者会监听所有主题分区。但是,在这个库中,Consumer 接口只有 ConsumePartition 函数,其中需要分区参数。函数的签名是:

这让我有点困惑。有谁在这方面工作过?

另外,我有一个关于 Kafka 的基本问题。如果我有一个由 3 个消费者实例组成的消费者组,并且他们正在收听假设 2 个主题,每个主题有 2 个分区,那么我是否需要特别提及哪个消费者实例将消费到哪个分区或 Kafka Fetch API 将处理它它自己基于负载?

0 投票
1 回答
1161 浏览

apache-kafka - 当我在 Kafka 的一个分区上有多个主题时,偏移如何工作?

我正在努力更好地理解 Kafka 的工作原理。为简单起见,目前我在一个 Zookeeper 上运行 Kafka,它有 3 个代理和一个重复因子为 3 的分区。我了解到,一般来说,最好有分区数 ~= 消费者数。

问题 1:主题是否在同一个分区中共享偏移量?

我在一个分区(例如分区 0)上有多个主题(例如dogs, cats, dinosaurs)。现在我的制作人已经为每个主题制作了一条消息。"msg: bark"dogs"msg: meow"cats"msg: rawr"dinosaurs。我注意到,如果我指定dogs[0][0],我会返回,如果我在和bark上执行相同操作,我会分别返回每条消息。这是一个很棒的功能,但与我的理解相矛盾。我认为偏移量是特定于分区的。如果我按顺序将三条消息推送到一个分区中。消息不应该用 0、1 和 2 来索引吗?现在看来,偏移量是特定于某个主题的。catsdinosaurs

这就是我想象的样子

实际上,它看起来像这样

但那不可能。必须有一些东西来跟踪偏移量和消息在日志文件中的实际物理位置。

问题 2:如果您要为一个主题设置多个分区,您如何管理您的消息?

在问题 1 中,我在一个分区中有多个主题,现在假设我为一个主题有多个分区。例如,我有 4 个dogs主题分区,我有 100 条消息要推送到我的 Kafka 集群。我是否将消息均匀分布在分区中,例如 25 进入分区 1,25 进入分区 2 等等......?

如果消费者想要一次消费所有这 100 条消息,他/她需要访问所有四个分区。这与用 100 条消息击中 1 个分区有何不同?网络带宽是否会造成瓶颈?

先感谢您

0 投票
1 回答
1296 浏览

go - Kafka 0.11 / Golang Sarama 版本支持

我花了一些时间发现我的 Go 应用程序连接到 Kafka 0.11 集群正在使用旧的 0.8.2 版本的库,它在响应中缺少 Timestamp 值。

然后我发现不支持 Kafka 0.11.x API/版本(但他们正在努力)。

我现在有两个解决方案。

首先是在我的应用程序中明确设置所需的版本。其次是“调整” Sarama 代码以使用 0.10.x 版本作为最低版本,使我能够使用所有 0.10.x API/功能。

我仍然想知道为什么该版本不是从我正在连接的 Kafka 代理中获取的?

我无法从代码中理解它应该如何工作......我清楚地看到在 sarama.Config.Version 中设置或定义的版本,但是一旦连接到代理,我就找不到任何更新这个值的东西?

我知道 Python 是这样做的:

(0, 11, 0)

我错过了什么?

0 投票
0 回答
221 浏览

go - 将消费者从基于 zk 的偏移存储升级到基于 kafka 的存储

我正在使用 Golang 和 Sarama 客户端。Kafka 版本是 0.9,我计划升级。

我计划将 sarama 客户端升级到最新版本并使用 sarama-cluster 而不是 wvanbergen/kafka。我看到偏移量现在将提交给 kafka。

在 Apache Kafka 页面上,它说要从基于 zk 的存储迁移到 kafka,您需要执行以下操作:在您的使用者配置中设置 offsets.storage=kafka 和 dual.commit.enabled=true。

wvanbergen/kafka 库中没有这样的属性,他们也没有计划添加它。

有没有人在生产系统上没有 dual.commit.enabled 设置的情况下从 wvanbergen/kafka 到 sarama-cluster 进行了类似的升级?你是如何将偏移量从 zk 迁移到 kafka 的?

0 投票
2 回答
1945 浏览

apache-kafka - 如何通知消费者在 Kafka 中创建了新主题?

我试图让我的消费者动态更新其消费。

让我给你一个使用动物的更具体的例子。想象一下,我有一家宠物店,每个主题都是一种动物(例如狗、猫、鱼)。我的 Kafka 消费者的主要职责是获取我们在 Kafka 中拥有的任何日志/记录/消息,并将它们存储到数据库中。

假设我的消费者正在积极消费主题dogs并且cats一切正常,现在有一种新型动物进入商店,并且在 Kafka 集群中生成了一个新主题。如何通知我的消费者添加了新主题?

我有两个建议,我想看看你认为哪个更好?或者如果有更好的第三种选择,请告诉我。

1.) 生产者向消费者发送一个http请求,让消费者知道生产者将要创建一个新主题,以便消费者采取相应的行动。这种方法的问题在于,存在竞争条件。消费者有可能在创建主题之前尝试消费。(实际上我刚刚发现,如果我auto.topic.creation.enable设置为 true,那么竞态条件就不是问题了。)

topic_updates2.)在 Kafka 集群中创建一个额外的主题。因此,每当生产者成功向 Kafka 集群提交消息时,它都会通过 this 广播消息topic_updates,也许一个简单的字符串就可以了。消费者正在积极收听此主题更新。

3.) 我不知道,理想情况下,我希望 Kafka 能够在创建新主题时发出事件。

先感谢您

0 投票
0 回答
538 浏览

go - Kafka Consumer 升级后不消费消息

我使用 Go 并将服务器升级到 kafka 0.11,并将 sarama 客户端升级到 v1.13.0。我现在也在使用 sarama-cluster 而不是 wvanbergen/kafka 进行消费者组管理。请注意,kafka 升级适用于 wvanbergen/kafka 和旧版本的 sarama 客户端(v1.8)

我看到我的消费者没有消费任何消息,并且日志中有一行表明消费者组已死。

这是卡夫卡日志:

消费者或任何其他日志中没有错误。什么可能导致此问题?