1

我们有一个 JSON 文件作为 spark 程序的输入(它描述了我们要在每一列上检查的架构定义和约束),我还想执行一些数据质量检查,例如(非 NULL,唯一)和数据类型验证(想检查 csv 文件是否包含根据 json 模式的数据?)。

JSON文件:

{

"id":"1",

“姓名”:“员工”,

“来源”:“本地”,

“文件类型”:“文本”,

"sub_file_type":"csv",

“分隔符”:“,”,

"路径":"/user/all/dqdata/data/emp.txt",

“列”:[

{"column_name":"empid","datatype":"integer","constraints":["not null","unique"],"values_permitted":["1","2"]},

{"column_name":"empname","datatype":"string","constraints":["not null","unique"],"values_permitted":["1","2"]},

{"column_name":"salary","datatype":"double","constraints":["not null","unique"],"values_permitted":["1","2"]},

{"column_name":"doj","datatype":"date","constraints":["not null","unique"],"values_permitted":["1","2"]},

{"column_name":"location","string":"number","constraints":["not null","unique"],"values_permitted":["1","2"]}

]

}

CSV 输入示例:

empId,empname,salar,dob,位置

1,a,10000,11-03-2019,浦那

2,b,10020,14-03-2019,浦那

3,a,10010,15-03-2019,浦那

a,1,10010,15-03-2019,浦那

请记住,

1)我故意为empId和name字段输入了无效数据(检查最后一条记录)。2)json文件的列数不固定?

问题:

如何确保输入数据文件包含给定数据类型(JSON 格式)文件的所有记录?

我们尝试了以下事情:

1) 如果我们尝试通过应用外部模式使用数据框从 CSV 文件中加载数据,那么 spark 程序会立即抛出一些强制转换异常(NumberFormatException 等)并异常终止程序。但我想继续执行流程并将特定错误记录为“列 empID 的数据类型不匹配错误”。上面的场景只有在我们对数据框调用一些 RDD 操作时才有效,我觉得这是一种奇怪的验证模式的方法。

请指导我,我们如何在火花中实现它?

4

1 回答 1

0

我认为这里没有免费的午餐,你必须自己编写这个过程,但你可以做的过程是......

  1. 读取 csv 文件DatasetStrings以便每一行都很好
  2. 使用函数解析数据集map以检查Null每列的数据类型问题
  3. 添加额外的两列,一个boolean叫做likevalidRow和一个String叫做like messageordescription
  4. 使用“2.”中提到的解析器,对每列中的每个值执行某种try/catch或 a并捕获异常并相应地设置和列Try/Success/FailurevalidRowdescription
  5. 做一个过滤器,将一个DataFrame/DataSet成功的(validRow标志设置为True)写入成功的地方,并将错误写入DataFrame/DataSet错误的地方
于 2019-06-05T13:47:34.053 回答