3

我正在使用 Kinesis firehose 通过 S3 将数据传输到 Redshift。我有一个非常简单的 csv 文件,看起来像这样。firehose 将其放入 s3,但 Redshift 出错,并出现 Delimiter not found 错误。我已经查看了与此错误相关的所有帖子,但我确保包含分隔符。

文件

GOOG,2017-03-16T16:00:01Z,2017-03-17 06:23:56.986397,848.78
GOOG,2017-03-16T16:00:01Z,2017-03-17 06:24:02.061263,848.78
GOOG,2017-03-16T16:00:01Z,2017-03-17 06:24:07.143044,848.78
GOOG,2017-03-16T16:00:01Z,2017-03-17 06:24:12.217930,848.78

或者

"GOOG","2017-03-17T16:00:02Z","2017-03-18 05:48:59.993260","852.12"
"GOOG","2017-03-17T16:00:02Z","2017-03-18 05:49:07.034945","852.12"
"GOOG","2017-03-17T16:00:02Z","2017-03-18 05:49:12.306484","852.12"
"GOOG","2017-03-17T16:00:02Z","2017-03-18 05:49:18.020833","852.12"
"GOOG","2017-03-17T16:00:02Z","2017-03-18 05:49:24.203464","852.12"

红移表

CREATE TABLE stockvalue
( symbol                   VARCHAR(4),
  streamdate               VARCHAR(20),
  writedate                VARCHAR(26),
  stockprice               VARCHAR(6)
);
  • 错误 错误

  • 以防万一,这就是我的运动流看起来像 Firehose

有人可以指出文件可能有什么问题。我在字段之间添加了一个逗号。目标表中的所有列都是 varchar,因此应该没有数据类型错误的原因。此外,列长度在文件和红移表之间完全匹配。我已经尝试在双引号中嵌入列并且没有。

4

3 回答 3

6

您可以发布完整的 COPY 命令吗?它在屏幕截图中被切断。

我的猜测是你DELIMITER ','在你的 COPY 命令中丢失了。尝试将其添加到 COPY 命令中。

于 2017-03-19T21:57:38.770 回答
3

我被困了几个小时,感谢沙希德的回答,它帮助我解决了这个问题。

列名的文本大小写很重要

Redshift 将始终将表的列视为小写,因此在将 JSON 键映射到列时,请确保 JSON 键为小写,例如

您的 JSON 文件将如下所示:

{'id': 'val1', 'name': 'val2'}{'id': 'val1', 'name': 'val2'}{'id': 'val1', 'name': 'val2'}{'id': 'val1', 'name': 'val2'}

COPY 语句看起来像

COPY latency(id,name) FROM 's3://<bucket-name>/<manifest>' CREDENTIALS 'aws_iam_role=arn:aws:iam::<aws-account-id>:role/<role-name>' MANIFEST json 'auto';

Firehose 中的设置必须指定列名(同样是小写)。此外,将以下内容添加到 Firehose COPY 选项:

json 'auto' TRUNCATECOLUMNS blanksasnull emptyasnull

如何从 Python 调用 put_records:

下面是一个片段,展示了如何在 python 中将 put_records 函数与 kinesis 一起使用:

传递给“put_to_stream”函数的“对象”是一个字典数组:

def put_to_stream(objects):
    records = []

    for metric in metrics:
        record = {
            'Data': json.dumps(metric),
            'PartitionKey': 'swat_report'
        };

        records.append(record)

    print(records)

    put_response = kinesis_client.put_records(StreamName=kinesis_stream_name, Records=records)

flush
``
于 2019-04-04T21:18:46.517 回答
1

1-您需要添加 FORMAT AS JSON 's3://yourbucketname/aJsonPathFile.txt'。AWS 还没有提到这一点。请注意,这仅在您的数据为 JSON 格式时才有效,例如

{'attr1': 'val1', 'attr2': 'val2'} {'attr1': 'val1', 'attr2': 'val2'} {'attr1': 'val1', 'attr2': 'val2' } {'attr1':'val1','attr2':'val2'}

2-您还需要验证 kinesis firehouse 和 csv 文件中的列顺序。然后尝试添加

TRUNCATECOLUMNS 空白sasnull 空asnull

3- 一个例子

复制 testrbl3 ( eventId,serverTime,pageName,action,ip,userAgent,location,plateform,language,campaign,content,source,medium,productID,colorCode,scrolltoppercentage) FROM 's3://bucketname/' CREDENTIALS 'aws_iam_role=arn: aws:iam:::role/' MANIFEST json 'auto' TRUNCATECOLUMNS blanksasnull emptyasnull;

于 2018-09-07T07:55:17.623 回答