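I'm testing exactly-once semantics on a ksqlDB server in two ways: by shutting down the running Docker process very ungracefully, and by letting the Docker container run out of memory. In both cases I get duplicates, which is definitely not the guaranteed behavior. I feel like I might be missing something obvious here...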
The Docker container has the KSQL_KSQL_STREAMS_PROCESSING_GUARANTEE=exactly_once parameter set. As far as I understand, this sets the underlying producer property enable.idempotence and the consumer property isolation.level.
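A quick way to confirm that the setting actually reached the server is to list the effective properties from the ksqlDB CLI. This is a minimal sketch; it assumes the Confluent image translates the KSQL_-prefixed environment variable into the property ksql.streams.processing.guarantee (prefix stripped, lowercased, underscores turned into dots), which is the usual convention for these images.

```sql
-- List the configuration settings currently in effect on the server.
-- If the environment variable was picked up, ksql.streams.processing.guarantee
-- should show exactly_once (assumption: the image's env-var-to-property mapping).
SHOW PROPERTIES;
```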
Duplicates still occur with the following queries, first here:
create or replace table TEST with (kafka_topic='TEST', value_format='avro', partitions=10, replicas=1)
AS
SELECT
CUSTOMERS_ID,
earliest_by_offset(LDTS) AS LDTS,
COLLECT_SET(NAMES) AS NAMES,
earliest_by_offset(CUSTOMER_PK) AS CUSTOMER_PK
from TEST_1
group by CUSTOMERS_ID
emit changes;
and here:
create or replace stream TEST_STREAM (CUSTOMERS_ID VARCHAR KEY, LDTS BIGINT, NAMES ARRAY<VARCHAR>, CUSTOMER_PK VARCHAR)
WITH
(KAFKA_TOPIC='TEST', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');
create or replace stream TEST_FINAL (KAFKA_KEY VARCHAR KEY, CUSTOMERS_ID VARCHAR, LDTS BIGINT, NAME VARCHAR, CUSTOMER_PK VARCHAR) WITH
(KAFKA_TOPIC='TEST_FINAL', VALUE_FORMAT='AVRO', partitions=10, replicas=1);
INSERT INTO
TEST_FINAL
select
CUSTOMERS_ID as KAFKA_KEY,
AS_VALUE(CUSTOMERS_ID) as CUSTOMERS_ID,
LDTS,
NAMES[1] as NAME,
CUSTOMER_PK
from TEST_STREAM
where
ROWTIME = LDTS and ARRAY_LENGTH(NAMES) = 1;
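You can ignore the logic of the SQL; these are just examples to make the problem more concrete. The point is that offsets are apparently getting lost when the container crashes.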
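One way to quantify the duplicates from inside ksqlDB is a transient aggregation over the output stream. This is a hypothetical check, not a fix: it assumes each KAFKA_KEY should appear in TEST_FINAL exactly once, so any key with more than one row counts as a duplicate. If this query (assuming its consumer inherits the server's exactly_once setting and therefore reads with isolation.level=read_committed) reports no duplicates while another consumer does, that other consumer may be using Kafka's default isolation.level=read_uncommitted, which also returns records from aborted transactions.

```sql
-- Read TEST_FINAL from the earliest offset instead of only new rows.
SET 'auto.offset.reset' = 'earliest';

-- Emit every key that has been written more than once.
SELECT KAFKA_KEY, COUNT(*) AS ROW_COUNT
FROM TEST_FINAL
GROUP BY KAFKA_KEY
HAVING COUNT(*) > 1
EMIT CHANGES;
```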
What else can I do? Am I missing any properties?
I'm using Kafka brokers from Confluent Community v6.2.1 and ksqlDB v0.21.