1

我是使用 apache flink 的 kinesis analytics studio 的新手,基本上我有一个包含数百条记录的数据流,我想做一些实时的基本分析,所以我完成了 这个教程并用胶水创建了一个表使用以下代码:

%flink.ssql(type=update)
CREATE TABLE active_users(
user_id varchar(120),
platform varchar(60),
event_time timestamp(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
 )
 PARTITIONED BY (user_id)
 WITH (
'connector' = 'kinesis',
'stream' = 'stream-id',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601');

并且表已成功创建,我可以在胶水数据目录中看到它,但是当我尝试使用基本选择语句查询表时

%flink.ssql(type=update)
SELECT * FROM active_users limit 10;

我得到以下错误

Unable to create a source for reading table 'hive.stream-id.active_users'.

Table options are:

'aws.region'='us-east-1'
'connector'='kinesis'
'format'='json'
'json.timestamp-format.standard'='ISO-8601'
'scan.stream.initpos'='LATEST'
'stream'='stream-id'

我查看了 IAM 政策,一切看起来都不错,关于如何解决这个问题有什么想法吗?

4

1 回答 1

0

当我遇到这个问题时,我尝试了两件事来解决这个问题。

  1. flink 的版本。我将其更改为 1.11 和

  2. IAM 策略必须手动添加:

     {
         "Sid": "VisualEditor00",
         "Effect": "Allow",
         "Action": [
             "kinesis:DescribeStream",
             "kinesis:PutRecord",
             "kinesis:PutRecords",
             "kinesis:GetShardIterator",
             "kinesis:GetRecords",
             "kinesis:ListShards",
             "kinesis:DescribeStreamSummary",
             "kinesis:RegisterStreamConsumer"
         ],
         "Resource": [
             "arn:aws:kinesis:eu-west-1:376596651109:stream/edwarinputstream"
         ]
     },
    

一旦我完成了上述操作,它就会正常运行。

于 2022-01-28T13:50:19.663 回答