
I'm trying to load all new files from an AWS S3 bucket into two Snowflake tables, routing by file path, but so far I haven't succeeded even with one table. What I've tried:

Created a stage:

CREATE or replace STAGE DATA_SCIENCE.INFRA.jobs_resource_usage URL = 's3://om/jobs-resource-usage/'
  storage_integration = om_s3 FILE_FORMAT=(TYPE='JSON');

Created a table:

create or replace TABLE DATA_SCIENCE.INFRA.job_metrics (
  job_name STRING,
  build_number INT,
  cpu_perc INT,
  mem BIGINT,
  "timestamp" TIMESTAMP
  );

Created a pipe:

create or replace pipe DATA_SCIENCE.INFRA.job_metrics auto_ingest=true as
    copy into DATA_SCIENCE.INFRA.job_metrics
        from (select
            REGEXP_SUBSTR(METADATA$FILENAME,'job_name=(.*)/',1, 1, 'e', 1),
            REGEXP_SUBSTR(METADATA$FILENAME,'build_number=([0-9]+)',1, 1, 'e', 1),
            $1:cpu_perc::INT,
            $1:mem::BIGINT,
            $1:timestamp::TIMESTAMP
        from @DATA_SCIENCE.INFRA.jobs_resource_usage/metrics/);

Added the SQS queue ARN to the bucket's event notifications with the following settings (the resulting configuration is sketched after the list):

  • prefix: jobs_resource_usage/
  • suffix: .json
  • send to: SQS Queue
  • SQS queue ARN: the one returned by select parse_json(SYSTEM$PIPE_STATUS('DATA_SCIENCE.INFRA.job_metrics')):notificationChannelName;
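
For reference, the resulting notification configuration on the bucket looks roughly like this (a sketch only; the queue ARN is a placeholder for the notificationChannelName value, and the JSON can be applied from the S3 console or with aws s3api put-bucket-notification-configuration):

{
  "QueueConfigurations": [
    {
      "QueueArn": "<notificationChannelName from SYSTEM$PIPE_STATUS>",
      "Events": ["s3:ObjectCreated:*"],
      "Filter": {
        "Key": {
          "FilterRules": [
            { "Name": "prefix", "Value": "jobs_resource_usage/" },
            { "Name": "suffix", "Value": ".json" }
          ]
        }
      }
    }
  ]
}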

The stage works, because I can list files with:

ls '@DATA_SCIENCE.INFRA.jobs_resource_usage/metrics/job_name=Ingest job';

which returns S3 file names like the following (a sample included to show the format):

s3://om/jobs-resource-usage/metrics/job_name=Ingest job/build_number=144.json

I can successfully load the file manually with:

copy into DATA_SCIENCE.INFRA.job_metrics
    from (select
          REGEXP_SUBSTR(METADATA$FILENAME,'job_name=(.*)/',1, 1, 'e', 1),
          REGEXP_SUBSTR(METADATA$FILENAME,'build_number=([0-9]+)',1, 1, 'e', 1),
        $1:cpu_perc::INT,
        $1:mem::BIGINT,
        $1:timestamp::TIMESTAMP
        from @DATA_SCIENCE.INFRA.jobs_resource_usage/metrics/)
        files=('job_name=Ingest job/build_number=144.json');

However, the pipe doesn't load anything. If I do a

select SYSTEM$PIPE_STATUS('DATA_SCIENCE.INFRA.job_metrics');

I can see it receives the notification messages:

{"executionState":"RUNNING","pendingFileCount":0,"notificationChannelName":"arn:aws:sqs:us-west-2:494544507972:sf-snowpipe-concealed","numOutstandingMessagesOnChannel":7,"lastReceivedMessageTimestamp":"2020-08-13T09:59:21.107Z"}

but I can't see any lastForwardedMessageTimestamp entry, which suggests there's a problem with path matching. I've tried multiple permutations of the leading slash, and also uploading files directly to the metrics path without any spaces or = signs, all without success.

What did I do wrong, and how could I figure out what the problem is?


2 Answers


Review what stages you have pointing to your S3 buckets. Having multiple stages at different levels of granularity can cause reading conflicts on the message queues.

If the pipe is working correctly and seeing the messages, you will see a lastForwardedMessageTimestamp like you mentioned. If you don't see that, either you don't have any messages in your queue, the pipe is not reading the queue correctly, or there is a conflict and something else is reading the queue messages first.

Do you have access to check your SQS queue logs to make sure messages are showing up in the first place and that your queue is working? If your queue is working correctly, I would double-check that you have permissions to the queue set correctly and that you don't have multiple stages conflicting on your integration and queue.
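
As a concrete starting point, a couple of checks along these lines (a sketch; object names are taken from the question, adjust them to your account):

-- look for other stages or pipes pointing at the same bucket / notification channel
show stages in account;
show pipes in account;

-- check whether Snowpipe has attempted any loads (including failed ones) recently
select *
  from table(information_schema.copy_history(
      table_name => 'DATA_SCIENCE.INFRA.JOB_METRICS',
      start_time => dateadd(hours, -24, current_timestamp())));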

answered 2020-08-13T15:00:08.413

Looks like you might be missing file_format = (type = 'JSON') from the end of your pipe creation statement.

Also, according to the docs, you might need to set aws_sns_topic='<sns_topic_arn>' in the pipe definition.
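
For illustration, the pipe from the question with both suggestions applied would look something like this (a sketch; <sns_topic_arn> is a placeholder, and aws_sns_topic is only relevant if the bucket events are delivered through an SNS topic rather than directly to the Snowpipe SQS queue):

create or replace pipe DATA_SCIENCE.INFRA.job_metrics
  auto_ingest = true
  -- aws_sns_topic = '<sns_topic_arn>'  -- uncomment only if the events go through SNS
  as
  copy into DATA_SCIENCE.INFRA.job_metrics
  from (select
          REGEXP_SUBSTR(METADATA$FILENAME, 'job_name=(.*)/', 1, 1, 'e', 1),
          REGEXP_SUBSTR(METADATA$FILENAME, 'build_number=([0-9]+)', 1, 1, 'e', 1),
          $1:cpu_perc::INT,
          $1:mem::BIGINT,
          $1:timestamp::TIMESTAMP
        from @DATA_SCIENCE.INFRA.jobs_resource_usage/metrics/)
  file_format = (type = 'JSON');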

answered 2020-08-13T16:12:16.363