1

Kafka Connect 源和接收器连接器提供了几乎理想的功能集,无需编写任何代码即可配置数据管道。就我而言,我想用它来集成来自公共 Internet 上的多个数据库服务器(生产者)的数据。

然而,一些生产者无法直接访问 Kafka 代理,因为他们的网络/防火墙配置只允许到特定主机(端口 443)的流量。不幸的是,我无法真正更改这些设置。

我的想法是使用 Confluent REST 代理,但我了解到 Kafka Connect 使用 KafkaProducer API,因此它需要直接访问代理。

我发现了几个可能的解决方法,但没有一个是完美的:

  1. SSH 隧道 - 如:通过 SSH 隧道从 Kafka 集群消费
  2. 使用 REST 代理,但用自定义生产者替换 Kafka Connect,在我们如何在防火墙/代理后面配置 kafka 生产者?
  3. 使用 SSHL 解复用器将流量路由到代理(但只有一个代理)

有没有人遇到过类似的挑战?你是怎么解决的?

4

2 回答 2

1

正如@OneCricketeer 推荐的那样,我尝试了一个带有 REST 代理方法的 HTTP Sink 连接器。我设法配置了 Confluent HTTP Sink 连接器以及替代连接器 (github.com/llofberg/kafka-connect-rest) 以使用 Confluent REST 代理。

我正在添加连接器配置,以防任何尝试这种方法的人节省一些时间。

Confluent HTTP Sink 连接器

    {
    "name": "connector-sink-rest",
    "config": {
        "topics": "test",
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.http.HttpSinkConnector",
        "headers": "Content-Type:application/vnd.kafka.json.v2+json",
        "http.api.url": "http://rest:8082/topics/test",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter.schemas.enable": "false",
        "batch.prefix": "{\"records\":[",
        "batch.suffix": "]}",
        "batch.max.size": "1",
        "regex.patterns":"^~$",
        "regex.replacements":"{\"value\":~}",
        "regex.separator":"~",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1"
    }
}

Kafka Connect REST 连接器

{
    "name": "connector-sink-rest-v2",
    "config": {
        "connector.class": "com.tm.kafka.connect.rest.RestSinkConnector",
        "tasks.max": "1",
        "topics": "test",
        "rest.sink.url": "http://rest:8082/topics/test",
        "rest.sink.method": "POST",
        "rest.sink.headers": "Content-Type:application/vnd.kafka.json.v2+json",        
        "transforms": "velocityEval",        
        "transforms.velocityEval.type": "org.apache.kafka.connect.transforms.VelocityEval$Value",
        "transforms.velocityEval.template": "{\"records\":[{\"value\":$value}]}",
        "transforms.velocityEval.context": "{}"        
    }
}
于 2021-07-14T15:19:37.970 回答
1

接收器连接器(写入外部系统的连接器)不使用生产者 API。

话虽如此,您可以使用一些向 REST 代理端点发出 POST 请求的 HTTP 接收器连接器。这并不理想,但可以解决问题。注意:这意味着您有两个集群- 一个用于通过 Connect 发出 HTTP 请求,另一个位于代理后面。


总的来说,我看不出这个问题对于 Connect 有什么独特之处,因为通过唯一开放的 HTTPS 端口将数据写入 Kafka 的任何其他尝试都会遇到类似的问题。

于 2021-06-25T22:03:46.207 回答