1

我正在尝试在 Kafka Connect 中实现静态成员资格。我们的 Kafka Connect 集群使用 Strimzi Kafka 算子部署在 K8S 上。

我尝试为工作人员(在 KafkaConnect yaml 中)放置以下配置:

connector.client.config.override.policy: All
consumer.group.instance.id: somethingsomething

在 HttpSinkConnector 类中,我有这个:

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    List<Map<String, String>> configs = new ArrayList<>(maxTasks);
    for (int i = 0; i < maxTasks; i++) {
        Map<String, String> configCopy = new HashMap<>(this.configProps);
        configCopy.put("consumer.override.group.instance.id", Thread.currentThread().getName());

        configs.add(configCopy);
    }
    return configs;
}

这给了 org.apache.kafka.common.errors.FencedInstanceIdException 一些日志 - ...07:07:32,631 错误 [Consumer instanceId=somethingsomething... 因为所有任务都有一些东西作为他们的 group.instance.id 虽然它应该有获取 Thread.currentThread().getName()。

我还尝试了以下方法(没有工作人员配置):

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    List<Map<String, String>> configs = new ArrayList<>(maxTasks);
    for (int i = 0; i < maxTasks; i++) {
        Map<String, String> configCopy = new HashMap<>(this.configProps);
        configCopy.put("consumer.group.instance.id", "somethingsomething");

        configs.add(configCopy);
    }
    return configs;
}

这什么也没做(没有错误,日志中没有 instanceId),这意味着我把这个配置值放在了错误的地方。

那么如何在 Kafka Connect 上实现静态成员资格?

4

1 回答 1

0

使用 consumer.override.group.instance.id而不是 consumer.group.instance.id

于 2021-06-10T10:42:15.070 回答