我希望将一些压缩的 csv 文件消耗到 DataFrames 中,以便最终可以使用 SparkSQL 查询它们。我通常只会使用 sc.textFile() 来使用文件并使用各种 map() 转换来解析和转换数据,但是有问题的文件有一些难以解析的值。特别是,有引号封装的值在其中包含逗号,这破坏了在 map() 转换中使用 split() 函数的选项。
这就是我正在做的事情:
我使用 spark-csv 和 commons-csv jar 启动 spark
PYSPARK_PYTHON=python2.7 sudo pyspark --jars "spark-csv_2.10-1.0.0.jar,commons-csv-1.1.jar"
我创建了一个模式变量,因为我的 csv 没有标题,然后进行以下调用
sqlc = SQLContext(sc)
apps_df = sqlc.read.format("com.databricks.spark.csv").options(header="false",codec="org.apache.hadoop.io.compress.GzipCodec").load("s3://path_to_file.csv.gz", schema = customSchema)
当您使用apps_df.printSchema() 时,这确实会返回一个具有正确架构的DataFrame 对象,但apps_df.count() 返回0 而apps_df.first() 什么也不返回。
编辑:
这是我的,希望是可重复的例子
将full_filepath替换为目录中的 .csv 文件
将full_gzip_filepath替换为目录中 csv 文件的 .gz 版本
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlc = SQLContext(sc)
import pandas as pd
import numpy as np
from subprocess import check_call
columns = ['A','B', 'C']
data = np.array([np.arange(10)]*3).T
df = pd.DataFrame(data, columns=columns)
df.to_csv('full_filepath')
check_call(['gzip', 'full_filepath'])
test_scsv_df = sqlc.read.format("com.databricks.spark.csv").options(header="true",inferSchema="true",codec="org.apache.hadoop.io.compress.GzipCodec").load("full_gzip_filepath")
test_scsv_df.show()
这将返回:
+---+---+---+---+
| | A| B| C|
+---+---+---+---+
+---+---+---+---+
如果您还运行接下来的几个命令,您将看到该文件可以通过 pandas 正确使用
test_pd = pd.read_csv('full_gzip_filepath', sep=',', compression='gzip', quotechar='"', header=0)
test_pd_df = sqlc.createDataFrame(test_pd)
test_pd_df.show()
这将返回:
+----------+---+---+---+
|Unnamed: 0| A| B| C|
+----------+---+---+---+
| 0| 0| 0| 0|
| 1| 1| 1| 1|
| 2| 2| 2| 2|
| 3| 3| 3| 3|
| 4| 4| 4| 4|
| 5| 5| 5| 5|
| 6| 6| 6| 6|
| 7| 7| 7| 7|
| 8| 8| 8| 8|
| 9| 9| 9| 9|
+----------+---+---+---+