I am trying to set up a Kafka sink connector to write to an Exasol database.

I have followed this article: https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/

Since I could not find any similar sink connector class for Exasol, I tried to use the jars from https://github.com/exasol/kafka-connect-jdbc-exasol/tree/master/kafka-connect-exasol/jars (I copied them into $confluent_dir/share/java/kafka-connect-jdbc) and specified the dialect class from those jars as the connector class name in the configuration JSON file below.
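
For reference, this is roughly how I copied the jars (a minimal sketch; the clone location and $confluent_dir are specific to my setup and may differ elsewhere):

git clone https://github.com/exasol/kafka-connect-jdbc-exasol.git
cp kafka-connect-jdbc-exasol/kafka-connect-exasol/jars/*.jar $confluent_dir/share/java/kafka-connect-jdbc/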

I have created a JSON configuration file as below:

{
        "name": "jdbc_sink_mysql_dev_02",
        "config": {
                "_comment": "The JDBC connector class. Don't change this if you want to use the JDBC Source.",
                "connector.class": "com.exasol.connect.jdbc.dailect.ExasolDatabaseDialect",

                "_comment": "How to serialise the value of keys - here use the Confluent Avro serialiser. Note that the JDBC Source Connector always returns null for the key ",
                "key.converter": "io.confluent.connect.avro.AvroConverter",

                "_comment": "Since we're using Avro serialisation, we need to specify the Confluent schema registry at which the created schema is to be stored. NB Schema Registry and Avro serialiser are both part of Confluent Platform.",
                "key.converter.schema.registry.url": "http://localhost:8081",

                "_comment": "As above, but for the value of the message. Note that these key/value serialisation settings can be set globally for Connect and thus omitted for individual connector configs to make them shorter and clearer",
                "value.converter": "io.confluent.connect.avro.AvroConverter",
                "value.converter.schema.registry.url": "http://localhost:8081",


                "_comment": " --- JDBC-specific configuration below here  --- ",
                "_comment": "JDBC connection URL. This will vary by RDBMS. Consult your manufacturer's handbook for more information",
                "connection.url": "jdbc:exa:<myhost>:<myport> <myuser>/<mypassword>",

                "_comment": "Which table(s) to include",
                "table.whitelist": "<my_table_name>",

                "_comment": "Pull all rows based on an timestamp column. You can also do bulk or incrementing column-based extracts. For more information, see http://docs.confluent.io/current/connect/connect-jdbc/docs/source_config_options.html#mode",
                "mode": "timestamp",

                "_comment": "Which column has the timestamp value to use?  ",
                "timestamp.column.name": "update_ts",

                "_comment": "If the column is not defined as NOT NULL, tell the connector to ignore this  ",
                "validate.non.null": "false",

                "_comment": "The Kafka topic will be made up of this prefix, plus the table name  ",
                "topic.prefix": "mysql-"
        }
}

I am trying to load this connector with the following command:

./bin/confluent load jdbc_sink_mysql_dev_02  -d <my_configuration_json_file_path>
P.S. My Confluent version is 5.1.0.
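
For what it's worth, the connector and task state can also be checked through the Kafka Connect REST API (this assumes the default worker port 8083 on localhost):

curl -s http://localhost:8083/connectors/jdbc_sink_mysql_dev_02/status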

In a similar way I have created a mysql-source connector to read data from MySQL, and it works fine; my use case requires writing that data to the Exasol database with a sink connector.

Although I am not getting any exceptions, Kafka is not reading any messages.
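
To rule out an empty source topic, the messages on the topic can be inspected with the Avro console consumer (the topic name below is an assumption based on my source connector's topic.prefix plus the table name):

./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic mysql-<my_table_name> --from-beginning --property schema.registry.url=http://localhost:8081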

Any pointers or help with configuring such a sink connector to write to an Exasol database would be appreciated.
