7

我试图弄清楚如何最初从查询中获取所有数据,然后只使用 kafka 连接器进行增量更改。这样做的原因是我想将所有数据加载到弹性搜索中,然后保持 es 与我的 kafka 流同步。目前,我首先使用带有模式=批量的连接器,然后将其更改为时间戳。这工作正常。

但是,如果我们想要将所有数据重新加载到 Streams 和 ES,这意味着我们必须编写一些脚本以某种方式清理或删除 kafka 流和 es 索引数据,修改 connect ini 以将模式设置为批量,重新启动一切,给是时候加载所有数据了,然后将脚本再次修改为时间戳模式,然后再次重新启动所有内容(需要这样一个脚本的原因是,偶尔,批量更新会通过我们尚无法控制的 etl 进程来纠正历史数据,并且此过程不会更新时间戳)

有没有人在做类似的事情并找到了更优雅的解决方案?

4

2 回答 2

1

很长一段时间后回到这个。该方法能够解决这个问题,并且永远不必使用批量模式

  1. 停止连接器
  2. 擦除每个连接器 jvm 的偏移文件
  3. (可选)如果你想做一个完整的擦除和加载,你可能还想删除你的主题使用 kafka/connect utils/rest api(不要忘记状态主题)
  4. 重启连接。
于 2018-01-28T16:48:17.237 回答
0

如何最初从查询中获取所有数据,然后仅使用 kafka 连接器增量更改。

也许这可以帮助你。例如,我有一张桌子:

╔════╦═════════════╦═══════════╗
║ Id ║    Name     ║  Surname  ║
╠════╬═════════════╬═══════════╣
║  1 ║ Martin      ║ Scorsese  ║
║  2 ║ Steven      ║ Spielberg ║
║  3 ║ Christopher ║ Nolan     ║
╚════╩═════════════╩═══════════╝

在这种情况下,我将创建一个视图:

CREATE OR REPLACE VIEW EDGE_DIRECTORS AS
SELECT 0 AS EXID, ID, NAME, SURNAME
FROM DIRECTORS WHERE ID =< 2
UNION ALL
SELECT ID AS EXID, ID, NAME, SURNAME
FROM DIRECTORS WHERE ID > 2;

在 kafka jdbc 连接器的属性文件中,您可以使用:

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
mode=incrementing
incrementing.column.name=EXID
topic.prefix=
tasks.max=1
name=gv-jdbc-source-connector
connection.url=
table.types=VIEW
table.whitelist=EDGE_DIRECTORS

所以kafka jdbc连接器会采取以下步骤:

  1. 首先是 EXID = 0 的所有数据;
  2. 它将在 connector.offsets 文件中存储偏移值 = 0;
  3. 新行将被插入到 DIRECTORS 表中。
  4. Kafka JDBC 连接器将执行:Select EXID, ID, NAME, SURNAME FROM EDGE_DIRECTORS并且会注意到 EXID 已增加。
  5. 数据将在 Kafka Streams 中更新。
于 2017-05-09T22:58:29.610 回答