2

I have been messing around with Prefect for workflow management, but got stuck with building up and braking down a spark session withing Prefect's resource manager.

I browsed Prefects docs and an example with Dusk is available:

from prefect import resource_manager
from dask.distributed import Client

@resource_manager
class DaskCluster:
    def init(self, n_workers):
        self.n_workers = n_workers

    def setup(self):
        "Create a local dask cluster"
        return Client(n_workers=self.n_workers)

    def cleanup(self, client):
        "Cleanup the local dask cluster"
        client.close()
        
        
with Flow("example") as flow:
    n_workers = Parameter("n_workers")

    with DaskCluster(n_workers=n_workers) as client:
        some_task(client)
        some_other_task(client)        

However I couldn't work out how to do the same with a spark session.

4

1 回答 1

3

最简单的方法是在本地模式下使用 Spark:

from prefect import task, Flow, resource_manager

from pyspark import SparkConf
from pyspark.sql import SparkSession

@resource_manager
class SparkCluster:
    def __init__(self, conf: SparkConf = SparkConf()):
        self.conf = conf

    def setup(self) -> SparkSession:
        return SparkSession.builder.config(conf=self.conf).getOrCreate()

    def cleanup(self, spark: SparkSession):
        spark.stop()

@task
def get_data(spark: SparkSession):
    return spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])

@task(log_stdout=True)
def analyze(df):
    word_count = df.groupBy('word').count()
    word_count.show()


with Flow("spark_flow") as flow:
    conf = SparkConf().setMaster('local[*]')
    with SparkCluster(conf) as spark:
        df = get_data(spark)
        analyze(df)

if __name__ == '__main__':
    flow.run()

您的setup()方法返回被管理的资源,并且该cleanup()方法接受由setup(). 在这种情况下,我们创建并返回一个 Spark 会话,然后停止它。您不需要spark-submit或任何东西(尽管我发现以这种方式管理依赖项有点困难)。

扩大规模变得更加困难,这是我仍在努力解决的问题。例如,Prefect 不知道如何序列化 Spark DataFrames 以进行输出缓存或持久化结果。此外,您必须小心将 Dask 执行器与 Spark 会话一起使用,因为它们不能被腌制,因此您必须设置执行器以使用scheduler='threads'(请参阅此处)。

于 2021-09-14T14:06:42.640 回答