问题标签 [sarama]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
634 浏览

docker - 如何修复 Golang Sarama 中的 ConsumePartition

我正在使用 Kafka 和 Golang 进行测试

我在用着:

码头工人: https ://hub.docker.com/r/bitnami/kafka

萨拉马: https ://github.com/Shopify/sarama

例子很简单,就是一个连接Kafka的Consumer: https ://godoc.org/github.com/Shopify/sarama#example-Consumer

代码是这样的:

但是在执行时: go run main.go

它向我显示以下错误:

回购在这里: https ://github.com/hectorgool/kafka1/blob/master/main.go#L25

是的,我知道我缺少消息的生产者,但奇怪的是:consumer.ConsumePartition 不起作用

0 投票
1 回答
144 浏览

string - 如何使用 map[string]*string

我正在尝试使用 sarama(管理员模式)来创建主题。没有 ConfigEntries 工作正常。但我需要定义一些配置。

我设置了主题配置(这里发生了错误):

但后来我得到一个错误:

我正在尝试像这样使用管理员模式:

这是定义 CreateTopic() https://github.com/Shopify/sarama/blob/master/admin.go#L18的 sarama 模块中的行

基本上,我不明白指针字符串的映射是如何工作的:)

0 投票
1 回答
1143 浏览

go - 重复消费 Kafka 消息的可能原因

昨天我从日志中发现,在 Kafka 组协调器启动组重新平衡后,kafka 正在重新消费一些消息。这些消息已在两天前被消费(从日志中确认)。

日志中报告了另外两个重新平衡,但它们不再重新使用消息。那么为什么第一次重新平衡会导致重复消费消息呢?有什么问题?

我正在使用 golang kafka 客户端。这是代码

我们在声明消息之前处理消息,所以似乎我们正在使用 kafka 的“至少发送一次”策略。我们在一台机器上有三个代理,而在另一台机器上只有一个消费者线程(goroutine)。

对此现象有何解释?我认为这些消息一定已经提交了,因为它们在两天前被消费了,或者为什么 kafka 会在不提交的情况下保持偏移量超过两天?

使用代码示例:

添加:

  1. 应用程序重新启动后发生重新平衡。还有另外两次重启并没有导致重新消费

  2. 卡夫卡的配置

    log.retention.check.interval.ms=300000
    log.retention.hours=168
    zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms
    =0
    delete.topic.enable = true
    auto.create.topics .enable=false

0 投票
1 回答
5289 浏览

go - 如何使用 Sarama 在多个 goroutines 中消费 Kafka 主题?

我使用https://github.com/Shopify/sarama与 Kafka 进行交互。我有一个主题,例如100 个分区。我有应用程序,它部署在1 个主机上。所以,我想在多个 goroutines 中使用这个主题。

我看到了这个例子 - https://github.com/Shopify/sarama/blob/master/examples/consumergroup/main.go,我们可以在其中看到如何在特定的消费者组中创建消费者。

所以,我的问题是,我应该创建多个这样的消费者,还是有一些设置Sarama,我可以在其中设置所需数量的消费者 goroutine。

PS我看到这个问题 - https://github.com/Shopify/sarama/issues/140 - 但没有答案,如何创建MultiConsumer

0 投票
1 回答
3827 浏览

go - 如何使用 Sarama Go Kafka Consumer 从最新的偏移量中消费

我有三个问题:</p>

  1. “最旧的偏移量”是什么意思?最旧的偏移量并不意味着偏移量 0?

// OffsetOldest 代表代理上可用的最旧偏移量
// 分区。
OffsetOldest int64 = -2

  1. 假如

    A. 三个代理在同一台机器上运行
    B. 消费者组只有一个消费者线程
    C. 消费者配置 OffsetOldest 标志。
    D. 已经产生了 100 条消息,目前消费者线程已经消耗了 90 条消息。

    那么如果consumer线程重启了,那么这个consumer会从哪个offset开始消费呢?是 91 还是 0?

  2. 在我们下面的代码中,似乎每次启动消费者时都会重新使用消息。但实际上它并不总是发生。为什么重新启动后只会发生几次重复消费(不是全部)?

    /li>
0 投票
1 回答
733 浏览

go - 如何从头到尾消费消息到特定的偏移量

我有kafka中给定主题的分区号和偏移量之间的映射。我想使用 golang 从一开始就使用所有消息到那些特定的分区/偏移映射。简而言之,我想知道是否可以在 kafka 中从头到尾使用消息到特定的偏移量。

我想知道是否有内置工具或库可以帮助我在开始实现某些东西之前做。

0 投票
0 回答
230 浏览

apache-kafka - kafka分区块上的消息是否可能?

我正在使用 MaxProcessingTime 为 250 毫秒的 sarama kafka 消费者。当我发送约 400000 字节的大消息时,分区被阻塞,即我的 kafka 消费者停止从该分区读取,并且该分区上的延迟不断增加。这可能是因为 MaxProcessingTime 值吗?

0 投票
0 回答
516 浏览

go - 如何通过 Go 创建 Kafka 的 Sasl Scram 用户主体?

现在我正在制作一个 kafka 服务代理,并希望在使用 golang 进行配置时创建一个新用户。我检查了 Go 的所有 kafka 库,但没有找到任何方法来创建用户(主体)。似乎 Shopify/Sarama 下面的两种方法可能是可行的,但我不知道如何使用它,也找不到示例代码。

我想做的是: bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=myusername-secret],SCRAM-SHA-512=[password=myusername-secret]' --entity-type users --entity-name myusername

是否可以调用 kafka-rest-api 来创建用户?什么是最好的解决方案?

0 投票
1 回答
161 浏览

go - 当消费者尝试连接到停机代理时,Sarama 库中会发生什么?

当消费者尝试连接到停机代理时,Sarama 库中会发生什么?它会返回错误吗?

0 投票
1 回答
4267 浏览

go - Sarama Kafka 消费者组函数返回

我对 Go Lang 非常陌生,并试图对使用 Sarama 库从 Kafka 消费消息的开源库进行一些调整。原始代码可以在这里找到。

原始包实现了一个 PartitionConsumer,如果不需要跨多个消费者使用同一主题的读取一致性,它就可以正常工作,但是,这对我不起作用。

我在同一个应用程序中做了一些工作,使用我在网上找到的一些示例来实现 sarama NewConsumerGroup 包。

以下是我目前正在运行的代码:

KafkaConfig 为消费者携带 groupID 和 Topic。当我运行这个程序时,消费者启动并使用正确的组从正确的主题中读取,并使用在此函数中创建的 ConsumerClaim 将其打印到 STDOUT:

然而,我认为我需要的是NewKafkaInput函数返回*KafkaInput添加到结构的声明中的消息(如果我在这里使用了错误的术语,请原谅我,这是我的第一个 Go 牛仔竞技表演)。

在此处完成的原始示例中:

我花了好几天的时间来研究将函数移入和移出NewKafakInput函数,尝试将消息添加到KafakInput函数外部的结构以及介于两者之间的所有内容。我只是无法让它工作。该NewKafakInput函数需要返回*KafkaInput任何消息,以便此函数可以完成:

完全有可能我也把这件事弄得一团糟,但感谢任何帮助和输入。

谢谢