我正在使用的数据集有问题。它们是包含假新闻的 CSV。我的问题在于CSVRecordReader
课程,这是 DataVec (Deeplearning4j) 为我提供的。我正在尝试进行火花转换过程。我的问题是众所周知的“ CSV 行末尾未终止的引用字段”错误。
在网上搜索大家建议你查找出现这种情况的行,并在csv中修复问题,但这会非常困难,因为数据集包含文章的部分内容(可以是真假)。这些文章包含许多引号中的引用,以及其他典型的文章。
寻找解决方案我最终使用 Univocity csv 解析器库实现了我自己的 CSVRecordReader,它非常灵活并且解决了当前存在的所有问题CSVRecordReader
,但现在我发现了另一个困境,那就是这个库的解析器没有实现接口 Serializable
并在 Apache Spark 中运行转换会引发异常
org.apache.spark.SparkException:任务不可序列化原因:java.io.NotSerializableException:com.univocity.parsers.csv.CsvParser
我该如何解决我的问题?
我自己的 CSVRecordReader 代码
package cu.desoft.cav.RecordReader;
import com.univocity.parsers.common.IterableResult;
import com.univocity.parsers.common.ParsingContext;
import com.univocity.parsers.common.ResultIterator;
import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;
import org.datavec.api.records.Record;
import org.datavec.api.records.metadata.RecordMetaData;
import org.datavec.api.records.metadata.RecordMetaDataLine;
import org.datavec.api.records.reader.impl.LineRecordReader;
import org.datavec.api.split.FileSplit;
import org.datavec.api.split.InputSplit;
import org.datavec.api.writable.Text;
import org.datavec.api.writable.Writable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* @author: Acosta email: yunielacost738@gmail.com
* created at: 11/25/2019
*/
public class UltraCSVRecordReader extends LineRecordReader {
public static final char DEFAULT_DELIMITER = ',';
public static final char DEFAULT_QUOTE = '"';
public static final char DEFAULT_QUOTE_ESCAPE = '"';
public static final char DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING = '\0';
private CsvParser csvParser;
private CsvParserSettings settings;
private ResultIterator<String[], ParsingContext> iterator;
public UltraCSVRecordReader() {
this(0, DEFAULT_DELIMITER, DEFAULT_QUOTE, DEFAULT_QUOTE_ESCAPE, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
}
/**
* @param unknownFormat if you can't know line endings, column delimiters and quotation characters set unknownFormat=true
* for automatic detection
*/
public UltraCSVRecordReader(boolean unknownFormat) {
this();
if (unknownFormat) {
settings = new CsvParserSettings();
settings.detectFormatAutomatically();
csvParser = new CsvParser(settings);
}
}
public UltraCSVRecordReader(CsvParserSettings settings) {
this.settings = settings;
csvParser = new CsvParser(settings);
}
/**
* @param skipNumLines number of lines to skip
* @param delimiter (default ,): value used to separate individual fields in the input
* @param quote (default "): value used for escaping values where the fields delimiter is part of
* the value (e.g. the value "a,b" is parse as a , b).
* @param quoteEscape (default "): value used for escaping the quote character inside an already escaped value
* (e.g. the value " "" a,b "" " is parse as " a , b ").
* @param charToEscapeQuoteEscaping (default \0): value used for escaping the quote escape character, when quote and quote escape are different
* (e.g. the value “\ " a , b " \” is parsed as \ " a , b " \, if quote = ", quoteEscape = \ and charToEscapeQuoteEscaping = \).
*/
public UltraCSVRecordReader(long skipNumLines, char delimiter, char quote, char quoteEscape,
char charToEscapeQuoteEscaping) {
settings = new CsvParserSettings();
settings.getFormat().setDelimiter(delimiter);
settings.getFormat().setQuote(quote);
settings.getFormat().setQuoteEscape(quoteEscape);
settings.getFormat().setCharToEscapeQuoteEscaping(charToEscapeQuoteEscaping);
settings.setNumberOfRowsToSkip(skipNumLines);
csvParser = new CsvParser(settings);
}
/**
* @param skipNumLines number of lines to skip
*/
public UltraCSVRecordReader(long skipNumLines) {
this(skipNumLines, DEFAULT_DELIMITER, DEFAULT_QUOTE, DEFAULT_QUOTE_ESCAPE, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
}
/**
* @param skipNumLines number of lines to skip
* @param delimiter (default ,): value used to separate individual fields in the input
*/
public UltraCSVRecordReader(long skipNumLines, char delimiter) {
this(skipNumLines, delimiter, DEFAULT_QUOTE, DEFAULT_QUOTE_ESCAPE, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
}
/**
* @param skipNumLines number of lines to skip
* @param delimiter (default ,): value used to separate individual fields in the input
* @param quote (default "): value used for escaping values where the fields delimiter is part of
* the value (e.g. the value "a,b" is parse as a , b).
*/
public UltraCSVRecordReader(long skipNumLines, char delimiter, char quote) {
this(skipNumLines, delimiter, quote, DEFAULT_QUOTE_ESCAPE, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
}
/**
* @param skipNumLines number of lines to skip
* @param delimiter (default ,): value used to separate individual fields in the input
* @param quote (default "): value used for escaping values where the fields delimiter is part of
* the value (e.g. the value "a,b" is parse as a , b).
* @param quoteEscape (default "): value used for escaping the quote character inside an already escaped value
* (e.g. the value " "" a,b "" " is parse as " a , b ").
*/
public UltraCSVRecordReader(long skipNumLines, char delimiter, char quote, char quoteEscape) {
this(skipNumLines, delimiter, quote, quoteEscape, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
}
@Override
public void initialize(InputSplit split) throws IOException, InterruptedException {
super.initialize(split);
this.initialize(((FileSplit) split).getRootDir());
}
public UltraCSVRecordReader maxLengthCharactersToParser(int numberCharacters) {
this.settings.setMaxCharsPerColumn(numberCharacters);
this.csvParser = new CsvParser(this.settings);
return this;
}
public void initialize(File file) {
IterableResult<String[], ParsingContext> iterate = this.csvParser.iterate(file);
iterator = iterate.iterator();
}
protected List<Writable> parseLine(String line) {
String[] split;
split = this.csvParser.parseLine(line);
List<Writable> values = new ArrayList<>();
for (String value : split) {
values.add(new Text(value));
}
return values;
}
public List<List<Writable>> next(int num) {
List<List<Writable>> ret = new ArrayList<>(Math.min(num, 10000));
int count = 0;
while (this.hasNext() && count++ < num) {
ret.add(this.next());
}
return ret;
}
public List<Writable> next() {
String[] valuesSplit = iterator.next();
List<Writable> values = new ArrayList<>();
try {
for (String value : valuesSplit) {
values.add(new Text(value));
}
} catch (NullPointerException ex) {
ex.printStackTrace();
System.out.println(values);
System.out.println("================================");
System.out.println(Arrays.toString(valuesSplit));
}
return values;
}
public boolean batchesSupported() {
return true;
}
public boolean hasNext() {
return iterator.hasNext();
}
public Record nextRecord() {
List<Writable> next = this.next();
URI uri = this.locations != null && this.locations.length >= 1 ? this.locations[this.splitIndex] : null;
RecordMetaData meta = new RecordMetaDataLine(this.lineIndex - 1, uri, UltraCSVRecordReader.class);
return new org.datavec.api.records.impl.Record(next, meta);
}
public Record loadFromMetaData(RecordMetaData recordMetaData) throws IOException {
return this.loadFromMetaData(Collections.singletonList(recordMetaData)).get(0);
}
public List<Record> loadFromMetaData(List<RecordMetaData> recordMetaDatas) throws IOException {
List<Record> list = super.loadFromMetaData(recordMetaDatas);
for (Record r : list) {
String line = r.getRecord().get(0).toString();
r.setRecord(this.parseLine(line));
}
return list;
}
public void reset() {
super.reset();
}
public CsvParser getCsvParser() {
return csvParser;
}
}
示例数据集
为什么要花两年时间才能摧毁它们?我们又来了……另一组从政府和纳税人那里偷东西!一群索马里人在短短 10 个月内偷走了超过 400 万份政府福利!我们已经报道了许多类似这样的案例,其中穆斯林难民/移民通过欺骗我们的系统进行欺诈......这是失控的!更多相关","english","2016-10-27T01:49:27.168+03:00","100percentfedup.com","US",25689,"Muslims BUSTED: 他们偷走了数百万的政府福利", 0"http://bb4sp.com/wp-content/uploads/2016/10/Fullscreen-capture-10262016-83501-AM.bmp.jpg ",0,1,0,0,0,"bias"
这是我的改造过程
package cu.desoft.cav.preprocessing;
import cu.desoft.cav.RecordReader.UltraCSVRecordReader;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.datavec.api.records.reader.RecordReader;
import org.datavec.api.records.reader.impl.csv.CSVRecordReader;
import org.datavec.api.transform.TransformProcess;
import org.datavec.api.transform.schema.Schema;
import org.datavec.api.writable.Writable;
import org.datavec.spark.transform.SparkTransformExecutor;
import org.datavec.spark.transform.misc.StringToWritablesFunction;
import org.datavec.spark.transform.misc.WritablesToStringFunction;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
/**
* author: acosta
* email: yunielacosta738@gmail.com
* Created on: 2/3/20
*/
public class FakeNewsTransformation {
private final String DATSETS_PATH = "data/FakeNews/";
public void transform(boolean useSparkLocal) {
Schema schema = new Schema.Builder()
.addColumnString("uuid")
.addColumnInteger("ord_in_thread")
.addColumnString("author")
.addColumnString("published")
.addColumnsString("title","text","language","crawled","site_url","country")
.addColumnInteger("domain_rank")
.addColumnString("thread_title")
.addColumnsInteger("spam_score","main_img_url","replies_count","participants_count","likes","comments","shares")
.addColumnCategorical("type", Arrays.asList("bias", "bs","conspiracy","fake","hate","junksci","satire","state"))
.build();
TransformProcess tp = new TransformProcess.Builder(schema)
.removeColumns("uuid", "ord_in_thread","author","published","site_url","country","thread_title")
.categoricalToInteger("type")
.build();
int numActions = tp.getActionList().size();
for (int i = 0; i < numActions; i++) {
System.out.println("\n\n===============================");
System.out.println("--- Schema after step " + i +
" (" + tp.getActionList().get(i) + ")--");
System.out.println(tp.getSchemaAfterStep(i));
}
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", "org.nd4j.Nd4jRegistrator");
if (useSparkLocal) {
sparkConf.setMaster("local[*]");
}
sparkConf.setAppName("Fake News Spanish Corpus dataset transformation");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
//Load our data using Spark
JavaRDD<String> lines = sc.textFile(DATSETS_PATH + "fake.csv");
int skipNumLines = 1;
//We first need to parse this format. It's comma-delimited (CSV) format, so let's parse it using CSVRecordReader:
RecordReader rr = new UltraCSVRecordReader();
// RecordReader rr = new CSVRecordReader();
JavaRDD<List<Writable>> parsedInputData = lines.map(new StringToWritablesFunction(rr));
//Now, let's execute the transforms we defined earlier:
JavaRDD<List<Writable>> processedData = SparkTransformExecutor.execute(parsedInputData, tp);
//For the sake of this example, let's collect the data locally and print it:
JavaRDD<String> processedAsString = processedData.map(new WritablesToStringFunction(","));
System.out.println("<<<<<<<<<<<<<<<PATH>>>>>>>>>>>>>");
File dataset = new File("dataset/FakeNews");
if (dataset.exists()) {
try {
FileUtils.deleteDirectory(dataset);
System.out.println("DELETE THE DIRECTORY");
} catch (IOException e) {
System.out.println("The directory was not delete");
e.printStackTrace();
}
}
System.out.println(dataset.getAbsolutePath());
System.out.println("<<<<<<<<<<<<<<<END-PATH>>>>>>>>>>>>>");
processedAsString.saveAsTextFile("file://" + dataset.getAbsolutePath()); //To save locally
//processedAsString.saveAsTextFile("hdfs://your/hdfs/save/path/here"); //To save to hdfs
List<String> processedCollected = processedAsString.collect();
List<String> inputDataCollected = lines.collect();
}
public static void main(String[] args) {
new FakeNewsTransformation().transform(true);
}
}
这是我使用 CSVRecordReader (DataVec) 时的输出错误
java.lang.RuntimeException: java.io.IOException: Un-terminated quoted field at end of CSV line
at org.datavec.api.records.reader.impl.csv.CSVRecordReader.parseLine(CSVRecordReader.java:183)
at org.datavec.api.records.reader.impl.csv.CSVRecordReader.next(CSVRecordReader.java:175)
at org.datavec.spark.transform.misc.StringToWritablesFunction.call(StringToWritablesFunction.java:41)
at org.datavec.spark.transform.misc.StringToWritablesFunction.call(StringToWritablesFunction.java:33)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1211)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1210)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1210)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1218)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1197)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Un-terminated quoted field at end of CSV line
at org.datavec.api.records.reader.impl.csv.SerializableCSVParser.parseLine(SerializableCSVParser.java:276)
at org.datavec.api.records.reader.impl.csv.SerializableCSVParser.parseLine(SerializableCSVParser.java:186)
at org.datavec.api.records.reader.impl.csv.CSVRecordReader.parseLine(CSVRecordReader.java:181)
... 21 more
这就是当我使用我自己的 CSVRecordReader 和 univocity csv 解析器时的序列化问题(这个库不实现可序列化)
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.map(RDD.scala:369)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:93)
at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:45)
at cu.desoft.cav.preprocessing.FakeNewsTransformation.transform(FakeNewsTransformation.java:71)
at cu.desoft.cav.preprocessing.FakeNewsTransformation.main(FakeNewsTransformation.java:101)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:131)
Caused by: java.io.NotSerializableException: com.univocity.parsers.csv.CsvParser
Serialization stack:
- object not serializable (class: com.univocity.parsers.csv.CsvParser, value: com.univocity.parsers.csv.CsvParser@75b6dd5b)
- field (class: cu.desoft.cav.RecordReader.UltraCSVRecordReader, name: csvParser, type: class com.univocity.parsers.csv.CsvParser)
- object (class cu.desoft.cav.RecordReader.UltraCSVRecordReader, cu.desoft.cav.RecordReader.UltraCSVRecordReader@1fedf0a4)
- field (class: org.datavec.spark.transform.misc.StringToWritablesFunction, name: recordReader, type: interface org.datavec.api.records.reader.RecordReader)
- object (class org.datavec.spark.transform.misc.StringToWritablesFunction, org.datavec.spark.transform.misc.StringToWritablesFunction@465b38e6)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 18 more