0

我确实配置了与数据库的连接并通过主题传输所有数据,因为当我运行消费者时它返回数据

如何将此主题转换为表格并将数据保存在 KSQL 中?

非常感谢

4

1 回答 1

4

您不会在 KSQL 中保留数据。KSQL 只是 Kafka 中用于查询和转换数据的引擎。KSQL 查询的来源是 Kafka 主题,KSQL 查询的输出要么是交互式的,要么是返回到另一个 kafka 主题。

如果您的 Kafka 主题中有数据(听起来好像有),那么在 KSQL 中运行LIST TOPICS;

ksql> LIST TOPICS;    

 Kafka Topic                 | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
---------------------------------------------------------------------------------------------------------
 _confluent-metrics          | false      | 12         | 1                  | 0         | 0
 asgard.demo.accounts        | false      | 1          | 1                  | 0         | 0

查看您的 Kafka 主题。从那里,选择你的主题,你可以运行PRINT 'my-topic' FROM BEGINNING;

ksql> PRINT 'asgard.demo.accounts' FROM BEGINNING;
Format:AVRO
10/11/18 9:24:45 AM UTC, null, {"account_id": "a42", "first_name": "Robin", "last_name": "Moffatt", "email": "robin@confluent.io", "phone": "+44 123 456 789", "address": "22 Acacia Avenue", "country": "United Kingdom", "create_ts": "2018-10-11T09:23:22Z", "update_ts": "2018-10-11T09:23:22Z", "messagetopic": "asgard.demo.accounts", "messagesource": "Debezium CDC from MySQL on asgard"}
10/11/18 9:24:45 AM UTC, null, {"account_id": "a081", "first_name": "Sidoney", "last_name": "Lafranconi", "email": "slafranconi0@cbc.ca", "phone": "+44 908 687 6649", "address": "40 Kensington Pass", "country": "United Kingdom", "create_ts": "2018-10-11T09:23:22Z", "update_ts": "2018-10-11T09:23:22Z", "messagetopic": "asgard.demo.accounts", "messagesource": "Debezium CDC from MySQL on asgard"}
10/11/18 9:24:45 AM UTC, null, {"account_id": "a135", "first_name": "Mick", "last_name": "Edinburgh", "email": "medinburgh1@eepurl.com", "phone": "+44 301 837 6535", "address": "27 Blackbird Lane", "country": "United Kingdom", "create_ts": "2018-10-11T09:23:22Z", "update_ts": "2018-10-11T09:23:22Z", "messagetopic": "asgard.demo.accounts", "messagesource": "Debezium CDC from MySQL on asgard"}

查看它的内容。按 Ctrl-C 取消PRINT语句并返回命令行。

注意语句Format的输出。PRINT这是您的数据的序列化格式。

如果数据在 Avro 中序列化,那么您可以运行:

CREATE STREAM mydata WITH (KAFKA_TOPIC='asgard.demo.accounts', VALUE_FORMAT='AVRO');

如果它在 JSON 中,您还需要指定列名和数据类型

CREATE STREAM mydata (col1 INT, col2 VARCHAR) WITH (KAFKA_TOPIC='asgard.demo.accounts', VALUE_FORMAT='JSON');

现在您已经使用 KSQL '注册'了这个主题,您可以使用以下命令查看它的架构DESCRIBE

ksql> DESCRIBE mydata;

Name                 : MYDATA
 Field         | Type
-------------------------------------------
 ROWTIME       | BIGINT           (system)
 ROWKEY        | VARCHAR(STRING)  (system)
 ACCOUNT_ID    | VARCHAR(STRING)
 FIRST_NAME    | VARCHAR(STRING)
 LAST_NAME     | VARCHAR(STRING)
 EMAIL         | VARCHAR(STRING)
 PHONE         | VARCHAR(STRING)
 ADDRESS       | VARCHAR(STRING)
 COUNTRY       | VARCHAR(STRING)
 CREATE_TS     | VARCHAR(STRING)
 UPDATE_TS     | VARCHAR(STRING)
 MESSAGETOPIC  | VARCHAR(STRING)
 MESSAGESOURCE | VARCHAR(STRING)
-------------------------------------------

然后使用KSQL查询和操作数据:

ksql> SET 'auto.offset.reset'='earliest';

ksql> SELECT FIRST_NAME + ' ' + LAST_NAME AS FULL_NAME, EMAIL FROM mydata WHERE COUNTRY='United Kingdom';

Robin Moffatt | robin@confluent.io
Sidoney Lafranconi | slafranconi0@cbc.ca
Mick Edinburgh | medinburgh1@eepurl.com
Merrill Stroobant | mstroobant2@china.com.cn

按 Ctrl-C 取消SELECT查询。

KSQL 可以将其持久化到新的 Kafka 主题:

CREATE STREAM UK_USERS AS SELECT FIRST_NAME + ' ' + LAST_NAME AS FULL_NAME, EMAIL FROM mydata WHERE COUNTRY='United Kingdom';

如果您再次列出您的 KSQL 主题,您将看到创建和填充的新主题:

ksql> LIST TOPICS;

 Kafka Topic                 | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
---------------------------------------------------------------------------------------------------------
 _confluent-metrics          | false      | 12         | 1                  | 0         | 0
 asgard.demo.accounts        | true       | 1          | 1                  | 2         | 2
 UK_USERS                    | true       | 4          | 1                  | 0         | 0
---------------------------------------------------------------------------------------------------------
ksql>

进入源主题 ( asgard.demo.accounts) 的每个事件都会被 KSQL 读取和过滤,然后UK_USERS根据您执行的 SQL 写入目标主题 ()。

有关详细信息,请参阅KSQL 语法文档教程

免责声明:我在开源 KSQL 项目背后的公司 Confluent 工作。

于 2018-10-11T15:14:53.917 回答