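I am building a Spark application that loads two JSON files, compares them, and prints the differences. I also tried to validate these files with Amazon's Deequ library (AWS Deequ), but I get the following exception: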
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
20/08/07 11:56:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Error: Failed to load com.deeq.CompareDataFrames: com/amazon/deequ/checks/Check
log4j:WARN No appenders could be found for logger (org.apache.spark.util.ShutdownHookManager).
log4j:WARN Please
when I submit the job:
./spark-submit --class com.deeq.CompareDataFrames --master spark://saif-VirtualBox:7077 ~/Downloads/deeq-trial-1.0-SNAPSHOT.jar
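I am hosting Spark on Ubuntu, and everything worked fine until I added Deequ to run some validations. I wonder whether I am missing something in the deployment process. This error does not seem to be widely documented online.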
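The message `Failed to load com.deeq.CompareDataFrames: com/amazon/deequ/checks/Check` looks like what spark-submit prints when loading the main class triggers a `NoClassDefFoundError`, which would suggest that `com.amazon.deequ.checks.Check` is not visible at runtime. One way to check that, as a rough sketch, is a small probe like the one below. `ClasspathProbe` and `isLoadable` are hypothetical names made up for illustration; they are not part of Spark or Deequ, and the probe only reports on the classpath of the JVM it runs in.

```java
/**
 * Hypothetical helper (not part of Spark or Deequ): reports whether a class
 * can be found on the classpath of the JVM that runs it.
 */
final class ClasspathProbe {

    /** Returns true if the named class can be located; the class is not initialized. */
    static boolean isLoadable(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError raised while resolving the class
            return false;
        }
    }

    public static void main(String[] args) {
        // Default to the class named in the error message; pass another name as an argument
        String name = args.length > 0 ? args[0] : "com.amazon.deequ.checks.Check";
        System.out.println(name + " loadable: " + isLoadable(name));
    }
}
```

Running it with only the application jar on the classpath (for example `java -cp ~/Downloads/deeq-trial-1.0-SNAPSHOT.jar:. ClasspathProbe`) should show whether the Deequ classes were packaged into that jar at all.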
Code:
import com.amazon.deequ.VerificationResult;
import com.amazon.deequ.VerificationSuite;
import com.amazon.deequ.checks.Check;
import com.amazon.deequ.checks.CheckLevel;
import com.amazon.deequ.checks.CheckStatus;
import com.amazon.deequ.constraints.Constraint;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Option;
import scala.Tuple2;
import scala.collection.mutable.ArraySeq;
import scala.collection.mutable.Seq;
public class CompareDataFrames {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder().appName("CompareDataFrames").getOrCreate();
        session.sparkContext().setLogLevel("ALL");

        StructType schema = DataTypes.createStructType(new StructField[]{
                DataTypes.createStructField("CUST_ID", DataTypes.StringType, true),
                DataTypes.createStructField("RECORD_LOCATOR_ID", DataTypes.StringType, true),
                DataTypes.createStructField("EVNT_ID", DataTypes.StringType, true)
        });

        Dataset<Row> first = session.read().option("multiline", "true").schema(schema).json("/home/saif/Downloads/FILE_DEV1.json");
        System.out.println("======= DataSet 1 =======");
        first.printSchema();
        first.show(false);

        Dataset<Row> second = session.read().option("multiline", "true").schema(schema).json("/home/saif/Downloads/FILE_DEV2.json");
        System.out.println("======= DataSet 2 =======");
        second.printSchema();
        second.show(false);

        // This will show all the rows which are present in the first dataset
        // but not present in the second dataset. But the comparison is at row
        // level and not at column level.
        System.out.println("======= Expect =======");
        first.except(second).show();

        StructType one = first.schema();
        JavaPairRDD<String, Row> pair1 = first.toJavaRDD().mapToPair((PairFunction<Row, String, Row>)
                row -> new Tuple2<>(row.getString(1), row));
        JavaPairRDD<String, Row> pair2 = second.toJavaRDD().mapToPair((PairFunction<Row, String, Row>)
                row -> new Tuple2<>(row.getString(1), row));
        System.out.println("======= Pair1 & Pair2 were created =======");

        JavaPairRDD<String, Row> subs = pair1.subtractByKey(pair2);
        JavaRDD<Row> rdd = subs.values();
        Dataset<Row> diff = session.createDataFrame(rdd, one);
        System.out.println("======= Diff Show =======");
        diff.show();

        Seq<Constraint> cons = new ArraySeq<>(0);
        VerificationResult vr = new VerificationSuite().onData(first)
                .addCheck(new Check(CheckLevel.Error(), "unit test", cons)
                        .isComplete("EVNT_ID", Option.empty())
                )
                .run();

        Seq<Check> checkSeq = new ArraySeq<>(0);
        if (vr.status() != CheckStatus.Success()) {
            Dataset<Row> vrr = vr.checkResultsAsDataFrame(session, vr, checkSeq);
            vrr.show(false);
        }
    }
}
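The code above computes two different kinds of difference: `except` compares whole rows, while the `subtractByKey` step compares only the key taken from column 1 (`RECORD_LOCATOR_ID`). To make the distinction concrete without needing a cluster, here is a minimal sketch that uses plain Java collections as a stand-in for the Spark calls. `DiffSketch`, `rowLevelDiff`, `keyDiff`, and the sample rows are all made up for illustration and are not part of the application.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java illustration (no Spark): row-level vs. key-based difference. */
final class DiffSketch {

    /** Distinct rows of first that have no identical row in second (whole-row comparison). */
    static List<List<String>> rowLevelDiff(List<List<String>> first, List<List<String>> second) {
        Set<List<String>> other = new HashSet<>(second);
        Set<List<String>> out = new LinkedHashSet<>(); // keeps order, drops duplicate rows
        for (List<String> row : first) {
            if (!other.contains(row)) {
                out.add(row);
            }
        }
        return new ArrayList<>(out);
    }

    /** Rows of first whose value at keyIndex does not occur at keyIndex in any row of second. */
    static List<List<String>> keyDiff(List<List<String>> first, List<List<String>> second, int keyIndex) {
        Set<String> keys = new HashSet<>();
        for (List<String> row : second) {
            keys.add(row.get(keyIndex));
        }
        List<List<String>> out = new ArrayList<>();
        for (List<String> row : first) {
            if (!keys.contains(row.get(keyIndex))) {
                out.add(row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample rows in the order CUST_ID, RECORD_LOCATOR_ID, EVNT_ID
        List<List<String>> first = Arrays.asList(
                Arrays.asList("C1", "R1", "E1"),
                Arrays.asList("C2", "R2", "E2"),
                Arrays.asList("C3", "R3", "E3"));
        List<List<String>> second = Arrays.asList(
                Arrays.asList("C1", "R1", "E1"),
                Arrays.asList("C2", "R2", "E9"),
                Arrays.asList("C4", "R4", "E4"));

        System.out.println("row-level: " + rowLevelDiff(first, second));
        System.out.println("key-based: " + keyDiff(first, second, 1));
    }
}
```

With these sample rows, the row-level difference reports both the `R2` row (its `EVNT_ID` changed) and the `R3` row, while the key-based difference reports only the `R3` row, because the key `R2` still exists in the second list.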
**Maven:**
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-catalyst_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.amazon.deequ</groupId>
        <artifactId>deequ</artifactId>
        <version>1.0.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.13.3</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang.modules</groupId>
        <artifactId>scala-java8-compat_2.13</artifactId>
        <version>0.9.1</version>