0

我有以下结构的RDD:

my_rdd = [Row(text='Hello World. This is bad.'), Row(text='This is good.'), ...]

我可以使用 python 函数执行并行处理:

rdd2=my_rdd.map(lambda f: f.text.split()) 
for x in rdd2.collect():
  print(x)

它给了我预期的输出。

但是,当我尝试使用 spark-NLP 断句器或情感分析器时,我收到一个错误: PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects

在这一行中:对于 rdd2.collect() 中的 x:

这是代码:

documenter = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")
    
sentencerDL = SentenceDetectorDLModel\
  .pretrained("sentence_detector_dl", "en") \
  .setInputCols(["document"]) \
  .setOutputCol("sentences")

sd_pipeline = PipelineModel(stages=[documenter, sentencerDL]) 
sd_model = LightPipeline(sd_pipeline)
pipeline = PretrainedPipeline('analyze_sentiment', 'en')

如果我尝试:

rdd2=my_rdd.map(lambda f: pipeline.annotate(f.text))                    

或者

rdd2=my_rdd.map(lambda f: sd_model.fullAnnotate(f.text)[0]["sentences"].split()[0])

发生错误。当我在没有“映射”的情况下运行它们时,它们会按预期运行。

有人知道如何并行执行 spark-NLP 断句器或情感分析器吗?我做错了什么?

谢谢大家!

4

1 回答 1

0

当您将 Spark-ML 管道应用于数据分布在不同分区的数据帧时,默认情况下您将获得并行执行。spark-NLP 管道(也是 Spark-ML 管道)也是如此。所以你可以这样做,

pipeline.transform(数据帧)

并以数据分布在不同节点上的方式创建“数据框”。一个很好的教程在这里,

https://sparkbyexamples.com/pyspark/pyspark-create-dataframe-from-list/

此外,为了在使用 Spark-NLP 转换后映射数据帧的内容,您可以使用 sparknlp.functions 下的函数,例如 map_annotations_col,它可以让您映射数据帧中包含 Spark-NLP 注释的特定列的内容。顺便说一句,这个,

rdd2=my_rdd.map(lambda f: pipeline.annotate(f.text))

是你不应该做的事情,你得到了那个异常,因为 Spark 正在尝试序列化你的整个管道并将其发送到集群节点。这不是它应该工作的方式,您将数据传递到管道并让管道选择要分发到集群的内容。

于 2021-03-29T16:52:27.237 回答