我有一个包含许多列的数据框,它是从定义模式的 csv 文件创建的。我唯一感兴趣的列是名为“Point”的列,我在其中定义了一个 magellan Point(long, lat)。我现在需要做的是从该数据框创建一个 RDD[Point]。
下面是我尝试过的代码,但它不起作用,因为rdd
它是 RDD[Row] 而不是 RDD[Point]。
val schema = StructType(Array(
StructField("vendorId", StringType, false),
StructField("lpep_pickup_datetime", StringType, false),
StructField("Lpep_dropoff_datetime", StringType, false),
StructField("Store_and_fwd_flag",StringType, false),
StructField("RateCodeID", IntegerType, false),
StructField("Pickup_longitude", DoubleType, false),
StructField("Pickup_latitude", DoubleType, false),
StructField("Dropoff_longitude", DoubleType, false),
StructField("Dropoff_latitude", DoubleType, false),
StructField("Passenger_count", IntegerType, false),
StructField("Trip_distance", DoubleType, false),
StructField("Fare_amount", StringType, false),
StructField("Extra", StringType, false),
StructField("MTA_tax", StringType, false),
StructField("Tip_amount", StringType, false),
StructField("Tolls_amount", StringType, false),
StructField("Ehail_fee", StringType, false),
StructField("improvement_surcharge", StringType, false),
StructField("Total_amount", DoubleType, false),
StructField("Payment_type", IntegerType, false),
StructField("Trip_type", IntegerType, false)))
import spark.implicits._
val points = spark.read.option("mode", "DROPMALFORMED")
.schema(schema)
.csv("/home/riccardo/Scrivania/Progetto/Materiale/NYC-taxi/")
.withColumn("point", point($"Pickup_longitude",$"Pickup_latitude"))
.limit(2000)
val rdd = points.select("point").rdd
如何从数据框中获取 RDD[Point] 而不是 RDD[Row]?如果不可能,您会建议哪种解决方案?我需要一个 RDD[Point] 来使用以 RDD[Point] 作为输入的提供的库。