问题标签 [sarama]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
go - 如何在 sarama 中获得多个订阅
有人知道如何使用 Sarama 在 Apache Kafka 中制作多订阅主题
我有一个简单的消费者,我需要订阅三个不同的主题
go - 如何在 Sarama 中使用组 ID
您可能知道,Sarama 的文档非常糟糕。没有什么是清楚的。我刚刚找到了这个例子:https ://godoc.org/github.com/Shopify/sarama#example-Consumer
它消耗消息。没关系。我不知道如何在消费时使用 GroupID。
如何在 Sarama 中设置组 ID?
docker - 无法使用 Sarama Golang 包创建 Kafka 生产者客户端-“客户端/元数据在获取元数据时从代理收到错误:EOF”
版本:GoLang 1.10.2 Kafka 4.4.1 Docker 18.03.1
我正在尝试使用 Shopify 的 Sarama 包来测试我的 Kafka 实例。我使用 Docker compose 来支持 Kafka/Zookeeper,它都可以成功运行。
当我尝试使用 Sarama 创建 Producer 客户端时,会引发错误。
当我运行以下
)
我明白了
[sarama] 2018/06/12 17:22:05 初始化新客户端
[sarama] 2018/06/12 17:22:05 客户端/元数据从代理 localhost:29092 获取所有主题的元数据
[sarama] 2018/06/12 17:22:05 在 localhost:29092 连接到代理(未注册)
[sarama] 2018/06/12 17:22:05 客户端/元数据在获取元数据时从代理收到错误:EOF
[sarama] 2018/06/12 17:22:05 关闭与代理 localhost:29092 的连接
{sarama] 2018/06/12 17:22:05 客户端/元数据没有可用的代理发送元数据请求
[sarama] 2018/06/12 17:22:06 Closing Client panic: kafka: client has run out of available brokers to talk(你的集群是否可达?)
goroutine 1 [运行]:main.main() /Users/benwornom/go/src/github.com/acstech/doppler-events/testprod/main.go:29 +0x3ec 退出状态 2
Sarama 确实连续多次尝试创建生产者客户端,但每次都失败了。
我对 Sarama 的“NewAsyncProducer”方法的理解是,它调用了“NewClient”,不管你是创建Producer 还是Consumer,都会调用它。NewClient 尝试从 Kafka 代理收集元数据,这在我的情况下失败了。我知道它正在连接到 Kafka 代理,但是一旦连接,它似乎就中断了。任何意见将是有益的。我的网络连接很强,我想不出任何干扰服务器的东西。据我所知,现有主题只有一个代理和一个分区。我认为我不必手动将主题分配给经纪人。如果我的客户正在与代理连接,为什么我不能为我的生产者建立持久连接?
这是来自 kafka 日志文件,就在它死之前。
__consumer_offsets-5 -> Vector(1), connect-offsets-23 -> Vector(1), __consumer_offsets-43 -> Vector(1), __consumer_offsets-32 -> Vector(1), __consumer_offsets-21 -> Vector(1) ), __consumer_offsets-10 -> Vector(1), connect-offsets-20 -> Vector(1), __consumer_offsets-37 -> Vector(1), connect-offsets-9 -> Vector(1), connect-status- 4 -> Vector(1),__consumer_offsets-48 -> Vector(1),__consumer_offsets-40 -> Vector(1),__consumer_offsets-29 -> Vector(1),__consumer_offsets-18 -> Vector(1),连接- offsets-14 -> Vector(1), __consumer_offsets-7 -> Vector(1), __consumer_offsets-34 -> Vector(1), __consumer_offsets-45 -> Vector(1), __consumer_offsets-23 -> Vector(1), connect-offsets-6 -> Vector(1),connect-status-1 -> Vector(1),connect-offsets-17 -> Vector(1),connect-offsets-0 -> Vector(1),connect-offsets-22 -> Vector(1), __consumer_offsets-26 -> Vector(1), connect-offsets-11 -> Vector(1), __consumer_offsets-15 -> Vector(1), __consumer_offsets-4 -> 向量(1), __consumer_offsets-42 -> Vector(1), __consumer_offsets-9 -> Vector(1), __consumer_offsets-31 -> Vector(1), __consumer_offsets-20 -> Vector(1), connect-offsets-3 - > Vector(1), __consumer_offsets-1 -> Vector(1), __consumer_offsets-12 -> Vector(1), connect-offsets-8 -> Vector(1), connect-offsets-19 -> Vector(1), connect-status-3 -> Vector(1), __confluent.support.metrics-0 -> Vector(1), __consumer_offsets-17 -> Vector(1), __consumer_offsets-28 -> Vector(1), __consumer_offsets-6 - > Vector(1), __consumer_offsets-39 -> Vector(1), __consumer_offsets-44 -> Vector(1), connect-offsets-16 -> Vector(1), connect-status-0 ->Vector(1), connect-offsets-5 -> Vector(1), connect-offsets-21 -> Vector(1), __consumer_offsets-47 -> Vector(1), __consumer_offsets-36 -> Vector(1), __consumer_offsets -14 -> Vector(1), __consumer_offsets-25 -> Vector(1), __consumer_offsets-3 -> Vector(1), __consumer_offsets-30 -> Vector(1), __consumer_offsets-41 -> Vector(1), 连接-offsets-13 -> Vector(1),connect-offsets-24 -> Vector(1),connect-offsets-2 -> Vector(1),connect-configs-0 -> Vector(1),__consumer_offsets-11 -> Vector(1), __consumer_offsets-22 -> Vector(1), __consumer_offsets-33 -> Vector(1), __consumer_offsets-0 -> Vector(1), connect-offsets-7 -> Vector(1), 连接-offsets-18 -> Vector(1))) (kafka.controller.KafkaController) [36mkafka_1 |[0m [2018-06-12 20:24:47,461]调试[控制器ID = 1]主题不在代理1 Map()的首选副本中(kafka.controller.KafkaController)[36mkafka_1 | [0m [2018-06-12 20:24:47,462]跟踪[控制器ID = 1 ] broker 1 的领导者不平衡比率为 0.0 (kafka.controller.KafkaController)
apache-kafka - Sarama 生产到压缩主题而不是压缩
高级别问题
我在本地运行 kafka,并且正在使用压缩主题。当我运行命令行生产者和消费者时,我可以验证正在发生压缩,但是当我使用 sarama(“github.com/Shopify/sarama”)生产者时,似乎没有发生日志压缩。
验证日志压缩
首先,我使用以下命令创建了一个主题:
接下来,我使用以下内容向它生成几条消息:
最后验证是否发生了日志压缩:
哪个打印:
因此,正在对 andrew.topic 主题进行日志压缩。
现在使用萨拉马
现在我使用 sarama 为同一主题生成消息,如下所示:
在命令行重新启动消费者后,我看到以下输出
此处未进行日志压缩。无论我重启消费者多少次,或者我使用 sarama 日志压缩生成多少消息,似乎都没有发生。
更怪异
如果在使用 sarama 生成消息之后,我会在命令行日志压缩中生成更多消息,然后发生
运行 sarama producer 后运行终端生产者后得到以下输出
在终端运行生产者后,所有消息都会发生日志压缩,包括之前由 sarama 生成的消息。
为什么会这样?我该如何解决?
go - 使用 sarama 编写 Kafka 生产者时时间戳无效
我有一个正在运行的 Kafka 实例(在本地,在 Docker 中),我使用sarama 包在 Go 中创建了一个生产者。
由于我想在我的主题上使用 Kafka Streams,生产者必须在消息中嵌入时间戳,否则我会收到以下丑陋的错误消息:
org.apache.kafka.streams.errors.StreamsException: 输入记录 ConsumerRecord(topic = crawler_events, partition = 0, offset = 0, CreateTime = -1, serialized key size = -1, serialized value size = 187, headers = RecordHeaders( headers = [], isReadOnly = false), key = null, value = {XXX}) 具有无效(负)时间戳。可能是因为使用了 0.10 之前的生产者客户端将这条记录写入 Kafka 而没有嵌入时间戳,或者因为输入主题是在将 Kafka 集群升级到 0.10+ 之前创建的。使用不同的 TimestampExtractor 来处理此数据。
这是在我的 Go 程序中发送消息的代码部分:
如您所见,我尝试发送的时间戳是time.Now()
.
当我运行控制台消费者查看收到的时间戳时:
我看到它们都是“-1”:
使用控制台生产者向主题添加消息时,我有预期的时间戳,例如:
我究竟做错了什么?谢谢你的帮助。
go - 包无法从分区消费
我无法从主题中消费,我不确定我的代码或我的 kafka 配置是否有问题。我遇到的问题是它卡在“开始”的打印语句上,所以它没有收到来自频道 <-partitionConsumer.Messages() 的消息。
这些是我为我的 kafka 设置 ( https://kafka.apache.org/quickstart ) 采取的步骤,其中包含一些消息,我确定它们存在,因为当我运行以下命令时,我看到了值。
bin/kafka-console-consumer.sh --partition 0 --topic test --bootstrap-server localhost:9092 --offset 最早
- bin/zookeeper-server-start.sh 配置/zookeeper.properties
- bin/kafka-server-start.sh 配置/server.properties
- bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
- bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
go - Golang Consumer 连接 Kafka 后接收 Kafka 消息延迟
我是 Golang 和 Kafa 的新手,所以这似乎是一个愚蠢的问题。
在我的 Kafka 消费者首次连接到 Kafka 服务器后,为什么在与 Kafka 服务器建立连接和接收到第一条消息之间会有延迟(约 20 秒)?
它在之前consumer.Messages()
打印一条消息,并为收到的每条消息打印另一条消息。~20 秒延迟介于 firstfmt.Println
和 second之间fmt.Println
。
码头工人-compose.yml
go - 我的 Golang 应用程序中是否需要一个或多个 sarama.SyncProducer?
我是Golang的新手,我需要编写将事件发布到kafka的应用程序,我找不到以下问题的答案:
- 我需要多少 sarama.SyncProducer?
- 在所有应用程序中使用一个可以吗?我应该有某种生产者池吗?
go - 如何使用 shopify sarama 通过抵消来处理消费者恢复
我读过 kafka 提供了一个消费者客户端库,该库允许通过保存在 zookeeper 中读取的最后一个偏移量来恢复(不是 100% 确定它的存储位置)。
是否可以对 Sarama 消费者做同样的事情?
假设我正在阅读直到偏移量 550,我的消费者崩溃了 5 分钟,我们现在处于偏移量 700,但我想从偏移量 550 恢复消费。
不必自己拯救国家,这可能吗?我会假设它确实如此,但我不明白如何。
我找到sarama.OffsetNewest/Oldest
了,但这不是我要找的...
go - Kafka:客户端已用完可用的代理
更新:原来我在 Docker 中的端口有问题。不知道为什么解决了这种现象。
我相信我遇到了一个奇怪的错误。我正在使用Sarama库,并且能够成功创建消费者。
一旦我将这段代码分解并移出主程序,我就会遇到错误:
kafka:客户端已经用完了可以与之交谈的代理(您的集群是否可达?)
我已将代码拆分如下:以前的 main() 方法现在已转换为使用名为 NewConsumer() 的方法的消费者包,并且我的新 main() 调用 NewConsumer() 如下所示:
恐慌声明被触发sarama.NewConsumer
并打印出来kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
为什么以这种方式分解我的代码会触发 Sarama 无法成为消费者?Sarama 是否需要直接从 main 运行?