I am trying to reproduce Flink's upsert-kafka connector example.
Using the following input:
event_id,user_id,page_id,user_region,viewtime
e0,1,11,TR,2022-01-01T13:26:41.298Z
e1,1,22,TR,2022-01-02T13:26:41.298Z
e2,2,11,AU,2022-02-01T13:26:41.298Z
and created a topic whose events are structured like this:
key: {"event_id":"e2"},
value: {"event_id": "e2", "user_id": 2, "page_id": 11, "user_region": "AU", "viewtime": "2022-02-01T13:26:41.298Z"}
The Kafka source and upsert-kafka sink logic is as follows:
CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'value.format' = 'json'
);
CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = '...',
  'format' = 'json'
);
-- calculate the pv, uv and insert into the upsert-kafka sink
INSERT INTO pageviews_per_region
SELECT
  user_region,
  COUNT(*),
  COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
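I read the sink topic back with a plain consumer along these lines (again a sketch; the confluent-kafka client, group id and broker address are placeholders):

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",   # placeholder broker address
    "group.id": "pageviews-inspector",       # throwaway group id for inspection
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["pageviews_per_region"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            raise RuntimeError(msg.error())
        # upsert-kafka writes the PRIMARY KEY as the record key and the row as the value
        print("k:", msg.key().decode(), "v:", msg.value().decode())
finally:
    consumer.close()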
I expected to end up with only one record for the key {"user_region":"TR"}, carrying the updated pv: 2, but the created topic does not appear to be log-compacted, so I observe two events for the same user_region:
k: {"user_region":"AU"}, v: {"user_region":"AU","pv":1,"uv":1}
k: {"user_region":"TR"}, v: {"user_region":"TR","pv":2,"uv":1}
k: {"user_region":"TR"}, v: {"user_region":"TR","pv":1,"uv":1}
Shouldn't the upsert-kafka connector create a log-compacted topic for this use case, or is it the developer's responsibility to update the topic configuration?
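For reference, this is roughly how I would check the topic's cleanup policy, and (if it really is up to the developer) the kind of override I have in mind. This is a sketch using the confluent-kafka AdminClient; the broker address is a placeholder, and the same override could be applied with kafka-configs.sh --alter --add-config cleanup.policy=compact.

from confluent_kafka.admin import AdminClient, ConfigResource

admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # placeholder broker address

# Describe the sink topic's current configuration and inspect cleanup.policy
resource = ConfigResource(ConfigResource.Type.TOPIC, "pageviews_per_region")
configs = admin.describe_configs([resource])[resource].result()
print("cleanup.policy =", configs["cleanup.policy"].value)    # 'delete' vs. 'compact'

# If it is indeed the developer's job, the override could be set like this:
# admin.alter_configs([ConfigResource(ConfigResource.Type.TOPIC, "pageviews_per_region",
#                                     set_config={"cleanup.policy": "compact"})])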
The other possibility is that I am misunderstanding something or making a mistake. Looking forward to hearing your thoughts. Thanks.