3

我正在尝试使用spark_read_csv函数将 csv 读入 Spark。我在推断架构时遇到异常,即我在设置时遇到异常infer_schema=TRUE

spark_read_csv(sc,"myDf",DatasetUrl)

我收到以下异常:

错误:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 90.0 中的任务 0 失败 1 次,最近一次失败:阶段 90.0 中丢失任务 0.0(TID 151,本地主机):java.text.ParseException:无法解析编号:“cr1_fd_dttm”在 java.text.NumberFormat.parse(NumberFormat.java:385) 在 org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$4.apply$mcD$sp(CSVInferSchema .scala:259)

但是,当我尝试设置时infer_schema=FALSE,正如预期的那样,所有内容都被读取为chr类型。

这是列中数据的样子cr1_fd_dttm

      cr1_fd_dttm
            <chr>
1             0.0
2   1.45679112E12
3   1.45679166E12
4   1.45679154E12
5   1.45679274E12
6             0.0
7             0.0
8             0.0
9             0.0
10  1.45679118E12

有人可以帮助我吗?

谢谢

4

1 回答 1

1

我只是读取文件而不立即将其放入内存,强制字段为数字,然后将这些结果加载到内存中。所以关键是设置memory为 FALSE,设置为infer_schemaFALSE,传递列列表,强制然后使用compute()将结果保存到 Spark 内存中。这是一个冗长但有效的示例:

mapped_flights <- spark_read_csv(sc, "mapped_flights", 
                      path =  "s3a://flights-data/full", 
                      memory = FALSE, 
                      infer_schema = FALSE,
                      columns = list(
                        Year = "character",
                        Month = "character",
                        DayofMonth = "character",
                        DayOfWeek = "character",
                        DepTime = "character",
                        CRSDepTime = "character",
                        ArrTime = "character",
                        CRSArrTime = "character",
                        UniqueCarrier = "character",
                        FlightNum = "character",
                        TailNum = "character",
                        ActualElapsedTime = "character",
                        CRSElapsedTime = "character",
                        AirTime = "character",
                        ArrDelay = "character",
                        DepDelay = "character",
                        Origin = "character",
                        Dest = "character",
                        Distance = "character",
                        TaxiIn = "character",
                        TaxiOut = "character",
                        Cancelled = "character",
                        CancellationCode = "character",
                        Diverted = "character",
                        CarrierDelay = "character",
                        WeatherDelay = "character",
                        NASDelay = "character",
                        SecurityDelay = "character",
                        LateAircraftDelay = "character")
                      )


flights <- mapped_flights %>%   mutate(
Year = as.integer(Year),
Month = as.integer(Month),
DayofMonth = as.integer(DayofMonth),
DayOfWeek = as.integer(DayOfWeek),
DepTime = as.integer(DepTime),
CRSDepTime = as.integer(CRSDepTime),
CRSArrTime = as.integer(CRSArrTime),
ArrTime = as.integer(ArrTime),
ActualElapsedTime = as.integer(ActualElapsedTime),
CRSElapsedTime = as.integer(CRSElapsedTime),
AirTime = as.integer(AirTime),
ArrDelay = as.double(ArrDelay),
DepDelay = as.double(DepDelay),
Distance = as.integer(Distance),
TaxiIn = as.integer(TaxiIn),
TaxiOut = as.integer(TaxiOut),
Cancelled = as.integer(Cancelled),
Diverted = as.integer(Diverted),
CarrierDelay = as.integer(CarrierDelay),
WeatherDelay = as.integer(WeatherDelay),
NASDelay = as.integer(NASDelay),
SecurityDelay = as.integer(SecurityDelay),
LateAircraftDelay = as.integer(LateAircraftDelay)) %>%   compute("flights")
于 2017-09-21T22:23:24.560 回答