问题标签 [matillion]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
4 回答
734 浏览

python - 数据摄取:将动态文件从 S3 加载到 Snowflake

情况:每个月都有一个 csv 进入 AWS S3。供应商可以随意添加/删除/修改文件中的列。所以架构是不提前知道的。要求是在 Snowflake 中动态创建一个表并将数据加载到所述表中。Matillion 是我们的 ELT 工具。

这是我到目前为止所做的。

  1. 设置一个 Lambda 来检测文件的到达,将其转换为 JSON,上传到另一个 S3 目录并将文件名添加到 SQS。
  2. Matillion 检测 SQS 消息并将带有 JSON 数据的文件加载到 SF 表的 Variant 列中。
  3. SF Stored proc 采用变量列并根据 JSON 数据中的字段数生成一个表。SF 中的 VARIANT 列仅在其 JSON 数据时以这种方式工作。遗憾的是不支持 CSV。

这适用于 10,000 行。当我使用超过 1GB(超过 10M 行)的完整文件运行它时,就会出现问题。它在运行时因磁盘空间不足错误而使 lambda 作业崩溃。

这些是我到目前为止想到的替代方案:

  1. 将 EFS 卷附加到 lambda,并在上传到 S3 之前使用它来存储 JSON 文件。JSON 数据文件比它们的 CSV 对应文件大得多,我预计 json 文件大约为 10-20GB,因为该文件有超过 10M 行。
  2. Matillion 有一个 Excel 查询组件,它可以在其中获取标题并动态创建表格并加载文件。我在想我可以将 CSV 中的标题行转换为 Lambda 中的 XLX 文件,将其传递给 Matillion,让它创建结构,然后在创建结构后加载 csv 文件。

我在这里还有哪些其他选择?考虑因素包括用于未来大型 CSV 或类似需求的良好可重复设计模式、EFS 成本、我是否充分利用了可用的工具?谢谢!!!

0 投票
0 回答
28 浏览

amazon-web-services - 如何修复 Redshift db 中的历史表?

我在更新数据库中的历史表时遇到了问题。我在 ETL 工作中有 3 个步骤:

  1. 检测更改 - Matillion ETL 发现两个表之间的差异并将其写入 tmp 表
  2. 关闭当前 - 此步骤将 current_flag = 'Y' 的所有行更新为 'N' 并将 end_date 设置为 dateadd(day,-1,current_date)
  3. 插入行 - 步骤 1 中 tmp 表中的所有行(仅过滤带有 N 和 C 指示符的行)被附加到历史表中,其中 start_date 等于 dateadd(day,-1,current_date) 并且结束日期等于 '2099-01 -01'

发生的情况是,步骤 2 和 3 是使用昨天的数据(来自 tmp 表)错误地手动执行的,并且在 15 分钟后,所有 3 个步骤的作业再次按照预期使用新数据执行。我有不同的 update_timestamps(2021-03-19 01:59:02 和 2021-03-19 02:11:57)可用于解决此问题。但是我在修复手动更新的数据时遇到了困难(start_date='2021-03-18' and end_date='2021-03-18' and current_flag='N'),我相信这些应该再次更新到 start_date ='2021-03-18' and end_date='2099-01-01' and current_flag='Y') 但我不确定如何处理新的作业数据(较新的时间戳 - 正确执行),因为它已完成错误的数据。我是否应该在修复 start_date='2021-03-18' 和 end_date=' 后删除它并重新运行 2021-03-18' 和 current_flag='N' 手动?另外,我觉得我在这里缺少一些步骤?

谢谢

0 投票
1 回答
48 浏览

amazon-web-services - S3 存储桶文件前缀

我在 S3 存储桶中有文件名,就像

abc_Assessment_20210302.csv xyz_Assessment_20210302.csv mno_Assessment_20210302.csv

其中关键字是评估。

作为一个变量,我通过 * Assessment * 将所有与评估相关的文件查找到 S3 存储桶中,以便处理到暂存表中。

但是 s3_file_file_prefix as * Assessment * 在这里不起作用,并且无法在 matillion 工作中列出这些与评估相关的文件。

需要帮忙。

0 投票
1 回答
177 浏览

sql - Matillion Grid Iterator 运行随机作业顺序

我在类似于以下内容的“查询结果到网格”组件中有 SQL:-

这些值保存在一个名为“BATCHS”的网格变量中,尽管正确应用了“ORDER BY”,但当作业运行时它是随机顺序的:我希望批次 1 到 10 按顺序运行,但它们是以 5,3,6,10,1 等混乱的顺序运行。

为了纠正这个问题,我在“Query Result to Grid”组件和“Grid Iterator”组件之间添加了一个 Python 组件,该组件使用以下代码:-

这解决了问题,但它不应该首先发生。

有没有其他人遇到过这个问题,你能建议根本问题是什么吗?

0 投票
1 回答
379 浏览

matillion - Matillion API 配置文件配置为发布请求

我对 Matillion 完全陌生,我不确定如何为 POST 请求管理 Matillion API 配置文件。

我有一个编排和转换工作,它从源中提取数据并通过转换执行业务逻辑并将数据加载到目标表中。

将数据加载到目标表(Snowflake DB)中后,我需要从中提取数据并进行 API POST 调用,以便以 JSON 格式向第三方供应商获取数据。

上面的命令行参数将通过 POST 获取数据。请注意,“is_ingestion_finished”值需要设置为 false,直到从目标表中检索到最后一块数据,并且在最后一块时,它应该设置为 true。对于第一个块,“job_id”应设置为空。处理完第一个块后,我们将在 API 响应中获取作业 ID,并且需要将其用于其余块。

我浏览了 Matillion 文档,似乎 API Query 组件用于通过 API 发出 GET 请求并将其存储在表中。

不确定如何通过发出 API 发布请求从目标表中发布数据,另外我需要分页逻辑来发布请求,因为数据集很大并且需要分块发送结果集。

我目前使用的 Matillion 版本是 1.46.5(build 2)。任何有关配置 API 配置文件或任何示例 rsd 脚本或 python 脚本的步骤的帮助将不胜感激。谢谢!

0 投票
1 回答
84 浏览

etl - Matillion S3 加载组件问题

我正在使用 Matillion ETL 工具通过 S3 加载组件将数据从 S3 存储桶加载到登陆表。

我有如下一条记录 000D3A8B328E|"Rila Borovets" AD||83634A3C|DDFS

归档分隔符是 | 在加载此记录时,我收到以下错误“错误代码:1214 分隔值缺少结束引号”

我想加载来自源的值而不删除双引号。我有数百万条记录,很少有双引号,很少有没有。我只对那些带有双引号的记录有问题。

我该如何处理这种情况。?

0 投票
1 回答
270 浏览

python - Matillion 网格变量未通过 updateGridVariable 更新

我在作业中定义了一个 GridVariable(使用管理网格变量)。然后我有一个创建数据框的python代码。然后我将数据框的内容附加到一个数组中,然后更新我的网格变量。但是,当我将网格变量用于转换作业时,内容不会更新。

这是我的代码片段

print(len(arr2)) 行打印正确的记录数 print(arr2) 正确打印数组。

但是当我在转换中使用网格变量时,它不会在 python 脚本中加载记录。

0 投票
2 回答
297 浏览

snowflake-cloud-data-platform - 雪花检查模式是否存在

雪花不接受 T-sql 但我正在尝试创建模式如果它不存在。

如何在雪花中做到这一点?

0 投票
1 回答
62 浏览

marketo - 使用带有 Marketo 活动批量提取 API 的过滤器类型 UpdatedAt

我们正在尝试使用 Marketo Activity Bulk Extract API 来获取特定日期范围内的活动。但是,似乎没有可用于此 API 的高水位标记(updatedAt、lastModifiedDate 等)过滤器。

请指教

0 投票
1 回答
43 浏览

marketo - API 中缺少 Marketo 高水位线和字段定义元数据

Marketo 活动对象 API 缺少高水位线/审计字段。使用 UpdatedAt 或 ModifedDate 之类的列来识别更改记录的潜在方法是什么。没有任何活动对象具有此审核列。考虑到 API 限制,识别 delta 的最佳方法是什么

另外 Marketo 中是否有任何特定的对象用于字段元数据,可以根据每个对象过滤器动态检索,以使编排动态化。基本上在 marketo 中寻找一个可以用作数据源的系统表,每个 Marketo 对象都可以从网格变量传递以将字段名称存储到一个单独的网格变量中以供以后使用

以前有人遇到过这种情况吗?请分享你的经验