我们有一个用例引导我写这篇文章,我相信你们中的许多人都会遇到这种情况。情况是通过单个 Talend 作业将多个集合从 MongoDB 迁移到 Snowflake 数据库,并将集合的顶级节点保留为 Snowflake 表中的单个字段。
现在我们知道 Talend 不支持 MongoDB 源的动态模式,因为 MongoDB 集合不强制使用模式,这意味着我们必须为我们想要摄取的每个现有/新集合创建单独的作业/子作业必须重新设计工作以应对文档中的未来更改,同时确保其始终有效,因此我们必须寻找替代解决方案。
这是方法,
第一步:从 MongoDB 集合中获取所有顶级键及其类型。我们使用了$objectToArarray的聚合来将所有顶部键和值对转换为文档数组,然后使用$unwind和$group以及$addToSet来获取整个集合中不同的键和值类型。
{
"_id" : "1",
"keys" : [
"field1~string",
"field2~object",
"filed3~date",
"_id~objectId"
]
}
第二步:在 Mongo Datatype 和 Snowflake Datatype 之间创建一对一的映射。我们创建了一个名为“ dataTypes ”的哈希映射来存储这些信息。或者,此信息可以存储在表格或文件等中。
java.util.Map<String,String> dataTypes = new java.util.HashMap<String,String>();
dataTypes.put("string","VARCHAR");
dataTypes.put("int","NUMBER");
dataTypes.put("objectId","VARCHAR");
dataTypes.put("object","VARIANT");
dataTypes.put("date","TIMESTAMP_LTZ");
dataTypes.put("array","VARCHAR");
dataTypes.put("bool","BOOLEAN");
第三步:将键与雪花进行比较:首先我们查询雪花INFORMATION_SCHEMA表是否存在,如果不存在则创建表,如果存在则检查文档中字段的更改并添加或修改雪花表中的那些列。通过使用第二步中的“数据类型映射”并迭代第一步中的键来生成 DDL 脚本
第四步:使用mongoexport命令将数据从 MongoDB 卸载到本地文件系统:
mongoexport --db <databaseName> --collection <collectionName> --type=csv --fields=<fieldList> --out <filename>
这是从第一步中的键准备的。
第五步:使用Snowsql的PUT命令将 .csv 文件从本地文件系统暂存到雪花暂存位置。
snowsql -d <database> -s <schema> -o exit_on_error=true -o log_level=DEBUG -q 'put <fileName> @<internalStage> OVERWRITE=TRUE';
第六步:将数据从暂存位置加载到雪花表
COPY INTO <tableName> FROM @<internalStage>
[file_format=<fileFormat>] [pattern=<regex_pattern>]
在这里指定 file_format 和模式是可选的,我们使用了正则表达式,因为我们在一个雪花阶段为每个集合暂存多个文件。
第七步:维护一个集合列表,该列表可以放在本地文件系统的文件中或数据库表中,在 Talend 作业中迭代集合列表,并通过参数化集合名称、表名称,通过上述步骤处理每个集合,文件名和暂存名称等在作业中。