我正在尝试使用 pyspark 实现 fbprophet,但无法并行化所有可用内核上的代码(在我的机器上本地运行)。
我已经搜索了各种文章,试图了解为什么会发生这种情况。
您可以在下面找到应该发生并行化的代码块。我已经定义了所有映射函数
if __name__ == '__main__':
conf = (SparkConf()
.setMaster("local[*]")
.setAppName("SparkFBProphet Example"))
spark = (SparkSession
.builder
.config(conf=conf)
.getOrCreate())
# Removes some of the logging after session creation so we can still see output
# Doesnt remove logs before/during session creation
# To edit more logging you will need to set in log4j.properties on cluster
sc = spark.sparkContext
sc.setLogLevel("ERROR")
# Retrieve data from local csv datastore
print(compiling_pickle())
df = retrieve_data()
# Group data by app and metric_type to aggregate data for each app-metric combo
df = df.groupBy('column1', 'column2')
df = df.agg(collect_list(struct('ds', 'y')).alias('data'))
df = (df.rdd
.map(lambda r: transform_data(r))
.map(lambda d: partition_data(d))
.map(lambda d: create_model(d))
.map(lambda d: train_model(d))
.map(lambda d: make_forecast(d))
.map(lambda d: imp_predictions(d))
.saveAsTextFile("../data_spark_t/results"))
spark.stop()
在这个部分:
print(compiling_pickle())
df = retrieve_data()
加载、编译一个泡菜并生成一个 csv。使用检索功能,我只这样做:
df = (spark.read.option("header", "true")
.option("inferSchema", value=True)
.csv("../data_spark_t/database_created.csv"))
所以,有了这一切,我不明白为什么我的代码没有在执行时附加所有可用的核心。
只是指出一些已经测试过的点:
我的分区号是 500。我已经将它设置为等于 df 中的行数(在 'collect_list' 之后),但是没有用;
setMaster() 的所有可能组合都已实现;
任何人都可以帮忙吗?