3

我希望将一些压缩的 csv 文件消耗到 DataFrames 中,以便最终可以使用 SparkSQL 查询它们。我通常只会使用 sc.textFile() 来使用文件并使用各种 map() 转换来解析和转换数据,但是有问题的文件有一些难以解析的值。特别是,有引号封装的值在其中包含逗号,这破坏了在 map() 转换中使用 split() 函数的选项。

这就是我正在做的事情:

我使用 spark-csv 和 commons-csv jar 启动 spark

PYSPARK_PYTHON=python2.7 sudo pyspark --jars "spark-csv_2.10-1.0.0.jar,commons-csv-1.1.jar"

我创建了一个模式变量,因为我的 csv 没有标题,然后进行以下调用

sqlc = SQLContext(sc)
apps_df = sqlc.read.format("com.databricks.spark.csv").options(header="false",codec="org.apache.hadoop.io.compress.GzipCodec").load("s3://path_to_file.csv.gz", schema = customSchema)

当您使用apps_df.printSchema() 时,这确实会返回一个具有正确架构的DataFrame 对象,但apps_df.count() 返回0 而apps_df.first() 什么也不返回。

编辑:

这是我的,希望是可重复的例子

full_filepath替换为目录中的 .csv 文件

full_gzip_filepath替换为目录中 csv 文件的 .gz 版本

from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlc = SQLContext(sc)
import pandas as pd
import numpy as np
from subprocess import check_call

columns = ['A','B', 'C']
data = np.array([np.arange(10)]*3).T

df = pd.DataFrame(data, columns=columns)

df.to_csv('full_filepath')

check_call(['gzip', 'full_filepath'])

test_scsv_df = sqlc.read.format("com.databricks.spark.csv").options(header="true",inferSchema="true",codec="org.apache.hadoop.io.compress.GzipCodec").load("full_gzip_filepath")

test_scsv_df.show()

这将返回:

+---+---+---+---+
|   |  A|  B|  C|
+---+---+---+---+
+---+---+---+---+

如果您还运行接下来的几个命令,您将看到该文件可以通过 pandas 正确使用

test_pd = pd.read_csv('full_gzip_filepath', sep=',', compression='gzip', quotechar='"', header=0)

test_pd_df = sqlc.createDataFrame(test_pd)

test_pd_df.show()

这将返回:

+----------+---+---+---+
|Unnamed: 0|  A|  B|  C|
+----------+---+---+---+
|         0|  0|  0|  0|
|         1|  1|  1|  1|
|         2|  2|  2|  2|
|         3|  3|  3|  3|
|         4|  4|  4|  4|
|         5|  5|  5|  5|
|         6|  6|  6|  6|
|         7|  7|  7|  7|
|         8|  8|  8|  8|
|         9|  9|  9|  9|
+----------+---+---+---+
4

0 回答 0