0

我通过非常不优雅地关闭 docker 运行进程或让 docker 容器内存不足来测试 ksqldb 服务器上的恰好一次语义。在这两种情况下,我都会收到重复的内容,这绝对不是保证的行为。我觉得我可能在这里错过了明显的...

docker 容器有KSQL_KSQL_STREAMS_PROCESSING_GUARANTEE=exactly_once参数集。据我了解,这将为enable.idempotence和消费者isolation.level属性设置基础生产者设置。

由于以下查询,仍然会出现重复项:here

create or replace table TEST with (kafka_topic =  'TEST', value_format='avro',partitions=10, replicas=1) 
AS
SELECT 
    CUSTOMERS_ID,
    earliest_by_offset(LDTS) AS LDTS, 
    COLLECT_SET(NAMES) AS NAMES,
    earliest_by_offset(CUSTOMER_PK) AS CUSTOMER_PK
from TEST_1
group by CUSTOMERS_PK
emit changes;

还有这里

create or replace stream TEST_STREAM (CUSTOMERS_ID VARCHAR KEY, LDTS BIGINT, NAMES ARRAY<VARCHAR>, CUSTOMER_PK VARCHAR)
WITH 
(KAFKA_TOPIC='TEST', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');

create or replace stream TEST_FINAL (KAFKA_KEY VARCHAR KEY, CUSTOMERS_ID VARCHAR, LDTS BIGINT,NAME VARCHAR, CUSTOMER_PK VARCHAR) WITH
(KAFKA_TOPIC='TEST_FINAL', VALUE_FORMAT='AVRO', partitions=10, replicas=1);

INSERT INTO 
    TEST_FINAL 
select
    CUSTOMERS_ID as KAFKA_KEY,
    AS_VALUE(CUSTOMERS_ID) as CUSTOMERS_ID,
    LDTS,
    NAMES[1] as NAME,
    CUSTOMER_PK
from TEST_STREAM
where
rowtime= LDTS and ARRAY_LENGTH(NAMES)=1;

你可以忽略sql的逻辑。这些只是使问题更有意义的例子。关键是在容器崩溃期间偏移量显然会丢失。

我还可以做些什么 ?我缺少任何属性吗?
我正在使用来自 confluent community v6.2.1 和 ksqldb v0.21 的 kafka 代理

4

1 回答 1

0

我想我毕竟只是回答我自己的问题。似乎 consumer.isolation.level 仍然需要在 ksql docker 环境变量中设置为“read_committed”。尽管一切都表明 processing_guarantee 会为您执行此操作,但我的情况并非如此。添加后,我仍然在主题中看到未提交的消息,但不再在 ksql 流和表中看到。也许这对其他人有帮助

于 2021-10-28T12:25:27.813 回答