如何最初从查询中获取所有数据,然后仅使用 kafka 连接器增量更改。
也许这可以帮助你。例如,我有一张桌子:
╔════╦═════════════╦═══════════╗
║ Id ║ Name ║ Surname ║
╠════╬═════════════╬═══════════╣
║ 1 ║ Martin ║ Scorsese ║
║ 2 ║ Steven ║ Spielberg ║
║ 3 ║ Christopher ║ Nolan ║
╚════╩═════════════╩═══════════╝
在这种情况下,我将创建一个视图:
CREATE OR REPLACE VIEW EDGE_DIRECTORS AS
SELECT 0 AS EXID, ID, NAME, SURNAME
FROM DIRECTORS WHERE ID =< 2
UNION ALL
SELECT ID AS EXID, ID, NAME, SURNAME
FROM DIRECTORS WHERE ID > 2;
在 kafka jdbc 连接器的属性文件中,您可以使用:
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
mode=incrementing
incrementing.column.name=EXID
topic.prefix=
tasks.max=1
name=gv-jdbc-source-connector
connection.url=
table.types=VIEW
table.whitelist=EDGE_DIRECTORS
所以kafka jdbc连接器会采取以下步骤:
- 首先是 EXID = 0 的所有数据;
- 它将在 connector.offsets 文件中存储偏移值 = 0;
- 新行将被插入到 DIRECTORS 表中。
- Kafka JDBC 连接器将执行:
Select EXID, ID, NAME, SURNAME FROM EDGE_DIRECTORS
并且会注意到 EXID 已增加。
- 数据将在 Kafka Streams 中更新。