0

我正在尝试以分布式模式运行 Kafka 工作人员。与独立模式不同,我们不能在分布式模式下启动 worker 时传递连接器属性文件。在分布式模式下,worker 是单独启动的,我们使用 REST API 在这些worker 上部署和管理连接器

参考链接 - https://docs.confluent.io/current/connect/managing/configuring.html#connect-managing-distributed-mode

我尝试通过在 curl 命令中传递以下值来构建连接器并执行它

curl -X POST -H "Content-Type: application/json" --data '{"name":"sailpointdb","connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","tasks.max":"1","connection.password " : " abc","connection.url " : "jdbc:mysql://localhost:3306/db","connection.user " : "abc" ,"query" : " SELECT * FROM (SELECT NAME, FROM_UNIXTIME(completed/1000) AS 
TASKFAILEDON FROM abc WHERE COMPLETION_STATUS = 'Error') as A","mode" : " timestamp","timestamp.column.name" : "TASKFAILEDON","topic.prefix" : "dbevents","validate.non.null" : "false" }}' http://localhost:8089/connectors/

我收到以下错误 - curl: (3) URL 使用错误/非法格式或缺少 URL

请让我知道上面的 curl 语句有什么问题,我在这里遗漏了什么吗

4

1 回答 1

1
  1. 您的 JSON 中有一个额外的右花括号,这无济于事
  2. 如果您愿意POST/connectors您需要nameconfig根级别元素。但是,我建议使用,PUT /config因为如果需要,您可以重新运行它来更新配置

尝试这个:

curl -X PUT -H  "Content-Type:application/json" \
      http://localhost:8089/connectors/source-jdbc-sailpointdb-00/config \
      -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.password ": " abc",
        "connection.url ": "jdbc:mysql://localhost:3306/db",
        "connection.user ": "abc",
        "query": " SELECT * FROM (SELECT NAME, FROM_UNIXTIME(completed/1000) AS TASKFAILEDON FROM abc WHERE COMPLETION_STATUS = 'Error') as A",
        "mode": " timestamp",
        "timestamp.column.name": "TASKFAILEDON",
        "topic.prefix": "dbevents",
        "validate.non.null": "false"
    }'
于 2019-09-26T07:45:34.857 回答