I created an ALS model object using pyspark.
Example code:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

# Parse each "userId::movieId::rating::timestamp" line into a Row
lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(rating_data, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
als_model = als.fit(rating_data)
Here I am only creating the ALS model and serializing it with cloudpickle. If we call fit, do we still need to call transform?
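On the fit/transform question, a rough sketch: in pyspark.ml, fit returns the trained model, and transform is called on that model only when predictions are wanted, for example to score the held-out split. Saving the model does not by itself require transform. The helper name score_model is hypothetical, and the evaluator is passed in as an argument so the function only relies on the model's transform method and the evaluator's evaluate method.

```python
def score_model(model, test_df, evaluator):
    """Score an already fitted model on held-out data.

    model     -- object returned by fit(), e.g. als_model
    test_df   -- held-out DataFrame, e.g. test
    evaluator -- object with an evaluate(predictions) method
    """
    # transform() produces the predictions for the given DataFrame
    predictions = model.transform(test_df)
    # evaluate() reduces the predictions to a single metric value
    return evaluator.evaluate(predictions)


# With Spark available, and using the names from the example code:
#   from pyspark.ml.evaluation import RegressionEvaluator
#   evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
#                                   predictionCol="prediction")
#   rmse = score_model(als_model, test, evaluator)
```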
I am trying to pickle my als_model object with the following code:
with open(os.path.join(model_path, 'als-als-model.pkl'), 'wb') as out:
    cloudpickle.dump(als_model, out)
I get the following error:
File "/usr/local/spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 324, in get_return_value
format(target_id, ".", name, value))
Py4JError: An error occurred while calling o224.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at
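The __getnewargs__ call in the trace is consistent with pickle probing a py4j proxy for a JVM object held by the fitted model; the proxy forwards the lookup to the JVM, where no such method exists. Below is a minimal sketch of one alternative, assuming Spark's built-in ML persistence is acceptable in place of cloudpickle. The helper names save_spark_model and load_als_model and the directory name als-model are hypothetical; write().overwrite().save() and ALSModel.load() are the pyspark.ml persistence calls.

```python
import os


def save_spark_model(model, model_dir, name="als-model"):
    """Persist a fitted Spark ML model with its own writer instead of pickle."""
    path = os.path.join(model_dir, name)
    # overwrite() replaces an existing model directory at the same path
    model.write().overwrite().save(path)
    return path


def load_als_model(path):
    """Reload a model saved by save_spark_model (needs an active SparkSession)."""
    # Imported here so this module can be imported without Spark installed
    from pyspark.ml.recommendation import ALSModel
    return ALSModel.load(path)


# Usage with the names from the question:
#   saved_path = save_spark_model(als_model, model_path)
#   restored = load_als_model(saved_path)
```

Spark writes a directory rather than a single .pkl file, so model_path has to be a location the Spark session can read and write.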
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-124-8c94f4ee0de9> in <module>()
1
----> 2 tree.fit(data_location)
~/anaconda3/envs/mxnet_p36/lib/python3.6/site-packages/sagemaker/estimator.py in fit(self, inputs, wait, logs, job_name)
152 self.latest_training_job = _TrainingJob.start_new(self, inputs)
153 if wait:
--> 154 self.latest_training_job.wait(logs=logs)
155 else:
156 raise NotImplemented('Asynchronous fit not available')