
When a PySpark UDF calls an AWS API, the application fails with:

PicklingError: Could not serialize object: TypeError: can't pickle SSLContext objects

The code is:

import pyspark.sql.functions as sqlf
import boto3

comprehend = boto3.client('comprehend', region_name='us-east-1')

def detect_sentiment(text):
  response = comprehend.detect_sentiment(Text=text, LanguageCode='pt')
  return response["SentimentScore"]["Positive"]

detect_sentiment_udf = sqlf.udf(detect_sentiment)

test = df.withColumn("Positive", detect_sentiment_udf(df.Conversa))

where df.Conversa contains short, simple strings. How can I fix this, or is there an alternative approach?


2 Answers


When your UDF runs, Spark ships it to the executors along with everything it references, and that entire closure must be picklable. The boto3 client is not serializable, so you need to create it inside the UDF itself.
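The root cause can be reproduced without Spark or AWS: the client's HTTP layer holds an ssl.SSLContext, which the pickle module refuses to serialize. A minimal sketch, using the standard library's ssl module as a stand-in for what boto3 holds internally:

```python
import pickle
import ssl

# boto3 clients keep an SSLContext inside their connection pool;
# pickling anything that holds one fails the same way the UDF does.
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

try:
    pickle.dumps(ctx)
except TypeError as exc:
    print(exc)  # "cannot pickle 'SSLContext' object"
```

This is exactly the error buried inside the PicklingError above, which is why moving the client construction inside the UDF makes it disappear.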

You get the same error if you use an object's method as a UDF, as below. To fix it, expose the client through a property instead of storing it on the instance.

import pyspark.sql.functions as sqlf
import boto3

class Foo:
    # Storing the client on the instance makes the object unpicklable,
    # so this would trigger the same error when the UDF is called:
    #
    # def __init__(self):
    #     self.client = boto3.client('comprehend', region_name='us-east-1')

    # Do this instead: build the client on demand so nothing
    # unpicklable is ever stored on the instance.
    @property
    def client(self):
        return boto3.client('comprehend', region_name='us-east-1')

    def my_udf(self, text):
        response = self.client.detect_sentiment(Text=text, LanguageCode='pt')
        return response["SentimentScore"]["Positive"]

    def add_sentiment_column(self, df):
        detect_sentiment_udf = sqlf.udf(self.my_udf)
        return df.withColumn("Positive", detect_sentiment_udf(df.Conversa))
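Why the property version survives pickling can be checked without Spark or AWS. The sketch below uses ssl.SSLContext as a stand-in for the boto3 client (an assumption for illustration; boto3 itself is not needed):

```python
import pickle
import ssl

class Eager:
    def __init__(self):
        # unpicklable object stored on the instance -> instance can't pickle
        self.client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

class Lazy:
    @property
    def client(self):
        # built on demand, never stored -> instance stays picklable
        return ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

try:
    pickle.dumps(Eager())
except TypeError as exc:
    print("Eager fails:", exc)

pickle.dumps(Lazy())  # succeeds: the instance __dict__ is empty
print("Lazy pickles fine")
```

The property trades a small per-call construction cost for an instance whose state contains nothing Spark cannot ship to the workers.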

@johnhill2424's answer will solve your problem:

import pyspark.sql.functions as sqlf
import boto3

def detect_sentiment(text):
  comprehend = boto3.client('comprehend', region_name='us-east-1')
  response = comprehend.detect_sentiment(Text=text, LanguageCode='pt')
  return response["SentimentScore"]["Positive"]

detect_sentiment_udf = sqlf.udf(detect_sentiment)

test = df.withColumn("Positive", detect_sentiment_udf(df.Conversa))
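One caveat with this fix: boto3.client(...) now runs once per row, and client construction is not free. A common workaround is a lazily initialized module-level client, created at most once per worker process. The sketch below demonstrates the pattern with a counting stub standing in for the boto3 call (the get_client name and the stub constructor are illustrative, not from the original answer):

```python
_client = None
calls = 0

def expensive_constructor():
    # stand-in for boto3.client('comprehend', region_name='us-east-1')
    global calls
    calls += 1
    return object()

def get_client():
    # Created on first use inside the worker process, then reused,
    # so nothing unpicklable is captured in the UDF's closure.
    global _client
    if _client is None:
        _client = expensive_constructor()
    return _client

def detect_sentiment(text):
    client = get_client()  # cheap after the first row
    # client.detect_sentiment(Text=text, LanguageCode='pt') as before;
    # a placeholder score is returned here so the sketch runs standalone
    return 0.0

for _ in range(1000):  # simulate 1000 rows hitting one worker
    detect_sentiment("olá")

print(calls)  # -> 1: the constructor ran only once
```

Each Spark worker process gets its own copy of the module, so each worker builds exactly one client instead of one per row.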

Answered 2021-08-09T15:56:20.463

Create the comprehend boto3 client inside the detect_sentiment function definition.

Answered 2021-04-06T19:16:40.437