
I have a DataFrame with two columns:

df = sqlContext.createDataFrame([
    (1, 'a'), (2, 'a'),
    (3, 'b'), (4, 'b'),
    (5, 'c'), (6, 'c'),
    (7, 'd'), (8, 'd'),
], schema=['value', 'name'])

Edit 2017/01/13: I derived this DataFrame from an SQL table based on the entity-attribute-value model, so an additional third entity column, id, is available for every row.
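For illustration only, such a source frame could look like this (the id values below are invented):

# Hypothetical sketch: the same data with the extra entity column 'id'
# from the EAV source table; the id values are made up for illustration.
df_eav = sqlContext.createDataFrame([
    (1, 1, 'a'), (2, 2, 'a'),
    (3, 3, 'b'), (4, 4, 'b'),
    (5, 5, 'c'), (6, 6, 'c'),
    (7, 7, 'd'), (8, 8, 'd'),
], schema=['id', 'value', 'name'])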

I want to transform it into the "features" DataFrame required by the classifiers of the ml package. For a single column this can be achieved with a VectorAssembler:

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['value'], outputCol="features")
selected_features = assembler.transform(df).select('features')
selected_features.collect()

[Row(features=DenseVector([1.0])),
 Row(features=DenseVector([2.0])),
 Row(features=DenseVector([3.0])),
 Row(features=DenseVector([4.0])),
 Row(features=DenseVector([5.0])),
 Row(features=DenseVector([6.0])),
 Row(features=DenseVector([7.0])),
 Row(features=DenseVector([8.0]))]

What I want is something like this:

[Row(features=DenseVector([1.0, 2.0])),
 Row(features=DenseVector([3.0, 4.0])),
 Row(features=DenseVector([5.0, 6.0])),
 Row(features=DenseVector([7.0, 8.0]))]

What is the most efficient way to combine the values of the value column into one DenseVector per value of the name column?

I am thinking, for example, of a custom aggregation function for groupby that works on GroupedData:

df.groupby('name').vector_agg().collect()

similar to PostgreSQL's array_agg function:

SELECT array_agg(df.value) FROM table as df
GROUP BY df.name;
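As an aside, the closest built-in analogue in Spark is collect_list; a minimal sketch, assuming the df defined above (note that the order of the collected values is not guaranteed):

import pyspark.sql.functions as F

# collect_list is Spark's rough counterpart of PostgreSQL's array_agg.
df.groupBy('name').agg(F.collect_list('value')).show()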

2 Answers


I think your problem is ill-defined, because for a fixed name there is no way to know which value belongs to which column. The classifiers in the ml package all require each column to be used consistently across training samples. In your example the values happen to arrive in the desired order, but in practice you cannot rely on that.

Your problem can be solved if you can provide a feature index and start from something like this:

df = sqlContext.createDataFrame([
    ('a', ('f1', 1)), ('a', ('f2', 2)),
    ('b', ('f1', 3)), ('b', ('f2', 4)),
    ('c', ('f1', 5)), ('c', ('f2', 6)),
    ('d', ('f1', 7)), ('d', ('f2', 8)),
], schema=['name', 'feature'])
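For reference, Spark should infer a struct type for the nested tuples, so the schema looks roughly like this (exact types may vary by Spark version):

df.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- feature: struct (nullable = true)
#  |    |-- _1: string (nullable = true)
#  |    |-- _2: long (nullable = true)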

First, group your features by name and aggregate them into a list:

import pyspark.sql.functions as F

df.groupBy('name')\
  .agg(F.collect_list('feature'))\
  .show()

Output:

+----+---------------------+
|name|collect_list(feature)|
+----+---------------------+
|   d|     [[f1,7], [f2,8]]|
|   c|     [[f1,5], [f2,6]]|
|   b|     [[f1,3], [f2,4]]|
|   a|     [[f1,1], [f2,2]]|
+----+---------------------+

Next, use a udf in withColumn to convert this array into a DenseVector. Putting it all together:

from pyspark.ml.linalg import Vectors, VectorUDT
import pyspark.sql.functions as F

# Sort by the struct's first field (the feature key), so the vector
# components always end up in the same order.
list_to_dense = F.udf(lambda l: Vectors.dense([v for (k, v) in sorted(l)]), VectorUDT())

df.groupBy('name')\
  .agg(F.collect_list('feature'))\
  .withColumn('features', list_to_dense('collect_list(feature)'))\
  .select('features')\
  .collect()

Output:

[Row(features=DenseVector([7.0, 8.0])),
 Row(features=DenseVector([5.0, 6.0])),
 Row(features=DenseVector([3.0, 4.0])),
 Row(features=DenseVector([1.0, 2.0]))]
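If you also need to keep the group key next to the assembled vector (for example, to join labels back in later), a minimal variation of the same pipeline, assuming the df and list_to_dense defined above:

features_by_name = df.groupBy('name')\
    .agg(F.collect_list('feature').alias('raw'))\
    .withColumn('features', list_to_dense('raw'))\
    .select('name', 'features')
features_by_name.show()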
answered 2017-02-12T19:54:01.550

Given your data structure, you just need to join the table with itself and filter out the rows with equal (or inverted) values:

df = sqlContext.createDataFrame([
    (1, 'a'), (2, 'a'),
    (3, 'b'), (4, 'b'),
    (5, 'c'), (6, 'c'),
    (7, 'd'), (8, 'd'),
], schema=['value', 'name'])

xf = df.select(df["name"].alias("nam"), df["value"].alias("val"))
pf = df.join(xf, df["name"] == xf["nam"], "inner")\
       .where(xf["val"] < df["value"])\
       .select(df["value"], xf["val"], df["name"])

from pyspark.ml.feature import VectorAssembler


assembler = VectorAssembler(inputCols=['value', "val"], outputCol="features")
selected_features = assembler.transform(pf).select('features')
selected_features.collect()


#[Row(features=DenseVector([2.0, 1.0])),
# Row(features=DenseVector([4.0, 3.0])),
# Row(features=DenseVector([6.0, 5.0])),
# Row(features=DenseVector([8.0, 7.0]))]
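Note that this self-join assumes exactly two values per name; a name with a different row count would produce zero or multiple pairs instead of one. A quick sanity check, assuming the df above:

# Any name listed here would not yield exactly one feature row.
counts = df.groupBy('name').count()
counts.where(counts['count'] != 2).show()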
answered 2016-05-04T17:59:35.307