我正在使用 Prophet(fbprophet)训练股票预测模型,共有 3000 只股票需要训练,并且需要为每只股票的模型找到最佳参数。因此我把 Prophet 放到 Spark 上运行,但效率不高。代码如下:
# Driver script: load the open-price rows for the selected stock code(s) and the
# per-stock Prophet parameters over JDBC, pair them up, and evaluate every pair.
# NOTE(review): assumes `spark` (a SparkSession) and `to_timestamp`,
# `collect_list`, `struct` (pyspark.sql.functions) are already in scope, and
# that `evaluationPartion` is defined before this script runs - confirm.
filter_code = "'000001'"

# Each multi-line call chain is wrapped in parentheses so that it parses
# without relying on trailing backslashes.
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://local:3306?useSSL=false")
    .option(
        "dbtable",
        "(select code,datetime,open from table where code in (%s)) as table" % filter_code,
    )
    .option("user", "user")
    .option("password", "password")
    .option("numPartitions", 1)
    .option("partitionColumn", "code")
    .option("lowerBound", "000001")
    .option("upperBound", "000001")
    .load()
)

# Cast the price to float and parse the textual timestamp into a `date` column.
df = (
    df.withColumn('open', df['open'].cast('float'))
    .withColumn('date', to_timestamp(df['datetime'], 'yyyy/MM/dd HH:mm:ss').cast('timestamp'))
)

# Table holding the Prophet parameters to evaluate, keyed by `code_p`.
parameters = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306?useSSL=false")
    .option("dbtable", "table")
    .option("user", "user")
    .option("password", "password")
    .load()
)

# Collapse every stock into a single row whose `ds` column is the list of
# (date, open) structs for that stock.
evaluate = df.groupBy('code').agg(collect_list(struct(['date', 'open'])).alias("ds"))

# Left outer join: a stock without a matching parameter row is kept, with null
# parameter columns.
evaluate_param = evaluate.join(
    parameters, evaluate.code == parameters.code_p, 'left_outer'
).select(
    'code',
    'ds',
    'changepoint_prior_scale',
    'seasonality_prior_scale',
    'monthly_seasonality',
    'yearly_seasonality',
)

# NOTE(review): repartition(1) places every row in one partition, so
# mapPartitions calls evaluationPartion once and all rows are processed
# sequentially inside a single task.
evaluate_result = evaluate_param.repartition(1).rdd.mapPartitions(evaluationPartion)
print(evaluate_result.collect())
其中 `ds` 是时间序列数据;`changepoint_prior_scale`、`seasonality_prior_scale`、`monthly_seasonality`、`yearly_seasonality` 是 Prophet 的参数。
`evaluationPartion` 函数如下:
def _fit_and_forecast(train, params, periods):
    """Fit Prophet on ``train`` and predict its history plus ``periods`` future steps.

    Args:
        train: DataFrame with the ``ds`` / ``y`` columns Prophet is fitted on.
        params: Row exposing ``changepoint_prior_scale``,
            ``seasonality_prior_scale``, ``monthly_seasonality`` and
            ``yearly_seasonality``.
        periods: Number of 5-minute steps to forecast past the end of ``train``.

    Returns:
        The DataFrame returned by ``Prophet.predict``.
    """
    # Imported where the model is built, so the dependency is only needed by
    # the process that actually fits a model.
    import fbprophet

    model = fbprophet.Prophet(
        daily_seasonality='auto',
        weekly_seasonality='auto',
        # Any value other than 0 switches the seasonality on.
        yearly_seasonality=params.yearly_seasonality != 0,
        changepoint_prior_scale=params.changepoint_prior_scale,
        changepoints=None,
        seasonality_mode='additive',
        seasonality_prior_scale=params.seasonality_prior_scale,
        mcmc_samples=0,
        interval_width=0.80,
        uncertainty_samples=100,
    )
    if params.monthly_seasonality != 0:
        # Add monthly seasonality
        model.add_seasonality(name='monthly', period=30.5, fourier_order=5)
    model.fit(train)

    # Make a future dataframe and predictions
    future = model.make_future_dataframe(periods=periods, freq='5min')
    return model.predict(future)


def _evaluate_stock(row):
    """Train and score one Prophet model for a single stock / parameter row.

    The last month of the series is held out as test data; up to three years
    before it are used for training.

    Args:
        row: Object exposing ``code``, ``ds`` (records of ``(date, open)``) and
            the four Prophet parameter attributes.

    Returns:
        A list: code, the literal ``'open'``, predicted price, actual price,
        train MAE, test MAE, increase accuracy (%), decrease accuracy (%),
        in-range accuracy (%), followed by the four parameter values.

    Raises:
        ValueError: If the series is empty or no row falls inside the
            training window.
    """
    import numpy as np
    import pandas as pd

    training_years = 3

    stock = pd.DataFrame.from_records(row.ds, columns=['date', 'open'])
    if stock.empty:
        raise ValueError('no price data for stock %r' % (row.code,))
    # Prophet expects the columns to be called `ds` and `y`.
    stock['ds'] = stock['date']
    stock['y'] = stock['open']

    end_date = stock['date'].max()
    start_date = end_date - pd.DateOffset(months=1)
    train_start = start_date - pd.DateOffset(years=training_years)

    train = stock[(stock['date'] < start_date) & (stock['date'] > train_start)]
    if train.empty:
        raise ValueError('no training data for stock %r' % (row.code,))
    # Testing data is specified in the range
    test = stock[(stock['date'] >= start_date) & (stock['date'] <= end_date)]

    # Number of 5-minute steps between the last training point and the end
    # of the test window.
    periods = int((end_date - train['date'].max()).total_seconds() / 60 / 5)

    future = _fit_and_forecast(train, row, periods)

    # Merge predictions with the known values
    test = pd.merge(test, future, on='ds', how='inner')
    train = pd.merge(train, future, on='ds', how='inner')

    # Calculate the differences between consecutive measurements
    test['pred_diff'] = test['yhat'].diff()
    test['real_diff'] = test['y'].diff()
    # Correct is when we predicted the correct direction
    test['correct'] = (np.sign(test['pred_diff']) == np.sign(test['real_diff'])) * 1

    # Accuracy when we predict increase and decrease
    increase_accuracy = 100 * np.mean(test[test['pred_diff'] > 0]['correct'])
    decrease_accuracy = 100 * np.mean(test[test['pred_diff'] < 0]['correct'])

    # Calculate mean absolute error
    test_mean_error = np.mean(abs(test['y'] - test['yhat']))
    train_mean_error = np.mean(abs(train['y'] - train['yhat']))

    # Percentage of time the actual value lies strictly inside the prediction
    # interval, computed for the whole column at once.
    test['in_range'] = (test['y'] < test['yhat_upper']) & (test['y'] > test['yhat_lower'])
    in_range_accuracy = 100 * np.mean(test['in_range'])

    # Positional access to the last row (`DataFrame.ix` no longer exists in
    # current pandas).
    predict_price = future['yhat'].iloc[-1]
    # The inner merge can leave no test rows when no forecast timestamp
    # matches; report NaN rather than failing every stock in the partition.
    actual_price = test['y'].iloc[-1] if not test.empty else float('nan')

    return [
        row.code,
        'open',
        predict_price,
        actual_price,
        train_mean_error,
        test_mean_error,
        increase_accuracy,
        decrease_accuracy,
        in_range_accuracy,
        row.changepoint_prior_scale,
        row.seasonality_prior_scale,
        row.monthly_seasonality,
        row.yearly_seasonality,
    ]


def evaluationPartion(partitions):
    """Evaluate every row of one partition, yielding one result list per row.

    Args:
        partitions: Iterable of rows, each exposing ``code``, ``ds``,
            ``changepoint_prior_scale``, ``seasonality_prior_scale``,
            ``monthly_seasonality`` and ``yearly_seasonality``.

    Yields:
        The result list built by ``_evaluate_stock`` for each row, in input
        order.

    Raises:
        ValueError: If a row has no price data or no training data.
    """
    for partition in partitions:
        yield _evaluate_stock(partition)
现在训练过程很慢:`evaluate_result` 的记录数是 100,训练这 100 次需要 5 个小时。如何提高训练速度?谢谢!