1

我们正在使用 KSQLDB 执行 POC,但有一些疑问:-

我有一个名为 Kafka 的主题USERPROFILE,它有大约 1 亿条唯一记录和 10 天的保留策略。此 Kafka 主题继续从其底层 RDBMS 表实时接收 INSERT/UPDATE 类型的事件。

以下是此 kafka 主题中收到的记录的简单结构:-

{"userid":1001,"firstname":"Hemant","lastname":"Garg","countrycode":"IND","rating":3.7} 

1.)我们已经在上述主题上打开了一个 Kafka 流:-

create STREAM userprofile_stream (userid INT, firstname VARCHAR, lastname VARCHAR, countrycode VARCHAR, rating DOUBLE) WITH (VALUE_FORMAT = 'JSON', KAFKA_TOPIC = 'USERPROFILE')>;

2.)因为,给定的 userId 可以更新,我们只想要唯一的记录(对于每个 userId),我们还在上述主题上打开了另一个 Kafka 表:-

ksql> create TABLE userprofile_table(userid VARCHAR PRIMARY KEY, firstname VARCHAR, lastname VARCHAR, countrycode VARCHAR, rating DOUBLE) WITH (KAFKA_TOPIC = 'USERPROFILE', VALUE_FORMAT = 'DELIMITED');

问题是: -

  • 打开 KTable 是否需要额外的磁盘空间?例如,Kafka 主题有 1 亿条记录,相同的记录是否也会出现在 KTable 中,或者它只是底层 kafka 主题的一些虚拟视图?

  • 对于我们打开的流,同样的问题。打开 KStream 是否需要磁盘(经纪人服务器的)额外空间?例如,Kafka 主题有 1 亿条记录,相同的记录是否也会出现在 KStream 中,或者它只是底层 kafka 主题的一些虚拟视图?

  • 比如说,我们在 5 月 1 日收到了 id 为 1001 的记录,然后在 5 月 11 日,该记录在 Kafka 主题上将不再可用,但是该记录是否仍会出现在 kstream / Ktable 上?KStream / KTable 是否有一些保留政策,就像我们对主题一样?

答案将不胜感激。

-- 最好的阿迪亚

4

1 回答 1

0

ksqlDB 服务器由Kafka Streams提供支持。因此,当您创建流或表时,服务器将分别创建 KStream 或 KTable。

最重要的是,KStream 和 KTables 由 Kafka 中的主题支持。因此,在 ksqlDB 服务器上创建流和表将在您的 Kafka 集群上创建实际的主题。

话虽如此,来自 ksqlDB 的流和表是根据需要具体化并进行了相当优化的,Confluent 的这两篇文章通过良好的视觉帮助提供了更多关于内部行为的见解:

您甚至可以自己查看创建的数据。例如,我创建了:

  • MESSAGES_STREAM来自原始主题的流
  • 来自MATERIALIZED_MESSAGES_STTREAM上面的流
  • MESSAGES第一个流中的表

以下是创建命令供参考:

ksql> CREATE STREAM messages_stream (user_id BIGINT KEY, message VARCHAR) 
  WITH (KAFKA_TOPIC = 'hello_topic_json', VALUE_FORMAT='JSON');

ksql> CREATE STREAM materialized_messages_stream AS 
  SELECT user_id, UCASE(message) 
  FROM messages_stream 
EMIT CHANGES;

ksql> CREATE TABLE messages AS
  SELECT user_id, count(*) as msg_count
  FROM messages_stream
  GROUP BY user_id
EMIT CHANGES;

通过查看 ksqlDB 中的详细信息,我们可以看到第一个流使用原始主题作为源:

ksql> describe extended MESSAGES_STREAM;

Name                 : MESSAGES_STREAM
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : hello_topic_json (partitions: 1, replication: 1)
-- […]

ksql> describe extended MATERIALIZED_MESSAGES_STREAM;

Name                 : MATERIALIZED_MESSAGES_STREAM
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : MATERIALIZED_MESSAGES_STREAM (partitions: 1, replication: 1)
-- […]

ksql> describe extended MESSAGES;

Name                 : MESSAGES
Type                 : TABLE
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : MESSAGES (partitions: 1, replication: 1)
-- […]

查看集群上声明的主题,我们可以看到第二个流、表及其在changelog后台创建的主题:

$ ./kafka-topics.sh --bootstrap-server localhost:29092 --list
MATERIALIZED_MESSAGES_STREAM
MESSAGES
__consumer_offsets
__transaction_state
_confluent-ksql-ksql_docker_command_topic
_confluent-ksql-ksql_dockerquery_CTAS_MESSAGES_1-Aggregate-Aggregate-Materialize-changelog
_schemas
hello_topic_json

您还可以看到流和表之间的保留策略不同。前者将删除旧记录,而后者将压缩数据:

$ ./kafka-topics.sh --bootstrap-server localhost:29092 --topic MATERIALIZED_MESSAGES_STREAM --describe
Topic: MATERIALIZED_MESSAGES_STREAM PartitionCount: 1   ReplicationFactor: 1    Configs: cleanup.policy=delete

$ ./kafka-topics.sh --bootstrap-server localhost:29092 --topic MESSAGES --describe
Topic: MESSAGES PartitionCount: 1   ReplicationFactor: 1    Configs: cleanup.policy=compact

TL; DR,回到你的问题:

  1. 是的,打开 KTable 需要空间,但这很可能不会是已用字节的 1 对 1 映射。
  2. 在您的情况下,流很可能会将主题用作参考,并且不会占用更多空间,因为您没有发生任何数据转换。
  3. 表上的保留策略是压缩,因此您的条目在表上仍然可用。但是,在您的信息流中,数据将与参考主题中的数据一样可用。
于 2021-05-28T09:42:05.437 回答