1

我会尽力描述我的想法。

MS SQL 数据库中存储了一个文本内容。内容每天都以流的形式出现。有些人每天都会浏览内容,如果内容符合某些标准,则将其标记为已验证。只有一类。它要么“有效”,要么无效。

我想要的是基于已经验证的内容创建一个模型,保存它并使用这个模型来“预验证”或标记新的传入内容。也偶尔根据新验证的内容更新模型。希望我清楚地解释了自己。

我正在考虑根据创建的模型使用 Spark 流进行数据分类。和朴素贝叶斯算法。但是您将如何创建、更新和存储模型?有大约 200K+ 不同长度的经过验证的结果(文本)。我需要这么多的模型吗?以及如何在 Spark Streaming 中使用这个模型。

提前致谢。

4

1 回答 1

2

哇,这个问题非常广泛,并且与 相关Machine LearningApache Spark但是我会尝试给你一些提示或步骤来遵循(我不会为你做这项工作)。

  1. 导入您需要的所有库

    from pyspark.mllib.classification import LogisticRegressionWithSGD, LogisticRegressionModel
    from pyspark.mllib.linalg import SparseVector
    from pyspark.mllib.regression import LabeledPoint
    import re
    
  2. 将数据加载到RDD

    msgs = [("I love Star Wars but I can't watch it today", 1.0),
            ("I don't love Star Wars and people want to watch it today", 0.0),
            ("I dislike not being able to watch Star Wars", 1.0),
            ("People who love Star Wars are my friends", 1.0),
            ("I preffer to watch Star Wars on Netflix", 0.0),
            ("George Lucas shouldn't have sold the franchise", 1.0),
            ("Disney makes better movies than everyone else", 0.0)]
    
    rdd = sc.parallelize(msgs)
    
  3. 标记您的数据(如果您使用ML可能会更容易)和

    rdd = rdd.map(lambda (text, label): ([w.lower() for w in re.split(" +", text)], label))
    
  4. 删除所有不必要的单词(广泛称为stop-words)和符号,例如,.&

    commons = ["and", "but", "to"]
    rdd = rdd.map(lambda (tokens, label): (filter(lambda token: token not in commons, tokens), label))
    
  5. 创建一个包含所有数据集中distinct所有单词的字典,这听起来很庞大,但它们并没有你想象的那么多,我敢打赌它们会适合你的主节点(但是还有其他方法可以解决这个问题,但为了简单起见,我会保持这种方式)。

    # finds different words
    words = rdd.flatMap(lambda (tokens, label): tokens).distinct().collect()
    diffwords = len(words)
    
  6. 将您features转换为DenseVectorSparseVector,我显然会推荐第二种方式,因为通常 aSparseVector需要更少的空间来表示,但这取决于数据。请注意,有更好的选择,例如hashing,但我试图保持对我冗长的方法的忠诚。之后将其tuple转换为LabeledPoint

    def sparsify(length, tokens):
        indices = [words.index(t) for t in set(tokens)]
        quantities = [tokens.count(words[i]) for i in indices]
    
        return SparseVector(length, [(indices[i], quantities[i]) for i in xrange(len(indices))])
    
    rdd = rdd.map(lambda (tokens, label): LabeledPoint(label, sparsify(diffwords, tokens)))
    
  7. 适合您最喜欢的模型,在这种情况下,由于别有用心,我使用了LogisticRegressionWithSGD 。

    lrm = LogisticRegressionWithSGD.train(rdd)
    
  8. 保存您的模型。

    lrm.save(sc, "mylovelymodel.model")
    
  9. 在另一个应用程序中加载您的LogisticRegressionModel 。

    lrm = LogisticRegressionModel.load(sc, "mylovelymodel.model")
    
  10. 预测类别。

    lrm.predict(SparseVector(37,[2,4,5,13,15,19,23,26,27,29],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))
    # outputs 0
    

请注意,我没有评估accuracy模型的,但是它看起来不是吗?

于 2015-12-18T01:52:44.190 回答