我在 PostgreSQL 中有一个具有以下模式的表:
Table "public.kc_ds"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
--------+-----------------------+-----------+----------+-----------------------------------+----------+--------------+-------------
id | integer | | not null | nextval('kc_ds_id_seq'::regclass) | plain | |
num | integer | | not null | | plain | |
text | character varying(50) | | not null | | extended | |
Indexes:
"kc_ds_pkey" PRIMARY KEY, btree (id)
Publications:
"dbz_publication"
当我为这个使用io.confluent.connect.avro.AvroConverter
Schema Registry 的表运行 Debezium 源连接器时,它会创建一个看起来像这样的 Schema Registry 模式(这里省略了一些字段):
"fields":[
{
"name":"before",
"type":[
"null",
{
"type":"record",
"name":"Value",
"fields":[
{
"name":"id",
"type":"int"
},
{
"name":"num",
"type":"int"
},
{
"name":"text",
"type":"string"
}
],
"connect.name":"xxx.public.kc_ds.Value"
}
],
"default":null
},
{
"name":"after",
"type":[
"null",
"Value"
],
"default":null
},
]
我的 Kafka 主题中由 Debezium 生成的消息如下所示(省略了某些字段):
{
"before": null,
"after": {
"xxx.public.kc_ds.Value": {
"id": 2,
"num": 2,
"text": "text version 1"
}
}
当我插入或更新时,"before"
总是null
,并且"after"
包含我的数据;当我删除时,反之亦然:"after"
为空并"before"
包含数据(尽管所有字段都设置为默认值)。
问题 #1:为什么 Kafka Connect 使用"before"
和"after"
字段创建模式?为什么这些领域的行为如此奇怪?
问题 #2:是否有一种内置方法可以让 Kafka Connect在仍然使用 Schema Registry 的同时向我的主题发送平面消息?请注意,Flatten变换不是我需要的:如果启用,我仍将拥有"before"
and"after"
字段。
问题 #3(实际上并不希望得到任何东西,但也许有人知道):扁平化我的消息的必要性来自于我需要使用HudiDeltaStreamer从我的主题中读取数据,而且这个工具似乎需要扁平化的输入数据。和字段最终在生成的 .parquet 文件中成为单独的类似对象"before"
的列。有谁知道 HudiDeltaStreamer 应该如何与 Kafka Connect 生成的消息集成?"after"