0

我在我的 spark 集群中使用 Lifetimes python 包并尝试计算一些指标——Lifetimes 包

我一直在使用火花 2.4.2。我有如下示例的数据框(原始数据有 800K 记录),包含“invoce_date”列和其他一些列(id、label、county 等)

# reproducible Spark 
df = sqlCtx.createDataFrame(
    [
        ('78aa', 1, 'A', '2020-04-14 19:00:00'),
        ('14aa', 3, 'B', '2020-04-17 16:00:00'),
        ('25aa', 5, 'A', '2020-04-14 15:30:00'),
        ('36aa', 7, 'B', '2020-04-14 21:30:00')
    ],
    ('id', 'X', 'label', 'invoce_date')
)

在这里,我试图利用 Lifetimes

import lifetimes

# set the last transaction date as the end point for this historical dataset


current_date = df.agg(max("invoce_date")).collect()[0][0]



# calculate the metrics
metrics = (
  lifetimes.utils.summary_data_from_transaction_data(
    df,
    customer_id_col='id',
    datetime_col='invoce_date',
    observation_period_end = current_date, 
    freq='D'
    )
  )

# display first few rows
metrics.head(10)

这会返回一个错误'DataFrame' object has no attribute 'sort_values',我 df = df.sort("invoce_date")在度量计算之前添加了但仍然不断收到相同的错误,我无法弄清楚

这是我的数据类型供参考

df.dtypes
[('id', 'string'),
 ('y', 'string'),
 ('label', 'string'),
 ('invoce_date', 'timestamp')]
4

1 回答 1

1

Lifetimes 使用Pandas数据框,而df您示例中的变量是PySpark数据框。在使用 Lifetimes 包中的函数之前,您必须通过调用将数据转换为 Pandas 数据框df.toPandas()(更多详细信息请点击此处)。

请注意,调用toPandas()会将所有数据加载到驱动程序的内存中。Lifetimes 不支持具有多个执行器的分布式计算。

于 2020-06-06T18:48:21.960 回答