你可以通过使用 VectorAssembler 来做到这一点。他们的关键是您必须从汇编器输出中提取列。请参阅下面的代码以获取工作示例,
from pyspark.ml.feature import MinMaxScaler, StandardScaler
from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np
import random
df = pd.DataFrame()
df['a'] = random.sample(range(100), 10)
df['b'] = random.sample(range(100), 10)
df['c'] = random.sample(range(100), 10)
df['d'] = random.sample(range(100), 10)
df['e'] = random.sample(range(100), 10)
sdf = sc.createDataFrame(df)
sdf.show()
+---+---+---+---+---+
| a| b| c| d| e|
+---+---+---+---+---+
| 51| 13| 6| 5| 26|
| 18| 29| 19| 81| 28|
| 34| 1| 36| 57| 87|
| 56| 86| 51| 52| 48|
| 36| 49| 33| 15| 54|
| 87| 53| 47| 89| 85|
| 7| 14| 55| 13| 98|
| 70| 50| 32| 39| 58|
| 80| 20| 25| 54| 37|
| 40| 33| 44| 83| 27|
+---+---+---+---+---+
cols_to_scale = ['c', 'd', 'e']
cols_to_keep_unscaled = ['a', 'b']
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
assembler = VectorAssembler().setInputCols(cols_to_scale).setOutputCol("features")
sdf_transformed = assembler.transform(sdf)
scaler_model = scaler.fit(sdf_transformed.select("features"))
sdf_scaled = scaler_model.transform(sdf_transformed)
sdf_scaled.show()
+---+---+---+---+---+----------------+--------------------+
| a| b| c| d| e| features| scaledFeatures|
+---+---+---+---+---+----------------+--------------------+
| 51| 13| 6| 5| 26| [6.0,5.0,26.0]|[0.39358015146628...|
| 18| 29| 19| 81| 28|[19.0,81.0,28.0]|[1.24633714630991...|
| 34| 1| 36| 57| 87|[36.0,57.0,87.0]|[2.36148090879773...|
| 56| 86| 51| 52| 48|[51.0,52.0,48.0]|[3.34543128746345...|
| 36| 49| 33| 15| 54|[33.0,15.0,54.0]|[2.16469083306459...|
| 87| 53| 47| 89| 85|[47.0,89.0,85.0]|[3.08304451981926...|
| 7| 14| 55| 13| 98|[55.0,13.0,98.0]|[3.60781805510765...|
| 70| 50| 32| 39| 58|[32.0,39.0,58.0]|[2.09909414115354...|
| 80| 20| 25| 54| 37|[25.0,54.0,37.0]|[1.63991729777620...|
| 40| 33| 44| 83| 27|[44.0,83.0,27.0]|[2.88625444408612...|
+---+---+---+---+---+----------------+--------------------+
# Function just to convert to help build data frame
def extract(row):
return (row.a, row.b,) + tuple(row.scaledFeatures.toArray().tolist())
sdf_scaled = sdf_scaled.select(*cols_to_keep_unscaled, "scaledFeatures").rdd \
.map(extract).toDF(cols_to_keep_unscaled + cols_to_scale)
sdf_scaled.show()
+---+---+------------------+-------------------+------------------+
| a| b| c| d| e|
+---+---+------------------+-------------------+------------------+
| 51| 13|0.3935801514662892|0.16399957083190683|0.9667572801316145|
| 18| 29| 1.246337146309916| 2.656793047476891|1.0411232247571234|
| 34| 1|2.3614809087977355| 1.8695951074837378|3.2349185912096337|
| 56| 86|3.3454312874634584| 1.7055955366518312|1.7847826710122114|
| 36| 49| 2.164690833064591|0.49199871249572047| 2.007880504888738|
| 87| 53| 3.083044519819266| 2.9191923608079415|3.1605526465841245|
| 7| 14|3.6078180551076513| 0.4263988841629578| 3.643931286649932|
| 70| 50|2.0990941411535426| 1.2791966524888734|2.1566123941397555|
| 80| 20| 1.639917297776205| 1.7711953649845937| 1.375769975571913|
| 40| 33|2.8862544440861213| 2.7223928758096534| 1.003940252444369|
+---+---+------------------+-------------------+------------------+