我一直在使用 KSQL,到目前为止它运行良好。但现在我想通过 Kafka Connect 将输出下沉到 BigQuery,并且需要附加一个 JSON 模式。我很难弄清楚如何做到这一点。这是我的查询:
CREATE STREAM tweets_original (
CreatedAt BIGINT,
Id BIGINT,
Text VARCHAR,
Source VARCHAR,
GeoLocation VARCHAR,
User STRUCT<Id BIGINT, Name VARCHAR, Description VARCHAR, ScreenName VARCHAR, URL VARCHAR, FollowersCount BIGINT, FriendsCount BIGINT>
)
WITH (kafka_topic='tweets', value_format='JSON');
CREATE STREAM tweets_new
WITH (kafka_topic='tweets-new') AS
SELECT
CreatedAt as created_at,
Id as tweet_id,
Text as tweet_text,
Source as source,
GeoLocation as geo_location,
User->Id as user_id,
User->Name as user_name,
User->Description as user_description,
User->ScreenName as user_screenname
FROM tweets_original ;
这是写入输出主题 ( tweets-new
) 的记录示例。
{
"CREATED_AT": 1535036410000,
"TWEET_ID": 1032643668614819800,
"TWEET_TEXT": "Sample text",
"SOURCE": "<a href=\"http://twitter.com\" rel=\"nofollow\">Twitter Web Client</a>",
"GEO_LOCATION": null,
"USER_ID": 123,
"USER_NAME": "John Smith",
"USER_DESCRIPTION": "Developer in Chief",
"USER_SCREENNAME": "newphonewhodis"
}
但是,为了让 Kafka Connect 将这些记录接收到 BigQuery,我需要附加一个架构,如下所示:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int64",
"optional": false,
"field": "CREATED_AT"
},
{
"type": "int64",
"optional": false,
"field": "TWEET_ID"
},
{
"type": "string",
"optional": false,
"field": "TWEET_TEXT"
}
...
],
"optional": false,
"name": "foobar"
},
"payload": {...}
}
无论如何,我在文档中没有看到任何东西表明我可以如何解决这个问题(也许我找错了地方)。任何帮助将不胜感激!