我想你正在使用 spark MLlib 的 ALS 模型,它正在执行矩阵分解。该模型的结果是两个矩阵,一个用户特征矩阵和一个项目特征矩阵。
假设我们将在隐式情况下接收带有评级或交易的数据流,则该模型的真实(100%)在线更新将通过触发完全重新训练来更新每个新评级信息的两个矩阵再次对整个数据进行 ALS 模型 + 新评级。在这种情况下,运行整个 ALS 模型的计算成本很高,并且传入的数据流可能很频繁,因此它会过于频繁地触发完全重新训练,这一点受到限制。
因此,知道了这一点,我们可以寻找替代方案,单个评级不应该改变矩阵,而且我们有增量优化方法,例如 SGD。对于 Explicit Ratings 的情况,有一个有趣的(仍然是实验性的)库,它为每批 DStream 进行增量更新:
https://github.com/brkyvz/streaming-matrix-factorization
使用诸如 SGD 之类的增量方法的想法遵循的想法是,只要朝着梯度(最小化问题)移动,就可以保证朝着误差函数的最小值移动。因此,即使我们对单个新评分进行更新,仅针对该特定用户的用户特征矩阵,并且仅针对该特定项目评分的项目特征矩阵进行更新,并且更新是朝向梯度的,我们保证我们移动接近最小值,当然是一个近似值,但仍然接近最小值。
另一个问题来自 spark 本身和分布式系统,理想情况下,更新应该按顺序完成,对于每个新的传入评级,但是 spark 将传入流视为一个批处理,它作为一个 RDD 分发,因此为更新完成的操作将在不保证顺序的情况下对整个批次进行。
更详细地说,如果您使用的是 Prediction.IO,例如,您可以使用内置的常规训练和部署功能进行离线训练,但如果您想要在线更新,则必须访问每个批次的两个矩阵流,并使用 SGD 运行更新,然后要求部署新模型,此功能当然不在 Prediction.IO 中,您必须自己构建它。
SGD 更新的有趣说明:
http://stanford.edu/~rezab/classes/cme323/S15/notes/lec14.pdf