我有以下结构的RDD:
my_rdd = [Row(text='Hello World. This is bad.'), Row(text='This is good.'), ...]
我可以使用 python 函数执行并行处理:
rdd2=my_rdd.map(lambda f: f.text.split())
for x in rdd2.collect():
print(x)
它给了我预期的输出。
但是,当我尝试使用 spark-NLP 断句器或情感分析器时,我收到一个错误: PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
在这一行中:对于 rdd2.collect() 中的 x:
这是代码:
documenter = DocumentAssembler()\
.setInputCol("text")\
.setOutputCol("document")
sentencerDL = SentenceDetectorDLModel\
.pretrained("sentence_detector_dl", "en") \
.setInputCols(["document"]) \
.setOutputCol("sentences")
sd_pipeline = PipelineModel(stages=[documenter, sentencerDL])
sd_model = LightPipeline(sd_pipeline)
pipeline = PretrainedPipeline('analyze_sentiment', 'en')
如果我尝试:
rdd2=my_rdd.map(lambda f: pipeline.annotate(f.text))
或者
rdd2=my_rdd.map(lambda f: sd_model.fullAnnotate(f.text)[0]["sentences"].split()[0])
发生错误。当我在没有“映射”的情况下运行它们时,它们会按预期运行。
有人知道如何并行执行 spark-NLP 断句器或情感分析器吗?我做错了什么?
谢谢大家!