在下面的代码中,我尝试使用 URL 上的 env 变量实例化redis-py 连接。问题是,当我使用foreach 或 foreachPartition时,#save_on_redis 方法无法识别 env 变量。
我只是尝试在外部创建 redis 连接,但收到"pickle.PicklingError: Can't pickle 'lock' object",因为 spark 尝试在所有节点上同时运行这两种方法。
问题:如何在作为参数传递给 foreach 或 foreachPartition 的方法上使用环境变量?
import os
from pyspark.sql import SparkSession
import redis
spark = (SparkSession
.builder
.getOrCreate())
print "---------"
print os.getenv("REDIS_REPORTS_URL")
print "---------"
def save_on_redis(row):
redis_ = redis.StrictRedis(host=os.getenv("REDIS_REPORTS_URL"), port=6379, db=0)
print os.getenv("REDIS_REPORTS_URL")
print redis_
redis_.set("#teste#", "fagner")
df = spark.createDataFrame([(0,1), (0,1), (0,2)], ["id", "score"])
df.foreach(save_on_redis)