0

因为我不能使用 spark csv,所以我从 CSV 手动创建了一个数据框,如下所示:

raw_data=sc.textFile("data/ALS.csv").cache()
csv_data=raw_data.map(lambda l:l.split(","))
header=csv_data.first()
csv_data=csv_data.filter(lambda line:line !=header)

row_data=csv_data.map(lambda p :Row (
location_history_id=p[0],
user_id=p[1],
latitude=p[2],
longitude=p[3],
address=p[4],
created_at=p[5],
valid_until=p[6],
timezone_offset_secs=p[7],
opening_times_id=p[8],
timezone_id=p[9]))

location_df = sqlContext.createDataFrame(row_data)
location_df.registerTempTable("locations")

我只需要两列:

lati_longi_df=sqlContext.sql("""SELECT latitude, longitude FROM locations""")

rdd_lati_longi = lati_longi_df.map(lambda data: Vectors.dense([float(c) for c in data]))
rdd_lati_longi.take(2):

[DenseVector([-6.2416, 106.7949]), DenseVector([-6.2443, 106.7956])]

现在似乎一切都准备好进行 KMeans 训练了:

    clusters = KMeans.train(rdd_lati_longi, 10, maxIterations=30,
    runs=10, initializationMode="random")

但我收到以下错误:

IndexError: list index out of range

ALS.csv 的前三行:location_history_id,user_id,latitude,longitude,address,created_at,valid_until,timezone_offset_secs,opening_times_id,timezone_id

4

1 回答 1

0

你为什么不允许 spark 来解析 csv 呢?您可以通过以下方式启用 csv 支持:

pyspark --packages com.databricks:spark-csv_2.10:1.4.0
于 2016-03-24T22:19:51.487 回答