我也对这个连接器任务感到困惑。一开始以为kafka connect解决了多客户端数据重复的问题。但实际上kafka connect选择了其他方式来避免这个问题。我跟踪了源代码并找到了它的实际实现。
(我现在不能发图,大家可以点击链接看图。)首先是taskConfigs接口:
taskConfigs接口
/**
* Returns a set of configurations for Tasks based on the current configuration,
* producing at most count configurations.
*
* @param maxTasks maximum number of configurations to generate
* @return configurations for Tasks
*/
public abstract List<Map<String, String>> taskConfigs(int maxTasks);
那么它的'调用这个接口:
调用这个接口
org.apache.kafka.connect.runtime.distributed.DistributedHerder:
final List<Map<String, String>> taskProps = worker.connectorTaskConfigs(connName, connConfig);
boolean changed = false;
int currentNumTasks = configState.taskCount(connName);
if (taskProps.size() != currentNumTasks) {
log.debug("Change in connector task count from {} to {}, writing updated task configurations", currentNumTasks, taskProps.size());
changed = true;
}
kafka connect如何使用task props:
kafka connect如何使用task props
if (changed) {
List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
if (isLeader()) {
configBackingStore.putTaskConfigs(connName, rawTaskProps);
cb.onCompletion(null, null);
}
这意味着 kafka connect 使用任务配置来创建内部线程。因此,有多少任务运行取决于 taskConfigs 方法上的连接器实现。
让我们看看mqtt源连接器上的实现:
mqtt源连接器上的实现
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> result = new ArrayList<>();
int taskId = 0;
for (List<String> mqttTopics : (Iterable<List<String>>)ConnectorUtils.groupPartitions(this.config.mqttTopics, maxTasks)) {
if (mqttTopics.isEmpty())
continue;
Map<String, String> settings = new LinkedHashMap<>(this.settings);
settings.put("mqtt.topics", Joiner.on(',').join(mqttTopics));
settings.put("task.id", Integer.toString(taskId++));
result.add(settings);
}
return result;
此任务按 mqtt 主题分组。实际上我只是声明了一个主题过滤器,所以我总是得到一个 mqtt 源任务。将运行多少任务取决于连接器上的 taskConfigs 方法实现,输入是连接器配置中的最大任务。
您可以阅读连接器上此方法的源代码实现。