
I have data in a comma-separated file which I have loaded into a Spark DataFrame. The data looks like this:

  A B C
  1 2 3
  4 5 6
  7 8 9

I want to transform the above DataFrame using pyspark into:

   A    B   C
  A_1  B_2  C_3
  A_4  B_5  C_6
  --------------

Then convert it to a list of lists using pyspark:

[[ A_1 , B_2 , C_3],[A_4 , B_5 , C_6]]

And then run the FP-Growth algorithm using pyspark on the above data set.

The code that I have tried is below:

from pyspark.sql.functions import col, size, udf
from pyspark.sql import functions as func
from pyspark.sql.types import StringType
from pyspark.ml.fpm import FPGrowth
from pyspark.sql import Row
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark import SparkConf
from pyspark.sql import SQLContext  # SQLContext lives in pyspark.sql, not the top-level pyspark package

sqlContext = SQLContext(sc)
df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/data.csv")

names = df.schema.names

Then I thought of doing something inside a for loop:

 for name in names:
      -----
      ------
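For what it's worth, a minimal sketch of what such a loop could look like (my own illustration, not a confirmed solution, assuming the df and names defined above): use withColumn with concat and lit to prefix every value with its column name.

 from pyspark.sql.functions import col, concat, lit

 # hypothetical loop body: rewrite each column as "<column name>_<value>"
 for name in names:
     df = df.withColumn(name, concat(lit(name), lit("_"), col(name)))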

After this I will be using fpgrowth:

df = spark.createDataFrame([
    (0, ["A_1", "B_2", "C_3"]),
    (1, ["A_4", "B_5", "C_6"]),
], ["id", "items"])

fpGrowth = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6)
model = fpGrowth.fit(df)

1 Answer


For those coming from Scala, the concepts here show in general terms how to do this with pyspark. It is somewhat different, but certainly learnable, although how much carries over is a big question. I certainly learned a bit of pyspark myself via zipWithIndex. Anyway.

The first part converts the content into the required format; some of this could probably be handled at import time, but I leave it as is:

from functools import reduce
from pyspark.sql.functions import lower, col, lit, concat, split
from pyspark.sql.types import * 
from pyspark.sql import Row
from pyspark.sql import functions as f

source_df = spark.createDataFrame(
    [
        (1, 11, 111),
        (2, 22, 222)
    ],
    ["colA", "colB", "colC"]
)

intermediate_df = reduce(
    lambda df, col_name: df.withColumn(col_name, concat(lit(col_name), lit("_"), col(col_name))),
    source_df.columns,
    source_df
)

allCols = [x for x in intermediate_df.columns]
result_df = intermediate_df.select(f.concat_ws(',', *allCols).alias('CONCAT_COLS'))

result_df = result_df.select(split(col("CONCAT_COLS"), r",\s*").alias("ARRAY_COLS"))

# Add 0, 1, 2, ... with zipWithIndex; it is appended at the back, but that does not matter, you can move it around.
# Build the new structure: the existing fields (one in this case, but done flexibly) plus the zipWithIndex value.
schema = StructType(result_df.schema.fields[:] + [StructField("index", LongType(), True)])

# Need this dict approach with pyspark, different to Scala.
rdd = result_df.rdd.zipWithIndex()
rdd1 = rdd.map(
               lambda row: tuple(row[0].asDict()[c] for c in schema.fieldNames()[:-1]) + (row[1],)
              )

final_result_df = spark.createDataFrame(rdd1, schema)
final_result_df.show(truncate=False)

returns:

 +---------------------------+-----+
 |ARRAY_COLS                 |index|
 +---------------------------+-----+
 |[colA_1, colB_11, colC_111]|0    |
 |[colA_2, colB_22, colC_222]|1    |
 +---------------------------+-----+
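If the plain Python list of lists from the question is still needed, one way (my sketch, not part of the original answer; it collects everything to the driver, so only for small data) is:

 items_list = [list(row["ARRAY_COLS"]) for row in final_result_df.select("ARRAY_COLS").collect()]
 # e.g. [['colA_1', 'colB_11', 'colC_111'], ['colA_2', 'colB_22', 'colC_222']]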

The second part is the old zipWithIndex with pyspark, which is a pain compared to Scala if you need 0, 1, ...

It is generally easier to solve in Scala.
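If strictly consecutive 0, 1, 2, ... values are not required, a lighter-weight alternative (my suggestion, not part of the original answer) is monotonically_increasing_id, which avoids the RDD round trip but only guarantees unique, increasing ids rather than a gap-free sequence:

 # unique but not necessarily consecutive ids, computed without leaving the DataFrame API
 alt_df = result_df.withColumn("index", f.monotonically_increasing_id())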

Not sure about performance; it is not a foldLeft, which is interesting. I think it is actually fine.
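To connect this back to the FP-Growth part of the question: FPGrowth expects a DataFrame with an array-typed items column, which final_result_df already has. A sketch of my own, reusing the minSupport/minConfidence values from the question:

 from pyspark.ml.fpm import FPGrowth

 # rename the columns to the shape FPGrowth expects
 fp_df = final_result_df.select(col("index").alias("id"), col("ARRAY_COLS").alias("items"))
 fpGrowth = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6)
 model = fpGrowth.fit(fp_df)
 model.freqItemsets.show(truncate=False)
 model.associationRules.show(truncate=False)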

answered 2019-08-12T14:14:32.560