因为我不能使用 spark csv,所以我从 CSV 手动创建了一个数据框,如下所示:
raw_data=sc.textFile("data/ALS.csv").cache()
csv_data=raw_data.map(lambda l:l.split(","))
header=csv_data.first()
csv_data=csv_data.filter(lambda line:line !=header)
row_data=csv_data.map(lambda p :Row (
location_history_id=p[0],
user_id=p[1],
latitude=p[2],
longitude=p[3],
address=p[4],
created_at=p[5],
valid_until=p[6],
timezone_offset_secs=p[7],
opening_times_id=p[8],
timezone_id=p[9]))
location_df = sqlContext.createDataFrame(row_data)
location_df.registerTempTable("locations")
我只需要两列:
lati_longi_df=sqlContext.sql("""SELECT latitude, longitude FROM locations""")
rdd_lati_longi = lati_longi_df.map(lambda data: Vectors.dense([float(c) for c in data]))
rdd_lati_longi.take(2):
[DenseVector([-6.2416, 106.7949]), DenseVector([-6.2443, 106.7956])]
现在似乎一切都准备好进行 KMeans 训练了:
clusters = KMeans.train(rdd_lati_longi, 10, maxIterations=30,
runs=10, initializationMode="random")
但我收到以下错误:
IndexError: list index out of range
ALS.csv 的前三行:location_history_id,user_id,latitude,longitude,address,created_at,valid_until,timezone_offset_secs,opening_times_id,timezone_id