1

我正在使用的数据集有问题。它们是包含假新闻的 CSV。我的问题在于CSVRecordReader课程,这是 DataVec (Deeplearning4j) 为我提供的。我正在尝试进行火花转换过程。我的问题是众所周知的“ CSV 行末尾未终止的引用字段”错误。

在网上搜索大家建议你查找出现这种情况的行,并在csv中修复问题,但这会非常困难,因为数据集包含文章的部分内容(可以是真假)。这些文章包含许多引号中的引用,以及其他典型的文章。

寻找解决方案我最终使用 Univocity csv 解析器库实现了我自己的 CSVRecordReader,它非常灵活并且解决了当前存在的所有问题CSVRecordReader,但现在我发现了另一个困境,那就是这个库的解析器没有实现接口 Serializable 并在 Apache Spark 中运行转换会引发异常

org.apache.spark.SparkException:任务不可序列化原因:java.io.NotSerializableException:com.univocity.parsers.csv.CsvParser

我该如何解决我的问题?

在此处输入图像描述

我自己的 CSVRecordReader 代码

package cu.desoft.cav.RecordReader;

import com.univocity.parsers.common.IterableResult;
import com.univocity.parsers.common.ParsingContext;
import com.univocity.parsers.common.ResultIterator;
import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;
import org.datavec.api.records.Record;
import org.datavec.api.records.metadata.RecordMetaData;
import org.datavec.api.records.metadata.RecordMetaDataLine;
import org.datavec.api.records.reader.impl.LineRecordReader;
import org.datavec.api.split.FileSplit;
import org.datavec.api.split.InputSplit;
import org.datavec.api.writable.Text;
import org.datavec.api.writable.Writable;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * @author: Acosta email: yunielacost738@gmail.com
 * created at: 11/25/2019
 */

public class UltraCSVRecordReader extends LineRecordReader {
    public static final char DEFAULT_DELIMITER = ',';
    public static final char DEFAULT_QUOTE = '"';
    public static final char DEFAULT_QUOTE_ESCAPE = '"';
    public static final char DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING = '\0';
    private CsvParser csvParser;
    private CsvParserSettings settings;
    private ResultIterator<String[], ParsingContext> iterator;
    public UltraCSVRecordReader() {
        this(0, DEFAULT_DELIMITER, DEFAULT_QUOTE, DEFAULT_QUOTE_ESCAPE, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
    }

/**
 * @param unknownFormat if you can't know line endings, column delimiters and quotation characters set unknownFormat=true
 *                      for automatic detection
 */
public UltraCSVRecordReader(boolean unknownFormat) {
    this();
    if (unknownFormat) {
        settings = new CsvParserSettings();
        settings.detectFormatAutomatically();
        csvParser = new CsvParser(settings);
    }
}

public UltraCSVRecordReader(CsvParserSettings settings) {
    this.settings = settings;
    csvParser = new CsvParser(settings);
}

/**
 * @param skipNumLines              number of lines to skip
 * @param delimiter                 (default ,): value used to separate individual fields in the input
 * @param quote                     (default "): value used for escaping values where the fields delimiter is part of
 *                                  the value (e.g. the value "a,b" is parse as a , b).
 * @param quoteEscape               (default "): value used for escaping the quote character inside an already escaped value
 *                                  (e.g. the value " "" a,b "" " is parse as " a , b ").
 * @param charToEscapeQuoteEscaping (default \0): value used for escaping the quote escape character, when quote and quote escape are different
 *                                  (e.g. the value “\ " a , b " \” is parsed as \ " a , b " \, if quote = ", quoteEscape = \ and charToEscapeQuoteEscaping = \).
 */
public UltraCSVRecordReader(long skipNumLines, char delimiter, char quote, char quoteEscape,
                            char charToEscapeQuoteEscaping) {
    settings = new CsvParserSettings();
    settings.getFormat().setDelimiter(delimiter);
    settings.getFormat().setQuote(quote);
    settings.getFormat().setQuoteEscape(quoteEscape);
    settings.getFormat().setCharToEscapeQuoteEscaping(charToEscapeQuoteEscaping);
    settings.setNumberOfRowsToSkip(skipNumLines);
    csvParser = new CsvParser(settings);
}

/**
 * @param skipNumLines number of lines to skip
 */
public UltraCSVRecordReader(long skipNumLines) {
    this(skipNumLines, DEFAULT_DELIMITER, DEFAULT_QUOTE, DEFAULT_QUOTE_ESCAPE, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
}

/**
 * @param skipNumLines number of lines to skip
 * @param delimiter    (default ,): value used to separate individual fields in the input
 */
public UltraCSVRecordReader(long skipNumLines, char delimiter) {
    this(skipNumLines, delimiter, DEFAULT_QUOTE, DEFAULT_QUOTE_ESCAPE, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
}

/**
 * @param skipNumLines number of lines to skip
 * @param delimiter    (default ,): value used to separate individual fields in the input
 * @param quote        (default "): value used for escaping values where the fields delimiter is part of
 *                     the value (e.g. the value "a,b" is parse as a , b).
 */
public UltraCSVRecordReader(long skipNumLines, char delimiter, char quote) {
    this(skipNumLines, delimiter, quote, DEFAULT_QUOTE_ESCAPE, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
}

/**
 * @param skipNumLines number of lines to skip
 * @param delimiter    (default ,): value used to separate individual fields in the input
 * @param quote        (default "): value used for escaping values where the fields delimiter is part of
 *                     the value (e.g. the value "a,b" is parse as a , b).
 * @param quoteEscape  (default "): value used for escaping the quote character inside an already escaped value
 *                     (e.g. the value " "" a,b "" " is parse as " a , b ").
 */
public UltraCSVRecordReader(long skipNumLines, char delimiter, char quote, char quoteEscape) {
    this(skipNumLines, delimiter, quote, quoteEscape, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
}

@Override
public void initialize(InputSplit split) throws IOException, InterruptedException {
    super.initialize(split);
    this.initialize(((FileSplit) split).getRootDir());
}

public UltraCSVRecordReader maxLengthCharactersToParser(int numberCharacters) {
    this.settings.setMaxCharsPerColumn(numberCharacters);
    this.csvParser = new CsvParser(this.settings);
    return this;
}

public void initialize(File file) {
    IterableResult<String[], ParsingContext> iterate = this.csvParser.iterate(file);
    iterator = iterate.iterator();
}

protected List<Writable> parseLine(String line) {
    String[] split;
    split = this.csvParser.parseLine(line);
    List<Writable> values = new ArrayList<>();
    for (String value : split) {
        values.add(new Text(value));
    }
    return values;
}

public List<List<Writable>> next(int num) {
    List<List<Writable>> ret = new ArrayList<>(Math.min(num, 10000));
    int count = 0;

    while (this.hasNext() && count++ < num) {
        ret.add(this.next());
    }
    return ret;
}

public List<Writable> next() {
    String[] valuesSplit = iterator.next();
    List<Writable> values = new ArrayList<>();
    try {
        for (String value : valuesSplit) {
            values.add(new Text(value));
        }
    } catch (NullPointerException ex) {
        ex.printStackTrace();
        System.out.println(values);
        System.out.println("================================");
        System.out.println(Arrays.toString(valuesSplit));
    }

    return values;
}

public boolean batchesSupported() {
    return true;
}

public boolean hasNext() {
    return iterator.hasNext();
}

public Record nextRecord() {
    List<Writable> next = this.next();
    URI uri = this.locations != null && this.locations.length >= 1 ? this.locations[this.splitIndex] : null;
    RecordMetaData meta = new RecordMetaDataLine(this.lineIndex - 1, uri, UltraCSVRecordReader.class);
    return new org.datavec.api.records.impl.Record(next, meta);
}

public Record loadFromMetaData(RecordMetaData recordMetaData) throws IOException {
    return this.loadFromMetaData(Collections.singletonList(recordMetaData)).get(0);
}

public List<Record> loadFromMetaData(List<RecordMetaData> recordMetaDatas) throws IOException {
    List<Record> list = super.loadFromMetaData(recordMetaDatas);

    for (Record r : list) {
        String line = r.getRecord().get(0).toString();
        r.setRecord(this.parseLine(line));
    }

    return list;
}

public void reset() {
    super.reset();
}

public CsvParser getCsvParser() {
    return csvParser;
}
}

示例数据集

为什么要花两年时间才能摧毁它们?我们又来了……另一组从政府和纳税人那里偷东西!一群索马里人在短短 10 个月内偷走了超过 400 万份政府福利!我们已经报道了许多类似这样的案例,其中穆斯林难民/移民通过欺骗我们的系统进行欺诈......这是失控的!更多相关","english","2016-10-27T01:49:27.168+03:00","100percentfedup.com","US",25689,"Muslims BUSTED: 他们偷走了数百万的政府福利", 0"http://bb4sp.com/wp-content/uploads/2016/10/Fullscreen-capture-10262016-83501-AM.bmp.jpg ",0,1,0,0,0,"bias"

这是我的改造过程

package cu.desoft.cav.preprocessing;

import cu.desoft.cav.RecordReader.UltraCSVRecordReader;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.datavec.api.records.reader.RecordReader;
import org.datavec.api.records.reader.impl.csv.CSVRecordReader;
import org.datavec.api.transform.TransformProcess;
import org.datavec.api.transform.schema.Schema;
import org.datavec.api.writable.Writable;
import org.datavec.spark.transform.SparkTransformExecutor;
import org.datavec.spark.transform.misc.StringToWritablesFunction;
import org.datavec.spark.transform.misc.WritablesToStringFunction;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/**
 * author: acosta
 * email: yunielacosta738@gmail.com
 * Created on: 2/3/20
 */
public class FakeNewsTransformation {
    private final String DATSETS_PATH = "data/FakeNews/";

    public void transform(boolean useSparkLocal) {
        Schema schema = new Schema.Builder()
                .addColumnString("uuid")
                .addColumnInteger("ord_in_thread")
                .addColumnString("author")
                .addColumnString("published")
                .addColumnsString("title","text","language","crawled","site_url","country")
                .addColumnInteger("domain_rank")
                .addColumnString("thread_title")
                .addColumnsInteger("spam_score","main_img_url","replies_count","participants_count","likes","comments","shares")
                .addColumnCategorical("type", Arrays.asList("bias", "bs","conspiracy","fake","hate","junksci","satire","state"))
                .build();

        TransformProcess tp = new TransformProcess.Builder(schema)
                .removeColumns("uuid", "ord_in_thread","author","published","site_url","country","thread_title")
                .categoricalToInteger("type")
                .build();

        int numActions = tp.getActionList().size();
        for (int i = 0; i < numActions; i++) {
            System.out.println("\n\n===============================");
            System.out.println("--- Schema after step " + i +
                    " (" + tp.getActionList().get(i) + ")--");
            System.out.println(tp.getSchemaAfterStep(i));
        }

        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.set("spark.kryo.registrator", "org.nd4j.Nd4jRegistrator");
        if (useSparkLocal) {
            sparkConf.setMaster("local[*]");
        }

        sparkConf.setAppName("Fake News Spanish Corpus dataset transformation");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        //Load our data using Spark
        JavaRDD<String> lines = sc.textFile(DATSETS_PATH + "fake.csv");
        int skipNumLines = 1;
        //We first need to parse this format. It's comma-delimited (CSV) format, so let's parse it using CSVRecordReader:
        RecordReader rr = new UltraCSVRecordReader();
//        RecordReader rr = new CSVRecordReader();
        JavaRDD<List<Writable>> parsedInputData = lines.map(new StringToWritablesFunction(rr));

        //Now, let's execute the transforms we defined earlier:
        JavaRDD<List<Writable>> processedData = SparkTransformExecutor.execute(parsedInputData, tp);

        //For the sake of this example, let's collect the data locally and print it:
        JavaRDD<String> processedAsString = processedData.map(new WritablesToStringFunction(","));
        System.out.println("<<<<<<<<<<<<<<<PATH>>>>>>>>>>>>>");
        File dataset = new File("dataset/FakeNews");
        if (dataset.exists()) {
            try {
                FileUtils.deleteDirectory(dataset);
                System.out.println("DELETE THE DIRECTORY");
            } catch (IOException e) {
                System.out.println("The directory was not delete");
                e.printStackTrace();
            }
        }
        System.out.println(dataset.getAbsolutePath());
        System.out.println("<<<<<<<<<<<<<<<END-PATH>>>>>>>>>>>>>");
        processedAsString.saveAsTextFile("file://" + dataset.getAbsolutePath());   //To save locally
        //processedAsString.saveAsTextFile("hdfs://your/hdfs/save/path/here");   //To save to hdfs

        List<String> processedCollected = processedAsString.collect();
        List<String> inputDataCollected = lines.collect();


    }

    public static void main(String[] args) {
        new FakeNewsTransformation().transform(true);
    }
}

这是我使用 CSVRecordReader (DataVec) 时的输出错误

    java.lang.RuntimeException: java.io.IOException: Un-terminated quoted field at end of CSV line
    at org.datavec.api.records.reader.impl.csv.CSVRecordReader.parseLine(CSVRecordReader.java:183)
    at org.datavec.api.records.reader.impl.csv.CSVRecordReader.next(CSVRecordReader.java:175)
    at org.datavec.spark.transform.misc.StringToWritablesFunction.call(StringToWritablesFunction.java:41)
    at org.datavec.spark.transform.misc.StringToWritablesFunction.call(StringToWritablesFunction.java:33)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1211)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1210)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1210)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1218)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1197)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Un-terminated quoted field at end of CSV line
    at org.datavec.api.records.reader.impl.csv.SerializableCSVParser.parseLine(SerializableCSVParser.java:276)
    at org.datavec.api.records.reader.impl.csv.SerializableCSVParser.parseLine(SerializableCSVParser.java:186)
    at org.datavec.api.records.reader.impl.csv.CSVRecordReader.parseLine(CSVRecordReader.java:181)
    ... 21 more

这就是当我使用我自己的 CSVRecordReader 和 univocity csv 解析器时的序列化问题(这个库不实现可序列化)

  Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.map(RDD.scala:369)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:93)
    at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:45)
    at cu.desoft.cav.preprocessing.FakeNewsTransformation.transform(FakeNewsTransformation.java:71)
    at cu.desoft.cav.preprocessing.FakeNewsTransformation.main(FakeNewsTransformation.java:101)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:131)
Caused by: java.io.NotSerializableException: com.univocity.parsers.csv.CsvParser
Serialization stack:
    - object not serializable (class: com.univocity.parsers.csv.CsvParser, value: com.univocity.parsers.csv.CsvParser@75b6dd5b)
    - field (class: cu.desoft.cav.RecordReader.UltraCSVRecordReader, name: csvParser, type: class com.univocity.parsers.csv.CsvParser)
    - object (class cu.desoft.cav.RecordReader.UltraCSVRecordReader, cu.desoft.cav.RecordReader.UltraCSVRecordReader@1fedf0a4)
    - field (class: org.datavec.spark.transform.misc.StringToWritablesFunction, name: recordReader, type: interface org.datavec.api.records.reader.RecordReader)
    - object (class org.datavec.spark.transform.misc.StringToWritablesFunction, org.datavec.spark.transform.misc.StringToWritablesFunction@465b38e6)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 18 more
4

1 回答 1

0

您要么必须修复 CSV 中导致错误的行。或者将您的记录阅读器实现为可序列化。
基于, univocity-parsers “处理未转义的引号,您可以将其配置为在发现异常时引发异常。” 也许试试?

于 2020-02-04T18:33:47.940 回答