Import all the libraries you need
from pyspark.mllib.classification import LogisticRegressionWithSGD, LogisticRegressionModel
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
import re
Load the data into an RDD
msgs = [("I love Star Wars but I can't watch it today", 1.0),
("I don't love Star Wars and people want to watch it today", 0.0),
("I dislike not being able to watch Star Wars", 1.0),
("People who love Star Wars are my friends", 1.0),
("I preffer to watch Star Wars on Netflix", 0.0),
("George Lucas shouldn't have sold the franchise", 1.0),
("Disney makes better movies than everyone else", 0.0)]
rdd = sc.parallelize(msgs)
Tokenize your data (if you use ML it might be easier).
rdd = rdd.map(lambda tl: ([w.lower() for w in re.split(" +", tl[0])], tl[1]))
Remove all the unnecessary words (widely known as stop-words) and symbols such as ,.&
commons = ["and", "but", "to"]
rdd = rdd.map(lambda tl: ([token for token in tl[0] if token not in commons], tl[1]))
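Note that the code above only drops stop-words; to also strip symbols such as ,.& you could add something like the following (a minimal sketch; the strip_symbols helper is a hypothetical name of mine, not part of the original approach):
# hedged sketch: remove anything that is not a word character or apostrophe
# (strip_symbols is a hypothetical helper added for illustration)
def strip_symbols(token):
    return re.sub(r"[^\w']", "", token)

rdd = rdd.map(lambda tl: ([t for t in map(strip_symbols, tl[0]) if t], tl[1]))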
Create a dictionary with all the distinct words of the whole dataset. It may sound huge, but there are not as many as you would expect, and I bet they will fit in your master node (however there are other ways to approach this; for simplicity's sake I will keep it this way).
# finds different words
words = rdd.flatMap(lambda tl: tl[0]).distinct().collect()
diffwords = len(words)
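As a side note, one of the "other ways" hinted at above is to broadcast a word-to-index lookup so every executor receives it once (a minimal sketch; word_index is a name I am introducing here):
# hedged sketch: ship the vocabulary to executors as a broadcast variable
word_index = sc.broadcast({w: i for i, w in enumerate(words)})
With that in place, the sparsify function below could use word_index.value[t] for O(1) lookups instead of the linear words.index(t).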
Convert your features into a DenseVector or SparseVector. I would obviously recommend the second one, because normally a SparseVector requires less space to be represented; however, that depends on the data. Note that there are better alternatives, such as hashing, but I am trying to stay faithful to my verbose approach. After that, transform the tuple into a LabeledPoint.
def sparsify(length, tokens):
    # index of each distinct token in the vocabulary
    indices = [words.index(t) for t in set(tokens)]
    # how many times each of those tokens appears in the message
    quantities = [tokens.count(words[i]) for i in indices]
    return SparseVector(length, dict(zip(indices, quantities)))
rdd = rdd.map(lambda tl: LabeledPoint(tl[1], sparsify(diffwords, tl[0])))
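For completeness, the hashing alternative mentioned above could look roughly like this with HashingTF from pyspark.mllib.feature, applied to the (tokens, label) RDD from before the LabeledPoint conversion (a sketch; numFeatures=64 is an arbitrary choice of mine):
from pyspark.mllib.feature import HashingTF

# hedged sketch: HashingTF.transform maps a token list to a SparseVector
htf = HashingTF(numFeatures=64)
# would replace the sparsify step above:
# rdd = rdd.map(lambda tl: LabeledPoint(tl[1], htf.transform(tl[0])))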
Fit your favorite model; in this case I used LogisticRegressionWithSGD due to an ulterior motive.
lrm = LogisticRegressionWithSGD.train(rdd)
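Before saving, a quick sanity check of the fit doesn't hurt; this follows the usual MLlib training-error pattern (labelsAndPreds and trainErr are my own names):
# hedged sketch: training error, in the style of the MLlib examples
labelsAndPreds = rdd.map(lambda p: (p.label, lrm.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(rdd.count())
print(trainErr)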
Save your model.
lrm.save(sc, "mylovelymodel.model")
Load your LogisticRegressionModel in another application.
lrm = LogisticRegressionModel.load(sc, "mylovelymodel.model")
Predict a category.
lrm.predict(SparseVector(37,[2,4,5,13,15,19,23,26,27,29],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))
# outputs 0
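Instead of hard-coding the indices as above, you can run a new message through the same preprocessing and reuse sparsify (a sketch; new_msg is my own name, and words unseen during training are simply skipped because they are not in the vocabulary):
# hedged sketch: vectorize a new message with the same preprocessing steps
new_msg = "I love Star Wars"
tokens = [w.lower() for w in re.split(" +", new_msg)]
tokens = [t for t in tokens if t not in commons and t in words]
lrm.predict(sparsify(diffwords, tokens))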