我确实配置了与数据库的连接并通过主题传输所有数据,因为当我运行消费者时它返回数据
如何将此主题转换为表格并将数据保存在 KSQL 中?
非常感谢
您不会在 KSQL 中保留数据。KSQL 只是 Kafka 中用于查询和转换数据的引擎。KSQL 查询的来源是 Kafka 主题,KSQL 查询的输出要么是交互式的,要么是返回到另一个 kafka 主题。
如果您的 Kafka 主题中有数据(听起来好像有),那么在 KSQL 中运行LIST TOPICS;
:
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
---------------------------------------------------------------------------------------------------------
_confluent-metrics | false | 12 | 1 | 0 | 0
asgard.demo.accounts | false | 1 | 1 | 0 | 0
查看您的 Kafka 主题。从那里,选择你的主题,你可以运行PRINT 'my-topic' FROM BEGINNING;
ksql> PRINT 'asgard.demo.accounts' FROM BEGINNING;
Format:AVRO
10/11/18 9:24:45 AM UTC, null, {"account_id": "a42", "first_name": "Robin", "last_name": "Moffatt", "email": "robin@confluent.io", "phone": "+44 123 456 789", "address": "22 Acacia Avenue", "country": "United Kingdom", "create_ts": "2018-10-11T09:23:22Z", "update_ts": "2018-10-11T09:23:22Z", "messagetopic": "asgard.demo.accounts", "messagesource": "Debezium CDC from MySQL on asgard"}
10/11/18 9:24:45 AM UTC, null, {"account_id": "a081", "first_name": "Sidoney", "last_name": "Lafranconi", "email": "slafranconi0@cbc.ca", "phone": "+44 908 687 6649", "address": "40 Kensington Pass", "country": "United Kingdom", "create_ts": "2018-10-11T09:23:22Z", "update_ts": "2018-10-11T09:23:22Z", "messagetopic": "asgard.demo.accounts", "messagesource": "Debezium CDC from MySQL on asgard"}
10/11/18 9:24:45 AM UTC, null, {"account_id": "a135", "first_name": "Mick", "last_name": "Edinburgh", "email": "medinburgh1@eepurl.com", "phone": "+44 301 837 6535", "address": "27 Blackbird Lane", "country": "United Kingdom", "create_ts": "2018-10-11T09:23:22Z", "update_ts": "2018-10-11T09:23:22Z", "messagetopic": "asgard.demo.accounts", "messagesource": "Debezium CDC from MySQL on asgard"}
查看它的内容。按 Ctrl-C 取消PRINT
语句并返回命令行。
注意语句Format
的输出。PRINT
这是您的数据的序列化格式。
如果数据在 Avro 中序列化,那么您可以运行:
CREATE STREAM mydata WITH (KAFKA_TOPIC='asgard.demo.accounts', VALUE_FORMAT='AVRO');
如果它在 JSON 中,您还需要指定列名和数据类型
CREATE STREAM mydata (col1 INT, col2 VARCHAR) WITH (KAFKA_TOPIC='asgard.demo.accounts', VALUE_FORMAT='JSON');
现在您已经使用 KSQL '注册'了这个主题,您可以使用以下命令查看它的架构DESCRIBE
:
ksql> DESCRIBE mydata;
Name : MYDATA
Field | Type
-------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ACCOUNT_ID | VARCHAR(STRING)
FIRST_NAME | VARCHAR(STRING)
LAST_NAME | VARCHAR(STRING)
EMAIL | VARCHAR(STRING)
PHONE | VARCHAR(STRING)
ADDRESS | VARCHAR(STRING)
COUNTRY | VARCHAR(STRING)
CREATE_TS | VARCHAR(STRING)
UPDATE_TS | VARCHAR(STRING)
MESSAGETOPIC | VARCHAR(STRING)
MESSAGESOURCE | VARCHAR(STRING)
-------------------------------------------
然后使用KSQL查询和操作数据:
ksql> SET 'auto.offset.reset'='earliest';
ksql> SELECT FIRST_NAME + ' ' + LAST_NAME AS FULL_NAME, EMAIL FROM mydata WHERE COUNTRY='United Kingdom';
Robin Moffatt | robin@confluent.io
Sidoney Lafranconi | slafranconi0@cbc.ca
Mick Edinburgh | medinburgh1@eepurl.com
Merrill Stroobant | mstroobant2@china.com.cn
按 Ctrl-C 取消SELECT
查询。
KSQL 可以将其持久化到新的 Kafka 主题:
CREATE STREAM UK_USERS AS SELECT FIRST_NAME + ' ' + LAST_NAME AS FULL_NAME, EMAIL FROM mydata WHERE COUNTRY='United Kingdom';
如果您再次列出您的 KSQL 主题,您将看到创建和填充的新主题:
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
---------------------------------------------------------------------------------------------------------
_confluent-metrics | false | 12 | 1 | 0 | 0
asgard.demo.accounts | true | 1 | 1 | 2 | 2
UK_USERS | true | 4 | 1 | 0 | 0
---------------------------------------------------------------------------------------------------------
ksql>
进入源主题 ( asgard.demo.accounts
) 的每个事件都会被 KSQL 读取和过滤,然后UK_USERS
根据您执行的 SQL 写入目标主题 ()。
免责声明:我在开源 KSQL 项目背后的公司 Confluent 工作。