2

在 KSQL 中是否可以从表中流出新旧值?我们想使用一个表作为值的存储,当一个表发生变化时,会输出一个“反转”值,它是前一个值,以某种方式标记,以及新值,这样我们就可以处理下游系统中的增量?

4

2 回答 2

1

Kafka 表通常用于存储最新值。因此,例如说表中存在键为“123”的流,并且主题上出现了具有相同键“123”但列值不同的新流,这将覆盖(更新插入)表中的现有值。

所以在 Table 上做这件事可能不是一个好主意。

您的用例对我来说并不清楚,但我的建议是您需要在流的源中使用某种机制或使用时间戳来处理增量提要。

于 2018-10-02T03:57:02.013 回答
0

是的,有可能。确实需要一些杂耍。

创建表以保持最后状态

create table v1_mux_connection_ping_ta
as 
select 
  assetid, 
  LATEST_BY_OFFSET(pingable) pingable
from v1_mux_connection_ping_st_parse 
group by assetid;

问题是它也不会发出任何变化。一种解决方案是将表转换为流。

CREATE STREAM v1_mux_connection_ping_ta_s 
  (assetId VARCHAR KEY, pingable VARCHAR) 
WITH (kafka_topic='V1_MUX_CONNECTION_PING_TA', value_format='JSON');

只得出改变的值

create table d_opt_details as 
select 
  s.assetId,
  LATEST_BY_OFFSET(s.pingable) new,
  LATEST_BY_OFFSET(s.pingable, 2)[1] old
from v1_mux_connection_ping_ta_s s
group by
  s.assetId;

create table opt_details as 
select 
  s.assetId, s.new as pingable
from d_opt_details s
where new != old;
于 2021-11-21T14:07:08.827 回答