0

我是 PySpark 的新手,我想将预处理(包括 Python 的编码和规范化部分脚本)转换为 PySpark 以用于合成数据。(A 列和 C 列是分类的)首先,我有所谓的 Spark 数据框,sdf 包括 5 列:

下面是示例:

#+----------+-----+---+-------+----+
#|A         |B    |C  |D      |E   |
#+----------+-----+---+-------+----+
#|Sentence  |92   |6  |False  |49  |
#|Sentence  |17   |3  |False  |15  |
#|Sentence  |17   |3  |False  |15  |
#|-         |0    |0  |False  |0   |
#|-         |0    |0  |False  |0   |
#|-         |0    |0  |False  |0   |
#+----------+-----+---+-------+----+

现在我想分配除其他功能之外的统计频率,并将结果与sdf​​ . 到目前为止,我可以使用 pythonic 脚本来做到这一点:

#import libs
import copy
import numpy as np
import pandas as pd

from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import scale
from sklearn import preprocessing

#Statistical Preprocessing
def add_freq_to_features(df):
  frequencies_df = df.groupby(list(df.columns)).size().to_frame().rename(columns={0: "Freq"})
  frequencies_df["Freq"] = frequencies_df["Freq"] / frequencies_df["Freq"].sum() # Normalzing 0 & 1
  new_df = pd.merge(df, frequencies_df, how='left', on=list(df.columns))
  
  return new_df

# Encode and Normalize
def normalize_features(df):
  temp_df = df.copy()
  
  le = preprocessing.LabelEncoder()
  #le.fit(temp_df)
    
  temp_df[["A", "C"]] = temp_df[["A", "C"]].apply(le.fit_transform)
  
  for column in ["A", "B", "C", "D", "E"]:
    #-1: all rows selected into 1 
    # reshape(1, -1) select one row contains all columns/features
    temp_df[column] = MinMaxScaler().fit_transform(temp_df[column].values.reshape(-1, 1)) 
    
  return temp_df

# Apply frequency allocation and merge with extracted features df
features_df = add_freq_to_features(features_df)

#Apply Encoding and Normalizing function
normalized_features_df = normalize_features(features_df)


to_numeric_columns = ["A", "B" , "C", "D", "E", "Freq"]
normalized_features_df[to_numeric_columns] = normalized_features_df[to_numeric_columns].apply(pd.to_numeric)
#normalized_features_df

问题:在将 Spark 数据帧转换为 Pandas 数据帧toPandas()以优化管道并以 100% 火花形式处理它的情况下,翻译 Preprocessing 的最佳方法是什么?

预期的输出以 Spark 数据框的形式显示如下:

#+----------+-----+---+-------+----+----------+
#|A         |B    |C  |D      |E   |Freq      |
#+----------+-----+---+-------+----+----------+
#|Sentence  |92   |6  |False  |49  |0.166667  |
#|Sentence  |17   |3  |False  |15  |0.333333  |
#|Sentence  |17   |3  |False  |15  |0.333333  |
#|-         |0    |0  |False  |0   |0.500000  |
#|-         |0    |0  |False  |0   |0.500000  |
#|-         |0    |0  |False  |0   |0.500000  |
#+----------+-----+---+-------+----+----------+
4

1 回答 1

0

Spark 有 Spark MLlib 包,专为特征工程和机器学习目的而设计。话虽如此,您不应该像使用 Pandas 那样手动构建功能。归根结底,您仍然必须使用 Spark 来构建模型,那么为什么不开始正确使用 Spark ML 呢?我强烈建议通读几个部分,例如构建特性构建管道分类/回归和其他一些算法。

回到你原来的问题,这是你的示例代码的 Spark 版本(我也在你的笔记本中运行了它,稍加改动以适应你的变量。)

# this is to build "raw" Freq
sdf2 = (sdf
    .groupBy(sdf.columns)
    .agg(F.count('*').alias('Freq'))
    .withColumn('Encoding_type', F.col('Encoding_type').cast('string'))
)
sdf2.cache().count()
sdf2.show()

# this is to normalize features using MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler

type_indexer = StringIndexer(inputCol='Type', outputCol='Type_Cat')
encoding_indexer = StringIndexer(inputCol='Encoding_type', outputCol='Encoding_Type_Cat')
assembler = VectorAssembler(inputCols=['Type_Cat', 'Length', 'Token_number', 'Encoding_Type_Cat', 'Character_feature', 'Freq'], outputCol='features')
scaler = MinMaxScaler(inputCol='features', outputCol='scaled_features')
pipeline = Pipeline(stages=[type_indexer, encoding_indexer, assembler, scaler])

# Compute summary statistics and generate model
model = pipeline.fit(sdf2)

# rescale each feature to range [min, max].
model.transform(sdf2).show(10, False)

# Output
# +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+
# |Type  |Length|Token_number|Encoding_type|Character_feature|Freq|Type_Cat|Encoding_Type_Cat|features                 |scaled_features          |
# +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+
# |String|8     |0           |true         |7                |1   |0.0     |0.0              |[0.0,8.0,0.0,0.0,7.0,1.0]|[0.5,1.0,0.5,0.5,1.0,0.5]|
# |String|0     |0           |true         |0                |1   |0.0     |0.0              |(6,[5],[1.0])            |[0.5,0.0,0.5,0.5,0.0,0.5]|
# +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+
于 2021-09-19T04:54:04.487 回答