2

I have a CSV file which I am trying to load using Spark CSV package and it does not load data properly because few of the fields have \n within them for e.g. the following two rows

"XYZ", "Test Data", "TestNew\nline", "OtherData" 
"XYZ", "Test Data", "blablablabla
\nblablablablablalbal", "OtherData" 

I am using the following code which is straightforward I am using parserLib as univocity as read in internet it solves multiple newline problem but it does not seems to be the case for me.

 SQLContext sqlContext = new SQLContext(sc);
    DataFrame df = sqlContext.read()
        .format("com.databricks.spark.csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .option("parserLib","univocity")
        .load("data.csv");

How do I replace newline within fields which starts with quotes. Is there any easier way?

4

3 回答 3

5

根据SPARK-14194(已解决为重复)带有换行符的字段不受支持,并且永远不会支持。

我建议通过wholeFile选项解决这个问题,它似乎合并了。我正在解决这个问题,因为那个人有一个 PR。

然而,那是 Spark 2.0,你使用spark-csv模块。

在引用的SPARK-19610中,它通过拉取请求修复:

嗯,我理解这样做的动机,尽管我对 csv 的理解通常避免在字段中使用换行符,或者某些实现需要使用换行符围绕字段值进行引号

换句话说,使用wholeFileSpark 2.x 中的选项(如您在CSVDataSource中所见)。

至于 spark-csv,此评论可能会有所帮助(突出显示我的):

但是,有很多类似的 JIRA 抱怨这一点,原始的 CSV 数据源试图支持这一点,尽管实现不正确。这至少尝试将其与 JSON 匹配,并且提供一种处理此类 CSV 文件的方法可能会更好。实际上,当前的实现需要引号:)。(据说 R 实际上也支持这种情况)。

在 spark-csv 的功能中,您可以找到以下内容:

该包还支持保存简单(非嵌套)DataFrame。写入文件时,API 接受几个选项:

  • quote:默认情况下,引号字符是",但可以设置为任何字符。这是根据quoteMode.

  • quoteMode : 何时引用字段(ALL、MINIMAL(默认)、NON_NUMERIC、NONE),请参阅引用模式

于 2017-05-30T18:14:48.717 回答
5

Spark 2.2 的用户可以使用一个选项来说明 CSV 文件中的换行符。它最初被讨论为被调用wholeFile,但在发布之前被重命名multiLine

这是使用该选项将 CSV 加载到数据框的示例:

var webtrends_data = (sparkSession.read
.option("header", "true")
.option("inferSchema", "true")
.option("multiLine", true)
.option("delimiter", ",")
.format("csv")
.load("hdfs://hadoop-master:9000/datasource/myfile.csv"))
于 2017-12-05T03:36:06.493 回答
1

升级到 Spark 2.x。换行实际上是由 ascii 13 和 10 表示的 CRLF。但是反斜杠和 'n' 是不同的 ascii,它们以编程方式解释和编写。Spark 2.x 将正确读取.. 我试过了..sb
val conf = new SparkConf().setAppName("HelloSpark").setMaster("local[2]") val sc = SparkSession.builder().master("local").getOrCreate() val df = sc.read.csv("src/main/resources/data.csv") df.foreach(row => println(row.mkString(", ")))
如果您无法升级,则使用正则表达式对 RDD 上的 \n 进行清理。这不会删除行尾,因为它是 $ 正则表达式。锑

  val conf = new SparkConf().setAppName("HelloSpark").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd1 = sc.textFile("src/main/resources/data.csv")
  val rdd2 = rdd1.map(row => row.replace("\\n", ""))
  val sqlContext = new SQLContext(sc)

  import sqlContext.implicits._
  val df = rdd2.toDF()
  df.foreach(row => println(row.mkString(", ")))
于 2017-05-30T18:05:35.107 回答