问题标签 [spark-csv]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
11287 浏览

scala - 如何保存引用所有字段的 CSV?

下面的代码不添加默认的双引号。我还尝试使用选项添加 # 和单引号quote,但没有成功。我还使用quoteMode了 withALLNON_NUMERIC选项,输出仍然没有变化。

我还有其他选择吗?我在 spark 2.1 上使用 spark-csv 2.11。

它产生的输出:

我正在寻找的输出:

0 投票
2 回答
7245 浏览

csv - Spark 2.1 无法在 CSV 上写入 Vector 字段

当我偶然发现与数据帧保存相关的问题时,我正在将我的代码从 Spark 2.0 迁移到 2.1。

这是代码

此代码在使用 Spark 2.0.0 时成功

使用 Spark 2.1.0.cloudera1,我收到以下错误:

这只是在我身边吗?

这与 Spark 2.1 的 cloudera 版本有关吗?(从他们的仓库来看,他们似乎没有弄乱 spark.sql 所以也许不是)

谢谢 !

0 投票
1 回答
1408 浏览

apache-spark - 使用 spark 读取文件时出错

我在 apache spark 中读取本地文件时出错。scala> val f=sc.textFile("/home/cloudera/Downloads/sample.txt")

scala> f.count()

org.apache.hadoop.mapred.InvalidInputException: 输入路径不存在: hdfs://quickstart.cloudera:8020/home/cloudera/Downloads/sample.txt at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat. java:287) 在 org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229) 在 org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315) 在 org.apache.spark.rdd .HadoopRDD.getPartitions(HadoopRDD.scala:202) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) at org.apache.spark.rdd.RDD$$anonfun$ partitions$2.apply(RDD.scala:237) 在 org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 在 org.apache.spark 的 scala.Option.getOrElse(Option.scala:120)。 rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 在 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 在 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 在 scala。 Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1959) at org.apache .spark.rdd.RDD.count(RDD.scala:1157) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:30) at $iwC$$iwC $$iwC$$iwC$$iwC$$iwC$$iwC.(:35) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:37) at $iwC$$iwC$ $iwC$$iwC$$iwC.(:39) at $iwC$$iwC$$iwC$$iwC.(:41) at $iwC$$iwC$$iwC.(:43) at $iwC$$iwC .(:45) at $iwC.(:47) at (:49) at .(:53) at .() at .(:7) at .() at $print() at sun.reflect.NativeMethodAccessorImpl。在 sun.reflect.NativeMethodAccessorImpl 处调用0(本机方法)。invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain $ReadEvalPrint.call(SparkIMain.scala:1045) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1326) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:第821章.reallyInterpret$1(SparkILoop.scala:857) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) at org .apache.spark.repl。SparkILoop.processLine$1(SparkILoop.scala:657) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$ SparkILoop$$loop(SparkILoop.scala:670) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997 ) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop$$anonfun $org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 在 scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 在 org.apache.spark。 repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) 在 org.apache.spark.repl.SparkILoop。进程(SparkILoop.scala:1064)在 org.apache.spark.repl.Main$.main(Main.scala:35) 在 org.apache.spark.repl.Main.main(Main.scala) 在 sun.reflect。 NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method. java:606) 在 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730) 在 org.apache.spark.deploy.SparkSubmit$.doRunMain$1( SparkSubmit.scala:181) 在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 在 org.apache .spark.deploy.SparkSubmit.main(SparkSubmit.scala)第1064章) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:606) 在 org .apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730) 在 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 在 org.apache.spark.deploy.SparkSubmit .main(SparkSubmit.scala)第1064章) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:606) 在 org .apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730) 在 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 在 org.apache.spark.deploy.SparkSubmit .main(SparkSubmit.scala)main(Main.scala:35) 在 org.apache.spark.repl.Main.main(Main.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java: 57) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:606) 在 org.apache.spark.deploy.SparkSubmit$.org$apache$ spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit$.submit( SparkSubmit.scala:206) 在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 在 org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)main(Main.scala:35) 在 org.apache.spark.repl.Main.main(Main.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java: 57) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:606) 在 org.apache.spark.deploy.SparkSubmit$.org$apache$ spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit$.submit( SparkSubmit.scala:206) 在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 在 org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke( Method.java:606) 在 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730) 在 org.apache.spark.deploy.SparkSubmit$.doRunMain $1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) at org .apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke( Method.java:606) 在 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730) 在 org.apache.spark.deploy.SparkSubmit$.doRunMain $1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) at org .apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$ runMain(SparkSubmit.scala:730) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 在 org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$ runMain(SparkSubmit.scala:730) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 在 org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)181) 在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 在 org.apache.spark.deploy .SparkSubmit.main(SparkSubmit.scala)181) 在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 在 org.apache.spark.deploy .SparkSubmit.main(SparkSubmit.scala)

0 投票
3 回答
8805 浏览

scala - Spark CSV package not able to handle \n within fields

I have a CSV file which I am trying to load using Spark CSV package and it does not load data properly because few of the fields have \n within them for e.g. the following two rows

I am using the following code which is straightforward I am using parserLib as univocity as read in internet it solves multiple newline problem but it does not seems to be the case for me.

How do I replace newline within fields which starts with quotes. Is there any easier way?

0 投票
7 回答
27640 浏览

apache-spark - 如何使用 spark-csv 包在 HDFS 上仅读取 n 行大型 CSV 文件?

我在 HDFS 上有一个大的分布式文件,每次我使用带有 spark-csv 包的 sqlContext 时,它首先加载整个文件,这需要相当长的时间。

现在我有时只想快速检查一下,我只需要整个文件中的几行/任意 n 行。

但所有这些都在文件加载完成后运行。我不能在读取文件本身时限制行数吗?我指的是 spark-csv 中与 pandas 等效的 n_rows,例如:

或者可能是spark实际上并没有加载文件,第一步,但在这种情况下,为什么我的文件加载步骤花费了太多时间呢?

我想

只给我n而不是所有行,有可能吗?

0 投票
1 回答
322 浏览

apache-spark - 计数抛出 java.lang.NumberFormatException:从启用了 inferSchema 的对象存储加载的文件上为 null

当启用 inferSchema 时,从 IBM Blue mix 对象存储加载的数据帧上的 count() 会引发以下异常:

如果禁用 inferSchema,则不会出现上述异常。为什么我会收到此异常?默认情况下,如果启用了 inferSchema,databricks 会读取多少行?

0 投票
1 回答
923 浏览

scala - 如何动态定义流数据集的模式以写入 csv?

我有一个流数据集,从 kafka 读取并尝试写入 CSV

Event保存Map[String,String]在里面并写入 CSV 我需要一些架构。

假设所有字段都是类型String,所以我尝试了spark repo中的示例

这会在“eventDataset.rdd”行的运行时产生错误:

引起:org.apache.spark.sql.AnalysisException:带有流源的查询必须用writeStream.start();;

下面不起作用,因为 '.map' 有一个 List[String] 而不是 Tuple

有没有办法通过编程模式和结构化流数据集来实现这一点?

0 投票
3 回答
3292 浏览

apache-spark - Spark 不读取第一行中具有空值的列

以下是我的 csv 文件中的内容:

因此,第一行有 5 列,但只有 3 个值。

我使用以下命令阅读它:

以下是我使用 csvDF.show() 得到的结果

如何读取所有列中的所有数据?

0 投票
2 回答
6891 浏览

apache-spark - 如何强制 CSV 的 inferSchema 将整数视为日期(使用“dateFormat”选项)?

我使用 Spark 2.2.0

我正在读取一个 csv 文件,如下所示:

此文件中有一个日期列,并且所有记录的值都等于20171001该特定列的值。

问题是 spark 推断该列的类型是integer而不是date. 当我删除该"inferSchema"选项时,该列的类型是string.

此文件中没有null值,也没有任何格式错误的行。

这个问题的原因/解决方案是什么?

0 投票
1 回答
2557 浏览

csv - 如何在 spark 数据帧 csv 输出和 UTF-8-BOM 编码中添加特殊字符分隔符

我必须使用 "|^|" Delimiter 将我的 spark 数据帧输出写入 csv 文件。我正在尝试这样做。

但低于错误

java.lang.IllegalArgumentException:不支持的分隔符特殊字符:\|\^\|

如何添加 UTF-8-BOM 编码的输出文件默认为 UTF-8?

在 java 中,如果我在文件的开头添加“\uFEFF”,文件的编码将更改为 UTF-8-BOM。现在我如何在 spark csv 中附加这个字符?

另外,如果我采用以下方法df.rdd.map(x=>x.mkString("|^|")).saveAsTextFile("dir path to store")

我将如何根据数据框中的列对数据进行分区?