我们有一个 JSON 文件作为 spark 程序的输入(它描述了我们要在每一列上检查的架构定义和约束),我还想执行一些数据质量检查,例如(非 NULL,唯一)和数据类型验证(想检查 csv 文件是否包含根据 json 模式的数据?)。
JSON文件:
{
"id":"1",
“姓名”:“员工”,
“来源”:“本地”,
“文件类型”:“文本”,
"sub_file_type":"csv",
“分隔符”:“,”,
"路径":"/user/all/dqdata/data/emp.txt",
“列”:[
{"column_name":"empid","datatype":"integer","constraints":["not null","unique"],"values_permitted":["1","2"]},
{"column_name":"empname","datatype":"string","constraints":["not null","unique"],"values_permitted":["1","2"]},
{"column_name":"salary","datatype":"double","constraints":["not null","unique"],"values_permitted":["1","2"]},
{"column_name":"doj","datatype":"date","constraints":["not null","unique"],"values_permitted":["1","2"]},
{"column_name":"location","string":"number","constraints":["not null","unique"],"values_permitted":["1","2"]}
]
}
CSV 输入示例:
empId,empname,salar,dob,位置
1,a,10000,11-03-2019,浦那
2,b,10020,14-03-2019,浦那
3,a,10010,15-03-2019,浦那
a,1,10010,15-03-2019,浦那
请记住,
1)我故意为empId和name字段输入了无效数据(检查最后一条记录)。2)json文件的列数不固定?
问题:
如何确保输入数据文件包含给定数据类型(JSON 格式)文件的所有记录?
我们尝试了以下事情:
1) 如果我们尝试通过应用外部模式使用数据框从 CSV 文件中加载数据,那么 spark 程序会立即抛出一些强制转换异常(NumberFormatException 等)并异常终止程序。但我想继续执行流程并将特定错误记录为“列 empID 的数据类型不匹配错误”。上面的场景只有在我们对数据框调用一些 RDD 操作时才有效,我觉得这是一种奇怪的验证模式的方法。
请指导我,我们如何在火花中实现它?