问题标签 [sarama]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
go - 在 sarama-cluster 中模拟 NewConsumer
有没有办法在没有设置实际代理的情况下测试/模拟 sarama-cluster 的 NewConsumer 功能?我在这里想念什么?
我正在尝试测试的代码:
go - 无法使用 Golang Sarama 包从本地运行的 Kafka 服务器消费消息
我正在制作一个简单的 Telegram 机器人,它会从本地 Kafka 服务器读取消息并将其打印到聊天中。zookeeper 和 kafka 服务器配置文件都是默认的。控制台消费者工作。当我尝试使用 Golang Sarama 包从代码中使用消息时,问题就出现了。在我添加这些行之前:
case err := <-pc.Errors():
log.Panic(err)
该程序只打印一次消息,之后它将停止。现在它会将其打印到日志中:
kafka: error while consuming test1/0: kafka: broker not connected
这是代码:
这是consumer.go:
docker - 部署到 Docker 时,在 Golang 中实现的 Apache Kafka 消费者出现恐慌
这是我尝试实现一个简单的微服务,该微服务应该从 kafka 服务器读取消息并通过 HTTP 发送。当我从终端运行它时它工作正常,但是当部署到 docker 时它会恐慌
kafka.go 第 32 和 36 行是 go func(pc sarama.PartitionConsumer)
函数所在的位置。我对编程比较陌生,所以任何帮助将不胜感激。谢谢!
main.go:
服务.go:
卡夫卡.go:
go - Sarama 无法与 Kafka 服务器通信
所以我正在尝试配置一个 Sarama(kafka 的本地 go 客户端)生产者客户端。我已经相应地配置了我的 TLS,确保客户端证书是使用正确的密码生成的。我初始化客户端的 Go 代码如下所示:
当我初始化代码时,我收到一条错误消息:
time="2018-01-19T15:31:38Z" level=error msg="Error trying to setup kafka: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"
我已经验证了 Kafka 主机是可访问的并且可以连接到。见下文。
我通过检查 go 代码的输出到openssl rsa -in client_ingestion_client.key -out decrypted.key
命令生成的输出来验证密钥是否被正确解密。我还确保使用带有正确标志的 keytool 正确生成了密钥,包括 -keylag RSA 标志,如此处所建议的。
我也跑了openssl s_client -connect $KAFKA_HOST:$KAFKA_PORT
,得到了以下回应
验证错误很好,因为我使用的是自签名证书,但我不知道接下来的错误是什么。也许这就是我的问题的原因?
此外,我得到以下信息:
由于在 openssl 连接中引用了此密码:
ECDHE-RSA-AES128-GCM-SHA256
我尝试将它添加tls.TLS_RSA_WITH_AES_128_GCM_SHA256
到我的 go 代码中,这看起来很接近,但我收到了同样的错误消息,说它已经用完了可用的经纪人来与之交谈。
go - 从 Sarama 的错误通道中读取的正确方法是什么?
当我生成消息时,我正在使用用 Go 编写的 Sarama 库从错误通道中读取。整个代码如下所示,包含在一个函数中:
正如我对 goroutines 的理解一样,我的 goroutines 会在通道上不断迭代,Errors()
直到它收到一个。一旦我的函数执行完毕,有没有办法让它停止监听错误?
go - 了解 GoLang Profiling 输出
一些上下文:
我正在尝试调试 GoGRPC 服务器,特定的 API 调用似乎需要很多时间。这个调用对 Kafka 进行了多次读取(比如说 10-20 次),所以我预计它需要一些时间,只是没有那么多。
1 个 API 调用大约需要 1-3 秒才能完成,但如果我在脚本中进行 40 个 api 调用,则完成所有这些调用几乎需要 30 秒。但它并没有像我预期的那样“同时”完成它们,第一个需要 5 秒,而后面的则每秒吐出 1 个左右。
它需要 29 秒并一次响应所有 40 个请求。当请求时间过长时,这会导致 API 调用者超时。
我正在尝试分析 CPU 以查看我在哪里花费时间。但我对此并不陌生,go profiler 的输出并没有多大意义。
我用 生成了图表go tool pprof
,但在解释输出时遇到了一些问题。
CPU 调用图
- 有一个框描述时间、类型、buildID 等。
Duration
在这个框中,是描述 CPU 运行的总时间吗?不包括等待时间 - 有两种类型的边缘,实线和虚线。有什么区别?边缘标记的时间是什么意思?
- 箭头的方向是否意味着呼叫方向?例如,函数 A 调用 B,在图上它将是 A -> B?
- 每个顶点在底部都有一个时间,例如 0.49s(45.79%) 中的 0.01s(0.93%),这个时间是什么意思?
块配置文件
- 与上面的#4 相同,边和顶点都有时间。这是什么意思?
编辑:
我的服务器所做的是从 kafka 流中检索一些数据。我已经确定了慢的原因,并且我编写了一个只有 kafka 调用函数的脚本。这是脚本和块分析图。
每次消费到 kafka 大约需要 50-100 毫秒,但由于大部分时间都花在做 IO 上,我预计 API 的吞吐量实际上会很高,事实并非如此。如果我打 100 个电话,大约需要 3 秒,如果我打 400 个电话,大约需要 10 秒。试图了解如何加快 API 的吞吐量
apache-kafka - 为什么我的 kafka cosumer group 不起作用?
我正在使用 sarama-cluster (由 Golang kafka 消费者客户端编写)
在 broker 中,我的主题的分区偏移量是 11000,我的消费者组的分区偏移量是 10100。然后我运行我的集群消费者,但没有任何消费。(消费时间为1~2天后)
但是当我在主题的分区中产生消息时,它会消耗!(在每个分区中)
一条消息是 901。为什么,我的消费者集群消费似乎在产生消息时被激活?
我的消费者设置是 auto.offset.reset = lastest
go - kafka server:Offset的topic尚未创建
我正在使用 Kafka 服务器 0.9 + zookeper。我是卡夫卡的新手。它在 virtualbox 中运行,我可以使用公共 IP 连接到它,所以它可以工作......或多或少:可以获取主题和消息。所以现在我有两个问题:
我在 __consumer_offsets (empty) 找不到任何东西,这就是为什么我遇到 go simple consumer script throwing 错误的问题:
kafka server: Offset's topic has not yet been created
; 当我通过命令检查 zookeeper-shell.sh 控制台时,ls /brokers/topics/__consumer_offsets
它会出错:Node does not exist: /brokers/topics/__consumer_offsets
(也许它们是连接的)我在自动创建主题时收到了轻微警告:
WARN NetworkClient - Error while fetching metadata with correlation id 4157 : {topicName=LEADER_NOT_AVAILABLE}
它高度赞赏任何帮助。谢谢!
go - 当我调用 SendMessage(msg) 时,我总是得到 0 个分区。我通过命令行指定 12
您对 Sarama 和 Kafka 使用什么配置值?
卡夫卡版本:kafka_2.12-1.1.0.tgz Go 版本:1.9.1
问题描述
我正在尝试向我的 Kafka 代理发送消息流,我有 3 个指定的节点,我希望消息位于 12 个分区中,但是当我调用 SendMessage(msg) 时,它返回 0 个分区。这是为什么?为什么我不能在分区中发送消息?
go - Kafka 消息未传递给消费者
我在这里面临一个非常奇怪的问题。我在 go 中使用 sarama-cluster 库启动了一个 Kafka 消费者,以使用来自 kafka 主题的一些消息。但是正在启动的消费者没有收到消息。
然而,一件非常奇怪的事情正在发生。如果我启动另一个与之并行的消费者,则消息会突然传递给两个消费者。
我想不出一个合乎逻辑的解释。任何指针将不胜感激。
注意:这个问题是在 Kafka 和 Zookeeper 服务器非正常启动之后开始的。
下面是消费者的 go 代码,用于消费不工作的消息:
以下是消费者接收消息的 go 代码(此代码与之前代码之间的唯一区别是另一个消费者为同一主题并行运行)。
提前致谢。