我是使用 apache flink 的 kinesis analytics studio 的新手,基本上我有一个包含数百条记录的数据流,我想做一些实时的基本分析,所以我完成了 这个教程并用胶水创建了一个表使用以下代码:
%flink.ssql(type=update)
CREATE TABLE active_users(
user_id varchar(120),
platform varchar(60),
event_time timestamp(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
PARTITIONED BY (user_id)
WITH (
'connector' = 'kinesis',
'stream' = 'stream-id',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601');
并且表已成功创建,我可以在胶水数据目录中看到它,但是当我尝试使用基本选择语句查询表时
%flink.ssql(type=update)
SELECT * FROM active_users limit 10;
我得到以下错误
Unable to create a source for reading table 'hive.stream-id.active_users'.
Table options are:
'aws.region'='us-east-1'
'connector'='kinesis'
'format'='json'
'json.timestamp-format.standard'='ISO-8601'
'scan.stream.initpos'='LATEST'
'stream'='stream-id'
我查看了 IAM 政策,一切看起来都不错,关于如何解决这个问题有什么想法吗?