我必须将记录从 Aurora/Mysql 发送到 MSK,然后从那里发送到 Elastic 搜索服务
Aurora -->Kafka-connect--->AWS MSK--->kafka connect --->弹性搜索
Aurora 表结构中的记录是这样的,
我认为记录会以这种格式进入 AWS MSK。
"o36347-5d17-136a-9749-Oe46464",0,"NEW_CASE","WRLDCHK","o36347-5d17-136a-9749-Oe46464","<?xml version=""1.0"" encoding=""UTF-8"" standalone=""yes""?><caseCreatedPayload><batchDetails/>","CASE",08-JUL-17 10.02.32.217000000 PM,"TIME","UTC","ON","0a348753-5d1e-17a2-9749-3345,MN4,","","0a348753-5d1e-17af-9749-FGFDGDFV","EOUHEORHOE","2454-5d17-138e-9749-setwr23424","","","",,"","",""
所以为了通过弹性搜索消费,我需要使用正确的模式,所以我必须使用模式注册表。
我的问题
问题 1
对于上述类型的消息模式注册表,我应该如何使用模式注册表?我是否必须为此创建 JSON 结构,如果是,我将其保留在哪里。这里需要更多帮助才能理解这一点?
我已编辑
vim /usr/local/confluent/etc/schema-registry/schema-registry.properties
提到了 zookeper,但我没有什么是kafkastore.topic=_schema
如何将其链接到自定义架构。
即使我开始并得到这个错误
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic _schemas not present in metadata after 60000 ms.
这是我所期待的,因为我没有对 schema 做任何事情。
我确实安装了 jdbc 连接器,当我启动时出现以下错误
Invalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123 for configuration Couldn't open connection to jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
Invalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123 for configuration Couldn't open connection to jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
问题 2 我可以在一个 ec2 上创建两个连接器(jdbc 和弹性 serach 一个)。如果是,我必须在单独的 cli 中同时启动吗?
问题 3 当我打开 vim /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties 我只看到如下属性值
name=test-source-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
mode=incrementing
incrementing.column.name=id
topic.prefix=trf-aurora-fspaudit-
在上面的属性文件中我可以提到架构名称和表名称?
根据答案,我正在更新我的 Kafka 连接 JDBC 配置
---------------启动 JDBC 连接弹性搜索 -----------------
wget /usr/local http://packages.confluent.io/archive/5.2/confluent-5.2.0-2.11.tar.gz -P ~/Downloads/
tar -zxvf ~/Downloads/confluent-5.2.0-2.11.tar.gz -C ~/Downloads/
sudo mv ~/Downloads/confluent-5.2.0 /usr/local/confluent
wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.48.tar.gz
tar -xzf mysql-connector-java-5.1.48.tar.gz
sudo mv mysql-connector-java-5.1.48 mv /usr/local/confluent/share/java/kafka-connect-jdbc
接着
vim /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
然后我修改了下面的属性
connection.url=jdbc:mysql://fdgfgdfgrter.us-east-1.rds.amazonaws.com:3306/trf
mode=incrementing
connection.user=admin
connection.password=Welcome123
table.whitelist=PANStatementInstanceLog
schema.pattern=dbo
最后我修改
vim /usr/local/confluent/etc/kafka/connect-standalone.properties
在这里我修改了以下属性
bootstrap.servers=b-3.205147-ertrtr.erer.c5.ertert.us-east-1.amazonaws.com:9092,b-6.ertert-riskaudit.ertet.c5.kafka.us-east-1.amazonaws.com:9092,b-1.ertert-riskaudit.ertert.c5.kafka.us-east-1.amazonaws.com:9092
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/local/confluent/share/java
当我列出主题时,我没有看到为表名列出的任何主题。
错误消息的堆栈跟踪
[2020-01-03 07:40:57,169] ERROR Failed to create job for /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties (org.apache.kafka.connect.cli.ConnectStandalone:108)
[2020-01-03 07:40:57,169] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:119)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:116)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
at org.apache.kafka.connect.runtime.AbstractHerder.maybeAddConfigErrors(AbstractHerder.java:423)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:188)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:113)
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" IPaddressOfKCnode:8083/connectors/ -d '{"name": "emp-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://IPaddressOfLocalMachine:3306/test_db?user=root&password=pwd","table.whitelist": "emp","mode": "timestamp","topic.prefix": "mysql-" } }'