0

我正在使用 sarama(1.27) ClusterAdmin 来管理 kafka1.1.0 中的主题。我管理 kafka 主题的应用程序作为 REST 服务运行。我的应用程序运行良好一段时间,我可以获取/创建/删除主题。

但是在没有任何活动的情况下经过一段时间后,一个新的主题请求会出错 - write tcp xxxxx:37888->xxxxx:9092: write: broken pipe。

我遇到了这个How to fix broker may not available after broken pipe

由于我的应用程序作为服务运行,如何防止管道损坏问题?我仅在应用程序退出时关闭 ClusterAdmin。相同的 ClusterAdmin 连接用于服务所有请求。如果出于任何原因它为 nil,我会为每个请求重新初始化 clusterAdmin(通常在第一次初始化后它不是 nil,因此重用相同的连接)。

我应该在处理每个请求后关闭 clusteradmin 并为每个主题请求打开一个 NewClusterAdmin(),还是需要使用 keepalive 选项?

这是我现有的代码:

if admin == nil{
        admin, err := NewClusterAdmin([]string{"localhost:9092"}, s.config)
        ..
}
topicMetadata, err := admin.DescribeTopics([]string{topicName})
4

1 回答 1

0

我也遇到了这个错误。我解决这个问题的方法是再试几次,例如 2 到 10 次。

于 2021-08-02T07:11:16.343 回答