I'm trying to load all new files from an AWS S3 bucket depending on its paths to two Snowflake tables, but I couldn't yet succeed even with one table. What I've tried:
Created a stage:
CREATE or replace STAGE DATA_SCIENCE.INFRA.jobs_resource_usage URL = 's3://om/jobs-resource-usage/'
storage_integration = om_s3 FILE_FORMAT=(TYPE='JSON');
Created a table:
create or replace TABLE DATA_SCIENCE.INFRA.job_metrics (
job_name STRING,
build_number INT,
cpu_perc INT,
mem BIGINT,
"timestamp" TIMESTAMP
);
Created a pipe:
create or replace pipe DATA_SCIENCE.INFRA.job_metrics auto_ingest=true as
copy into DATA_SCIENCE.INFRA.job_metrics
from (select
REGEXP_SUBSTR(METADATA$FILENAME,'job_name=(.*)/',1, 1, 'e', 1),
REGEXP_SUBSTR(METADATA$FILENAME,'build_number=([0-9]+)',1, 1, 'e', 1),
$1:cpu_perc::INT,
$1:mem::BIGINT,
$1:timestamp::TIMESTAMP
from @DATA_SCIENCE.INFRA.jobs_resource_usage/metrics/);
Added the SQS ARN to the bucket's event with:
- prefix: jobs_resource_usage/
- suffix: .json
- send to: SQS Queue
- SQS queue ARN: the one which
select parse_json(SYSTEM$PIPE_STATUS('DATA_SCIENCE.INFRA.job_metrics')):notificationChannelName;
has returned
The stage works, because I can list files with like:
ls '@DATA_SCIENCE.INFRA.jobs_resource_usage/metrics/job_name=Ingest job';
Which returns S3 file names like (included a sample to see its format):
s3://om/jobs-resource-usage/metrics/job_name=Ingest job/build_number=144.json
I can successfully load the file manually with:
copy into DATA_SCIENCE.INFRA.job_metrics
from (select
REGEXP_SUBSTR(METADATA$FILENAME,'job_name=(.*)/',1, 1, 'e', 1),
REGEXP_SUBSTR(METADATA$FILENAME,'build_number=([0-9]+)',1, 1, 'e', 1),
$1:cpu_perc::INT,
$1:mem::BIGINT,
$1:timestamp::TIMESTAMP
from @DATA_SCIENCE.INFRA.jobs_resource_usage/metrics/)
files=('job_name=Ingest job/build_number=144.json');
However, the pipe doesn't load anything. If I do a
select SYSTEM$PIPE_STATUS('DATA_SCIENCE.INFRA.job_metrics');
I can see it receives the notification messages:
{"executionState":"RUNNING","pendingFileCount":0,"notificationChannelName":"arn:aws:sqs:us-west-2:494544507972:sf-snowpipe-concealed","numOutstandingMessagesOnChannel":7,"lastReceivedMessageTimestamp":"2020-08-13T09:59:21.107Z"}
but I can't see any lastForwardedMessageTimestamp
entries, which suggests there's a problem with path matching?
I've tried multiple permutations with the leading slash and to upload files right to the metrics
path, without any spaces or =
s, without success.
What did I do wrong, how could I figure out what's the problem here?