我正在使用 sarama(1.27) ClusterAdmin 来管理 kafka1.1.0 中的主题。我管理 kafka 主题的应用程序作为 REST 服务运行。我的应用程序运行良好一段时间,我可以获取/创建/删除主题。
但是在没有任何活动的情况下经过一段时间后,一个新的主题请求会出错 - write tcp xxxxx:37888->xxxxx:9092: write: broken pipe。
我遇到了这个How to fix broker may not available after broken pipe。
由于我的应用程序作为服务运行,如何防止管道损坏问题?我仅在应用程序退出时关闭 ClusterAdmin。相同的 ClusterAdmin 连接用于服务所有请求。如果出于任何原因它为 nil,我会为每个请求重新初始化 clusterAdmin(通常在第一次初始化后它不是 nil,因此重用相同的连接)。
我应该在处理每个请求后关闭 clusteradmin 并为每个主题请求打开一个 NewClusterAdmin(),还是需要使用 keepalive 选项?
这是我现有的代码:
if admin == nil{
admin, err := NewClusterAdmin([]string{"localhost:9092"}, s.config)
..
}
topicMetadata, err := admin.DescribeTopics([]string{topicName})