我看不出为什么您不能在 Kafka Streams 或 KSQL 中执行此操作。两者都支持表-表连接。这是假设支持数据的格式。
密钥压缩不会影响结果,因为 Streams 和 KSQL 都将构建连接两个表的正确最终状态。如果压缩已经运行,需要处理的数据量可能会更少,但结果将是相同的。
例如,在 ksqlDB 中,您可以将两个主题作为表导入并执行连接,然后按topic-1
表进行过滤null
以查找缺失行的列表。
-- example using 0.9 ksqlDB, assuming a INT primary key:
-- create table from main topic:
CREATE TABLE_1
(ROWKEY INT PRIMARY KEY, <other column defs>)
WITH (kafka_topic='topic-1', value_format='?');
-- create table from second topic:
CREATE TABLE_2
(ROWKEY INT PRIMARY KEY, <other column defs>)
WITH (kafka_topic='topic-1-copy', value_format='?');
-- create a table containing only the missing keys:
CREATE MISSING AS
SELECT T2.* FROM TABLE_2 T2 LEFT JOIN TABLE_1 T1
WHERE T1.ROWKEY = null;
这种方法的好处是MISSING
丢失行的表会自动更新:当您从源 DB2 实例中提取丢失的行并将它们生成到topic-1
时,“MISSING”表中的行将被删除,即您会看到正在为该MISSING
主题制作墓碑。
您甚至可以扩展此方法以查找topic-1
不再存在于源数据库中的行:
-- using the same DDL statements for TABLE_1 and TABLE_2 from above
-- perform the join:
CREATE JOINED AS
SELECT * FROM TABLE_2 T2 FULL OUTER JOIN TABLE_1 T1;
-- detect rows in the DB that aren't in the topic:
CREATE MISSING AS
SELECT * FROM JOINED
WHERE T1_ROWKEY = null;
-- detect rows in the topic that aren't in the DB:
CREATE EXTRA AS
SELECT * FROM JOINED
WHERE T2_ROWKEY = null;
当然,您需要相应地调整集群的大小。您的 ksqlDB 集群越大,处理数据的速度就越快。它还需要磁盘容量来实现表。
您可以通过主题上的分区数设置最大并行化量。如果您只有 1 个分区,则数据将按顺序处理。如果运行 100 个分区,那么您可以使用 100 个 CPU 内核处理数据,前提是您运行了足够多的 ksqlDB 实例。(默认情况下,每个 ksqlDB 节点将为每个查询创建 4 个流处理线程,(尽管如果服务器有更多内核,您可以增加这个!))。