我目前正在使用 confluent 3.0.1 平台。我正在尝试在两个不同的工作人员上创建 2 个连接器,但尝试创建一个新的连接器正在为它创建一个新组。
Two connectors were created using below details:
1) POST http://devmetric.com:8083/connectors
{
"name": "connector1",
"config": {
"connector.class": "com.xxx.kafka.connect.sink.DeliverySinkConnector",
"tasks.max": "1",
"topics": "dev.ps_primary_delivery",
"elasticsearch.cluster.name": "ad_metrics_store",
"elasticsearch.hosts": "devkafka1.com:9300",
"elasticsearch.bulk.size": "100",
"tenants": "tenant1"
}
}
2) POST http://devkafka01.com:8083/connectors
{
"name": "connector2",
"config": {
"connector.class": "com.xxx.kafka.connect.sink.DeliverySinkConnector",
"tasks.max": "1",
"topics": "dev.ps_primary_delivery",
"elasticsearch.cluster.name": "ad_metrics_store",
"elasticsearch.hosts": "devkafka.com:9300",
"elasticsearch.bulk.size": "100",
"tenants": "tenant1"
}
}
但它们都是在不同的组 ID 下创建的。在此之后,我查询了现有的组。
$ sh ./bin/kafka-consumer-groups --bootstrap-server devmetric.com:9091 --new-consumer --list
Result was:
connect-connector2
connect-connector1
这些组是由 Kafka connect 自动创建的,不是我提供的。我在worker.properties 中给出了不同的group.id。但我希望两个连接器都在同一个组下,以便它们并行工作以共享消息。截至目前,我有 100 万个关于“dev.ps_primary_delivery”主题的数据,我希望两个连接器各获得 50 万个数据。
请让我知道如何做到这一点。