m = Prophet()
m.fit(df)
I get the following error:
Unrecognized token 'Initial': was expecting 'null', 'true', 'false' or NaN
at [Source: Initial log joint probability = -13.932; line: 1, column: 8]
The error above keeps occurring. I tried downgrading numpy and reinstalling pystan and fbprophet, but the problem is still not resolved.
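I ran into the same problem/error when trying to use prophet on an AWS EMR Spark cluster (through the Jupyter notebook interface). After a lot of troubleshooting, we realized that this happens because Spark expects a specific data format to be returned (I believe a JSON with specific fields), but prophet returns a pandas dataframe.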
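I solved this by writing a user-defined function (udf) in pyspark, which lets me use prophet on a Spark dataframe and specify the data that this Spark function will return. I based my own solution on this example and this example of using the pandas_udf functionality on Spark with prophet.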
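Below is a generalized version of the function I wrote. For clarity: I was trying to fit a time series model to the data I had in order to detect outliers, which is why I fit and then predict on the same data. You also need to make sure pyarrow is installed so that pandas_udf is handled correctly in Spark: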
# Import relevant packages
import pyspark.sql.functions as F
import pyspark.sql.types as types
import prophet

# Define output schema of prophet model
output_schema = types.StructType([
    types.StructField('id', types.IntegerType(), True),  # args: name (string), data type, nullable (boolean)
    types.StructField('ds', types.TimestampType(), True),
    types.StructField('yhat', types.DoubleType(), True),
    types.StructField('yhat_lower', types.DoubleType(), True),
    types.StructField('yhat_upper', types.DoubleType(), True)
])

# Function to fit Prophet timeseries model
@F.pandas_udf(output_schema, F.PandasUDFType.GROUPED_MAP)
def fit_prophet_model(df):
    """
    :param df: pandas dataframe holding the rows of one 'id' group of the data we want to model.
    :return: pandas dataframe following the output_schema.
    """
    # Prep the dataframe for use in Prophet
    formatted_df = df[['timestamp', 'value_of_interest']] \
        .rename(columns = {'timestamp': 'ds', 'value_of_interest': 'y'}) \
        .sort_values(by = ['ds'])

    # Instantiate model
    model = prophet.Prophet(interval_width = 0.99,
                            growth = 'linear',
                            daily_seasonality = True,
                            weekly_seasonality = True,
                            yearly_seasonality = True,
                            seasonality_mode = 'multiplicative')

    # Fit model and get fitted values
    model.fit(formatted_df)
    model_results = model.predict(formatted_df)[['ds', 'yhat', 'yhat_lower', 'yhat_upper']] \
        .sort_values(by = ['ds'])
    # Add grouping id; read it from df, because formatted_df no longer has an 'id' column
    model_results['id'] = df['id'].iloc[0]
    model_results = model_results[['id', 'ds', 'yhat', 'yhat_lower', 'yhat_upper']]  # get columns in correct order
    return model_results
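To make the grouped-map contract concrete, here is a minimal local sketch in plain pandas: the function receives one pandas dataframe per id and must return a pandas dataframe whose columns match the declared schema. The names `fake_fit` and `grouped_map_locally` and the sample data are hypothetical, and `fake_fit` is only a stand-in that uses the group mean instead of a Prophet model. It is meant for checking column names and order without a cluster, not for reproducing Spark's behaviour exactly.

```python
import pandas as pd

# Column order expected by the output schema
OUTPUT_COLUMNS = ['id', 'ds', 'yhat', 'yhat_lower', 'yhat_upper']


def fake_fit(group_pdf):
    """Hypothetical stand-in for the Prophet UDF: group mean +/- 3 std as the 'forecast'."""
    out = (group_pdf[['timestamp']]
           .rename(columns={'timestamp': 'ds'})
           .sort_values(by='ds'))
    center = group_pdf['value_of_interest'].mean()
    spread = group_pdf['value_of_interest'].std(ddof=0)
    out['yhat'] = center
    out['yhat_lower'] = center - 3 * spread
    out['yhat_upper'] = center + 3 * spread
    out['id'] = group_pdf['id'].iloc[0]  # every row in a group shares one id
    return out[OUTPUT_COLUMNS]


def grouped_map_locally(pdf, func, key='id'):
    """Apply func to each group's pandas dataframe and stack the results."""
    pieces = [func(group) for _, group in pdf.groupby(key)]
    return pd.concat(pieces, ignore_index=True)


# Made-up sample data, for illustration only
sample = pd.DataFrame({
    'id': [1, 1, 1, 2, 2],
    'timestamp': pd.to_datetime(['2021-01-01', '2021-01-02', '2021-01-03',
                                 '2021-01-01', '2021-01-02']),
    'value_of_interest': [10.0, 12.0, 11.0, 5.0, 7.0],
})
local_results = grouped_map_locally(sample, fake_fit)
```

A check like this would surface a missing or misordered column (for example a dropped id) before the job is submitted to Spark.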
Then, to run fit_prophet_model on your data, simply do the following:
results = (my_data.groupBy('id')
           .apply(fit_prophet_model))
results.show(10)  # show first ten rows of the fitted model results
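Since the goal stated above is outlier detection, one possible follow-up is to compare each observed value with its fitted interval. The sketch below assumes the fitted results and the observed data have both been brought into pandas (for instance with `toPandas()` on a small result set) and that the observed columns are named `id`, `ds` and `y`. The helper name `flag_outliers` and the numbers are made up for illustration.

```python
import pandas as pd


def flag_outliers(observed, fitted):
    """Join observed values to fitted intervals and mark points that fall outside them."""
    merged = observed.merge(fitted, on=['id', 'ds'], how='inner')
    merged['is_outlier'] = (
        (merged['y'] < merged['yhat_lower']) | (merged['y'] > merged['yhat_upper'])
    )
    return merged


# Made-up values, for illustration only
dates = pd.to_datetime(['2021-01-01', '2021-01-02', '2021-01-03'])
observed = pd.DataFrame({'id': [1, 1, 1], 'ds': dates, 'y': [10.0, 50.0, 11.0]})
fitted = pd.DataFrame({
    'id': [1, 1, 1],
    'ds': dates,
    'yhat': [10.5, 10.8, 11.1],
    'yhat_lower': [8.0, 8.3, 8.6],
    'yhat_upper': [13.0, 13.3, 13.6],
})
flagged = flag_outliers(observed, fitted)
```

With interval_width set to 0.99 as in the model above, roughly 1% of ordinary points would still be expected to fall outside the interval, so flagged rows are candidates to inspect rather than confirmed anomalies.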