3

我们通过 CDC 将数据从 DB2(表 1)发送到 Kafka 主题(主题 1)。我们需要在 DB2 数据和 Kafka 主题之间进行协调。我们有两个选择——

a) 将所有 kafka 主题数据下载到 DB2 中(作为 table-1-copy),然后进行左外连接(在 table-1 和 table-1-copy 之间)以查看不匹配的记录,创建增量并推送它回到卡夫卡。 问题:可伸缩性——我们的数据集大约有 10 亿条记录,我不确定 DB2 DBA 是否会让我们运行如此庞大的连接操作(可能很容易持续超过 15-20 分钟)。

b) 将 DB2 再次推回并行 kafka 主题(topic-1-copy),然后执行一些基于 kafka 流的解决方案,以在 kafka topic-1 和 topic-1-copy 之间进行左外连接。我仍然围绕着卡夫卡流和左外连接。我不确定(在 kafka 流中使用窗口系统)我是否能够将 topic-1 的整个内容与 topic-1-copy 进行比较。

更糟糕的是,kafka 中的 topic-1 是一个 compact topic,所以当我们将数据从 DB2 推送回 Kafka topic-1-copy 时,我们无法确定性地启动 kafka topic-compaction 循环以确保两个 topic- 1 和 topic-1-copy 在对它们运行任何类型的比较操作之前完全压缩。

c) 有没有其他我们可以考虑的框架选项?

理想的解决方案必须针对任何大小的数据进行扩展。

4

1 回答 1

0

我看不出为什么您不能在 Kafka Streams 或 KSQL 中执行此操作。两者都支持表-表连接。这是假设支持数据的格式。

密钥压缩不会影响结果,因为 Streams 和 KSQL 都将构建连接两个表的正确最终状态。如果压缩已经运行,需要处理的数据量可能会更少,但结果将是相同的。

例如,在 ksqlDB 中,您可以将两个主题作为表导入并执行连接,然后按topic-1表进行过滤null以查找缺失行的列表。

-- example using 0.9 ksqlDB, assuming a INT primary key:

-- create table from main topic:
CREATE TABLE_1 
   (ROWKEY INT PRIMARY KEY, <other column defs>) 
   WITH (kafka_topic='topic-1', value_format='?');

-- create table from second topic:
CREATE TABLE_2 
   (ROWKEY INT PRIMARY KEY, <other column defs>) 
   WITH (kafka_topic='topic-1-copy', value_format='?');

-- create a table containing only the missing keys:
CREATE MISSING AS
   SELECT T2.* FROM TABLE_2 T2 LEFT JOIN TABLE_1 T1
   WHERE T1.ROWKEY = null;

这种方法的好处是MISSING丢失行的表会自动更新:当您从源 DB2 实例中提取丢失的行并将它们生成到topic-1时,“MISSING”表中的行将被删除,即您会看到正在为该MISSING主题制作墓碑。

您甚至可以扩展此方法以查找topic-1不再存在于源数据库中的行:

-- using the same DDL statements for TABLE_1 and TABLE_2 from above

-- perform the join:
CREATE JOINED AS
   SELECT * FROM TABLE_2 T2 FULL OUTER JOIN TABLE_1 T1;

-- detect rows in the DB that aren't in the topic:
CREATE MISSING AS
   SELECT * FROM JOINED
   WHERE T1_ROWKEY = null;

-- detect rows in the topic that aren't in the DB:
CREATE EXTRA AS
   SELECT * FROM JOINED
   WHERE T2_ROWKEY = null;

当然,您需要相应地调整集群的大小。您的 ksqlDB 集群越大,处理数据的速度就越快。它还需要磁盘容量来实现表。

您可以通过主题上的分区数设置最大并行化量。如果您只有 1 个分区,则数据将按顺序处理。如果运行 100 个分区,那么您可以使用 100 个 CPU 内核处理数据,前提是您运行了足够多的 ksqlDB 实例。(默认情况下,每个 ksqlDB 节点将为每个查询创建 4 个流处理线程,(尽管如果服务器有更多内核,您可以增加这个!))。

于 2020-06-01T14:46:02.523 回答