0

我写的火花 UDF 出现酸洗错误。它在数据帧的每一行上应用火花管道并返回类(它是一个布尔值,True 或 False)。

以下管道适用于列表中添加的数据。我附上了下面的输出。

documentAssembler = DocumentAssembler()\
  .setInputCol("text")\
  .setOutputCol("sentence")

tokenizer = Tokenizer()\
  .setInputCols(["sentence"])\
  .setOutputCol("token")

bert_embeddings = BertEmbeddings.pretrained("biobert_pubmed_base_cased")\
  .setInputCols(["sentence", "token"])\
  .setOutputCol("embeddings")\
  .setMaxSentenceLength(512)

embeddingsSentence = SentenceEmbeddings() \
      .setInputCols(["sentence", "embeddings"]) \
      .setOutputCol("sentence_embeddings") \
      .setPoolingStrategy("AVERAGE")\
      .setStorageRef('biobert_pubmed_base_cased')

classsifierdl = ClassifierDLModel.pretrained("classifierdl_ade_biobert", "en", "clinical/models")\
      .setInputCols(["sentence", "sentence_embeddings"]) \
      .setOutputCol("class")

ade_clf_pipeline = Pipeline(
    stages=[documentAssembler, 
            tokenizer,
            bert_embeddings,
            embeddingsSentence,
            classsifierdl])

empty_data = spark.createDataFrame([[""]]).toDF("text")

ade_clf_model = ade_clf_pipeline.fit(empty_data)

ade_lp_pipeline = LightPipeline(ade_clf_model)

texts = ["I feel a bit drowsy & have a little blurred vision, after taking a pill.",
"I've been on Arthrotec 50 for over 10 years on and off, only taking it when I needed it.",
"Due to my arthritis getting progressively worse, to the point where I am in tears with the agony, gp's started me on 75 twice a day and I have to take it every day for the next month to see how I get on, here goes.",
"So far its been very good, pains almost gone, but I feel a bit weird, didn't have that when on 50."]

for text in texts:
  result = ade_lp_pipeline.annotate(text)
  print (result['class'][0])

这是结果

True
False
True
False

加载数据

import pyspark.sql.functions as F

! wget -q   https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/tutorials/Certification_Trainings/Healthcare/data/sample_ADE_dataset.csv

ade_DF = spark.read\
                .option("header", "true")\
                .csv("./sample_ADE_dataset.csv")\
                .filter(F.col("label").isin(['True','False']))

ade_DF.show(truncate=100)

定义火花 UDF

import pyspark.sql.functions as f
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def get_ade_lp_class(text):
    ade_class = ade_lp_pipeline.annotate(text)['class']
    return ade_class

udf_get_ade_lp_class = udf(lambda x: get_ade_lp_class(x), StringType())
ade_DF.select('text',
              udf_get_ade_lp_class('text')).show()

这是错误

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/pyspark/serializers.py", line 590, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 863, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 260, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python3.6/pickle.py", line 409, in dump
    self.save(obj)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 736, in save_tuple
    save(element)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 400, in save_function
    self.save_function_tuple(obj)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 549, in save_function_tuple
    save(state)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.6/pickle.py", line 852, in _batch_setitems
    save(v)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 400, in save_function
    self.save_function_tuple(obj)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 549, in save_function_tuple
    save(state)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.6/pickle.py", line 852, in _batch_setitems
    save(v)
  File "/usr/lib/python3.6/pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
    save(state)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/usr/lib/python3.6/pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
    save(state)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "/usr/lib/python3.6/pickle.py", line 805, in _batch_appends
    save(x)
  File "/usr/lib/python3.6/pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
    save(state)
  File "/usr/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
    save(v)
  File "/usr/lib/python3.6/pickle.py", line 496, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python3.6/dist-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.6/dist-packages/py4j/protocol.py", line 332, in get_return_value
    format(target_id, ".", name, value))
py4j.protocol.Py4JError: An error occurred while calling o66.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)


---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
/usr/local/lib/python3.6/dist-packages/pyspark/serializers.py in dumps(self, obj)
    589         try:
--> 590             return cloudpickle.dumps(obj, 2)
    591         except pickle.PickleError:

52 frames
Py4JError: An error occurred while calling o66.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)



During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
/usr/local/lib/python3.6/dist-packages/pyspark/serializers.py in dumps(self, obj)
    598                 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    599             cloudpickle.print_exec(sys.stderr)
--> 600             raise pickle.PicklingError(msg)
    601 
    602 

PicklingError: Could not serialize object: Py4JError: An error occurred while calling o66.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
4

1 回答 1

0

我没有使用 Spark NLP 的经验,但请尝试以下操作:

ade_clf_model = ade_clf_pipeline.fit(empty_data)

ade_DF = spark.read\
                .option("header", "true")\
                .csv("./sample_ADE_dataset.csv")\
                .filter(F.col("label").isin(['True','False']))

result = ade_clf_model.transform(ade_DF)
于 2020-12-09T12:55:26.447 回答