0

我正在尝试创建一个kafka生产者,它将消息发送到 kafka 经纪人(而不是动物园管理员)。

我知道更好的做法是使用 zk,但目前我想直接向代理发送消息。

为此,我按照文档中的描述设置属性“broker.list” 。问题是,为了让它工作,它至少需要 3 个经纪人(否则我得到一个例外)。

在kafka的源代码中我可以看到:

if(brokerInfo.size < 3) throw new InvalidConfigException("broker.list has invalid value")

这很奇怪,因为在我的数据中心我只拥有 2 个 kafka 节点(和 3 个 zk),在这种情况下我该怎么办?有没有办法解决这个问题?

4

2 回答 2

2

brokerInfo是通过拆分单个经纪人信息而不是经纪人的数量来获得的。如果您更仔细地检查源代码,您会看到类似

// check if each individual broker info is valid => (brokerId: brokerHost: brokerPort)

然后他们将这些信息拆分如下

   brokerInfoList.foreach { bInfo =>
       val brokerInfo = bInfo.split(":")
       if(brokerInfo.size < 3) throw new InvalidConfigException("broker.list has invalid value")
   }

因此,每个代理都希望有一个由分隔符分隔 的主机名端口的id,基本上是关于它只是这样做的代理的数量:

    val brokerInfoList = config.brokerList.split(",")
    if(brokerInfoList.size == 0) throw new InvalidConfigException("broker.list is empty")

所以我猜你应该没问题,只需尝试通过一个经纪人,它应该可以工作。让我们知道怎么回事

于 2013-11-29T08:28:36.120 回答
0

写的时候明显

props.put("broker.list", "0:" + <host:port>);

它有效(我在原始字符串中添加了“0:”)。我在快速入门指南的第 9 节中找到了它。

我不确定我是否得到它,也许这个零是分区号(?)也许是别的东西(如果有人可以在这里阐明一些可能会很好)。

于 2013-11-29T08:35:04.797 回答