1

在此处输入图像描述

我正在尝试使用 kafka 将数据从 oracleDB 传输到 mongoDB。所以我像上图一样配置了kafka集群。我知道调整分区和 tasks.max 允许并行处理。但是,当我运行连接器时,它始终作为单个任务运行,不能并行处理。我需要做任何额外的设置吗?

这是我配置的。

  1. 主题创建

bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092,127.0.0.2:9092,127.0.0.1:9093 --partitions 3 --topic topicA

  1. 连接器配置

    {
    "name": "rawsumc-source",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:oracle:thin:@127.0.0.1:1521/orcl",
        "connection.user": "test",
        "connection.password": "test",
        "topic.prefix": "topicA",
        "mode": "bulk",
        "poll.interval.ms": "360000000",
        "numeric.mapping": "best_fit",
        "tasks.max": "10",
        "connection.type": "lz4",
        "query": "select CAST(NO_TT AS NUMBER(10,0)) AS NO_TT,CAST(NO_SEQ AS NUMBER(10,0)) AS NO_SEQ,DNT_CLCT from table_a",
        "name": "rawsumc-source"
    },
    "tasks": [
        {
            "connector": "rawsumc-source",
            "task": 0
        }
    ],
    "type": "source"}
    
4

2 回答 2

0

根据文档

tasks.max-应为此连接器创建的最大任务数。如果连接器无法达到这种并行度,它可能会创建更少的任务。

使用 JdbcSourceConnector 的自定义查询将您限制为单个任务。

于 2020-05-22T11:21:52.933 回答
0

我也对这个连接器任务感到困惑。一开始以为kafka connect解决了多客户端数据重复的问题。但实际上kafka connect选择了其他方式来避免这个问题。我跟踪了源代码并找到了它的实际实现。

(我现在不能发图,大家可以点击链接看图。)首先是taskConfigs接口: taskConfigs接口

    /**
 * Returns a set of configurations for Tasks based on the current configuration,
 * producing at most count configurations.
 *
 * @param maxTasks maximum number of configurations to generate
 * @return configurations for Tasks
 */
public abstract List<Map<String, String>> taskConfigs(int maxTasks);

那么它的'调用这个接口: 调用这个接口

org.apache.kafka.connect.runtime.distributed.DistributedHerder:
  final List<Map<String, String>> taskProps = worker.connectorTaskConfigs(connName, connConfig);
        boolean changed = false;
        int currentNumTasks = configState.taskCount(connName);
        if (taskProps.size() != currentNumTasks) {
            log.debug("Change in connector task count from {} to {}, writing updated task configurations", currentNumTasks, taskProps.size());
            changed = true;
        } 

kafka connect如何使用task props: kafka connect如何使用task props

if (changed) {
            List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
            if (isLeader()) {
                configBackingStore.putTaskConfigs(connName, rawTaskProps);
                cb.onCompletion(null, null);
            }

这意味着 kafka connect 使用任务配置来创建内部线程。因此,有多少任务运行取决于 taskConfigs 方法上的连接器实现。

让我们看看mqtt源连接器上的实现: mqtt源连接器上的实现

   public List<Map<String, String>> taskConfigs(int maxTasks) {
 List<Map<String, String>> result = new ArrayList<>();
 int taskId = 0;
 for (List<String> mqttTopics : (Iterable<List<String>>)ConnectorUtils.groupPartitions(this.config.mqttTopics, maxTasks)) {
   if (mqttTopics.isEmpty())
     continue; 
   Map<String, String> settings = new LinkedHashMap<>(this.settings);
   settings.put("mqtt.topics", Joiner.on(',').join(mqttTopics));
   settings.put("task.id", Integer.toString(taskId++));
   result.add(settings);
 } 
 return result;

此任务按 mqtt 主题分组。实际上我只是声明了一个主题过滤器,所以我总是得到一个 mqtt 源任务。将运行多少任务取决于连接器上的 taskConfigs 方法实现,输入是连接器配置中的最大任务。 您可以阅读连接器上此方法的源代码实现。

于 2021-08-19T03:33:00.453 回答