正如 @OneCricketeer 推荐的那样,我尝试了将 HTTP Sink 连接器与 REST 代理结合使用的方案。我成功配置了 Confluent HTTP Sink 连接器以及另一个替代连接器 (github.com/llofberg/kafka-connect-rest),使它们都能与 Confluent REST 代理配合使用。
我在此附上这两个连接器的配置,希望能为尝试这种方法的人节省一些时间。
Confluent HTTP Sink 连接器
{
  "name": "connector-sink-rest",
  "config": {
    "connector.class": "io.confluent.connect.http.HttpSinkConnector",
    "tasks.max": "1",
    "topics": "test",
    "http.api.url": "http://rest:8082/topics/test",
    "headers": "Content-Type:application/vnd.kafka.json.v2+json",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": "false",
    "batch.max.size": "1",
    "batch.prefix": "{\"records\":[",
    "batch.suffix": "]}",
    "regex.separator": "~",
    "regex.patterns": "^~$",
    "regex.replacements": "{\"value\":~}",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1"
  }
}
Kafka Connect REST 连接器
{
  "name": "connector-sink-rest-v2",
  "config": {
    "connector.class": "com.tm.kafka.connect.rest.RestSinkConnector",
    "tasks.max": "1",
    "topics": "test",
    "rest.sink.method": "POST",
    "rest.sink.url": "http://rest:8082/topics/test",
    "rest.sink.headers": "Content-Type:application/vnd.kafka.json.v2+json",
    "transforms": "velocityEval",
    "transforms.velocityEval.type": "org.apache.kafka.connect.transforms.VelocityEval$Value",
    "transforms.velocityEval.context": "{}",
    "transforms.velocityEval.template": "{\"records\":[{\"value\":$value}]}"
  }
}