0
m = Prophet()
m.fit(df)

遇到以下错误:

Unrecognized token 'Initial': was expecting 'null', 'true', 'false' or NaN
at [Source: Initial log joint probability = -13.932; line: 1, column: 8]

上述错误不断出现。尝试降级numpy,重新安装pystanfbprophet但问题仍未解决。

4

1 回答 1

0

我在尝试在 AWS EMR Spark 集群上使用时遇到了同样的问题/错误prophet(使用 jupyter notebook 界面)。经过多次故障排除,我们意识到这是因为 Spark 期望返回一个特定的数据格式——我相信一个带有特定字段的 json——但prophet返回一个pandas数据帧。

我通过在 pyspark 中编写一个用户定义的函数 (udf) 解决了这个问题,它允许我在 Spark 数据帧上使用先知并指定将从这个 Spark 函数返回的数据。

我自己的解决方案基于此示例此示例中的on Spark的pandas_udf功能。prophet

下面是我编写的函数的通用版本。为了清楚起见,我试图在我拥有的数据上拟合一个时间序列模型以检测异常值,这就是为什么我拟合然后预测相同的数据。您还需要确保pyarrow已安装以pandas_udf在 Spark 中正确处理:

# Import relevant packages
import pyspark.sql.functions as F
import pyspark.sql.types as types
import prophet

# Define output schema of prophet model
output_schema = types.StructType([
                                types.StructField('id', types.IntegerType(), True), #args: name (string), data type, nullable (boolean)
                                types.StructField('ds', types.TimestampType(), True),
                                types.StructField('yhat', types.DoubleType(), True),
                                types.StructField('yhat_lower', types.DoubleType(), True),
                                types.StructField('yhat_upper', types.DoubleType(), True)
                                ])

# Function to fit Prophet timeseries model
@F.pandas_udf(output_schema, F.PandasUDFType.GROUPED_MAP)
def fit_prophet_model(df):
    """
    :param df: spark dataframe containing our the data we want to model.
    :return: returns spark dataframe following the output_schema.     
    """
    
    # Prep the dataframe for use in Prophet
    formatted_df = df[['timestamp', 'value_of_interest']] \
        .rename(columns = {'timestamp': 'ds', 'value_of_interest': 'y'}) \
        .sort_values(by = ['ds'])
    
    # Instantiate model
    model = prophet.Prophet(interval_width = 0.99,
                            growth = 'linear',
                            daily_seasonality = True,
                            weekly_seasonality = True,
                            yearly_seasonality = True,
                            seasonality_mode = 'multiplicative')
    
    # Fit model and get fitted values
    model.fit(formatted_df)
    model_results = model.predict(formatted_df)[['ds', 'yhat', 'yhat_lower', 'yhat_upper']] \
                         .sort_values(by = ['ds'])
    model_results['id'] = formatted_df['id'] #add grouping id
    model_results = model_results[['id', 'ds', 'yhat', 'yhat_lower', 'yhat_upper']] #get columns in correct order
    
    return model_results

然后要对您的数据运行该函数,只需执行以下操作:

results = (my_data.groupBy('id') \
                  .apply(fit_prophet_model)
          )

results.show(10) #show first ten rows of the fitted model results
于 2021-07-15T21:34:17.470 回答