1

我想从.describeTopics()输出中获取 Spark LDA 模型的术语索引,并将它们与计数矢量化器词汇表中的适当术语相匹配。这里是摩擦点:

terms = ['fuzzy', 'wuzzy', 'bear', 'seashells', 'chuck', 'wood', 'big', 'black', 'woodchuck', 'sell', 'hair', 'rug', 'sat', 'seashore', 'much', 'sells', 'many']
+-----+--------------+------------------------------------------------------------------------------------+
|topic|termIndices   |termWeights                                                                         |
+-----+--------------+------------------------------------------------------------------------------------+
|0    |[6, 16, 4, 13]|[0.07759153026889895, 0.07456018590515792, 0.06590443764744822, 0.06529979905841589]|
|1    |[0, 8, 1, 12] |[0.08460924078697102, 0.06935412981755526, 0.06803462316387827, 0.06505150660960128]|
|2    |[8, 14, 5, 3] |[0.07473487407268035, 0.06999332154185754, 0.06923579179113146, 0.06673236538997057]|
|3    |[2, 15, 10, 9]|[0.07990489772171691, 0.07352818255574894, 0.0725564301639141, 0.0705481456715216]  |
+-----+--------------+------------------------------------------------------------------------------------+

我想要的输出将是上面带有数组列的 Dataframe terms,其中包含基于termIndices.

这是设置问题的代码:

    from pyspark.sql.functions import udf
    from pyspark.ml.feature import CountVectorizer, StopWordsRemover, RegexTokenizer, NGram
    from pyspark.ml import Pipeline
    from pyspark.ml.clustering import LDA
    import numpy as np
    
    text_df = spark.createDataFrame([
        (0, "How much WOOD could a woodchuck chuck if a woodchuck could chuck wood?"),
        (1, "She sells SEASHELLS by the seashore. How many seashells did she sell?"),
        (2, "Fuzzy Wuzzy was a bear. Fuzzy Wuzzy had no hair. Fuzzy Wuzzy wasn't very fuzzy, was he?"),
        (3, "A BIG BLACK bear sat on a big black rug.")
        ], ['id','text'])
        
    # Arguments to be passed to functions    
    input_data = text_df
    text_col = "text"
    n_topics = 4
    n_gram = 1
    min_doc_freq = 0.1
    max_doc_freq = 0.9
    
    # Model pipeline; RegexTokenizer allows us to tokenize on the regex, forgoing an explicit symbol removal step
    tokenizer = RegexTokenizer(inputCol=text_col, outputCol="tokens", pattern=r"[\s{2,}&;!\.\(\)-<>/,\?]+")
    stopwords = StopWordsRemover(inputCol="tokens", outputCol="tokens_clean")
    ngrams = NGram(inputCol = "tokens_clean", n=n_gram, outputCol = "ngram")
    count_vec = CountVectorizer(inputCol="ngram", outputCol="features", minDF = min_doc_freq, maxDF = max_doc_freq)
    lda = LDA(k=n_topics, seed=477)
    pipeline = Pipeline(stages=[tokenizer, stopwords, ngrams, count_vec, lda])

    # Fitting and transforming
    model = pipeline.fit(input_data)

这是我尝试过的:

    # This section to experiment with matching vocabulary indices to words in terms
    topics = model.stages[-1].describeTopics(4) # This yields the desired topic table above
    terms = model.stages[-2].vocabulary # This yields the vocabulary above
    
    # Defining a UDF to try and match indices to terms
    @udf
    def indices_to_terms(indices):
        terms_subset = [terms[index] for index in indices]
        return(np.array(terms_subset))
    # Attempting to use UDF to add a terms column
    topics = (
        topics
       .withColumn("terms", indices_to_terms(F.col("termIndices")))
    )
    topics.show()

运行此代码时,我实际上并没有收到错误,但它没有显示任何内容。也许 UDF 不是正确的方法。我如何将索引termIndices与模型词汇表匹配terms并使其成为一个数组列?

4

1 回答 1

1

解决方案只是更仔细地定义我的 UDF。以下代码解决了我的问题。

    # This section to experiment with matching vocabulary indices to words in terms
    topics = model.stages[-1].describeTopics(4)
    terms = model.stages[-2].vocabulary
    
    # Pandas function for matching indices to terms in vocabulary
    def indices_to_terms(indices, terms=terms):
        terms_subset = [terms[index] for index in indices]
        return terms_subset
    # Defining Spark UDF from above function
    udf_indices_to_terms = F.udf(indices_to_terms, ArrayType(StringType()))

    topics = (
       topics
       .withColumn("terms", udf_indices_to_terms(F.col("termIndices")))
    )

于 2022-02-16T20:36:06.803 回答