I have been experimenting with Prefect for workflow management, but I got stuck setting up and tearing down a Spark session within Prefect's resource manager.
I browsed Prefect's docs, and an example with Dask is available:
from prefect import Flow, Parameter, resource_manager
from dask.distributed import Client

@resource_manager
class DaskCluster:
    def __init__(self, n_workers):
        self.n_workers = n_workers

    def setup(self):
        "Create a local dask cluster"
        return Client(n_workers=self.n_workers)

    def cleanup(self, client):
        "Cleanup the local dask cluster"
        client.close()

with Flow("example") as flow:
    n_workers = Parameter("n_workers")

    # some_task and some_other_task are tasks defined elsewhere
    with DaskCluster(n_workers=n_workers) as client:
        some_task(client)
        some_other_task(client)
However, I couldn't work out how to do the same with a Spark session.
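For illustration, here is a minimal, untested-against-Prefect sketch of what the Spark equivalent might look like. It mirrors the `setup`/`cleanup` shape of the Dask example. The class name `SparkSessionResource` and the `session_factory` argument are hypothetical; the latter exists only so the lifecycle can be exercised without a Spark installation. The `@resource_manager` decorator is left as a comment because it is an assumption that it applies here the same way it does in the Dask example.

```python
# Hypothetical Spark counterpart of the DaskCluster resource manager.
# With Prefect, the class would presumably be decorated the same way:
#   from prefect import resource_manager
#   @resource_manager
class SparkSessionResource:
    def __init__(self, app_name, session_factory=None):
        self.app_name = app_name
        # Hypothetical hook: a callable taking app_name and returning a
        # session-like object. When None, a real SparkSession is created.
        self.session_factory = session_factory

    def setup(self):
        "Create (or reuse) a Spark session"
        if self.session_factory is not None:
            return self.session_factory(self.app_name)
        # Imported lazily so the class can be defined without pyspark installed.
        from pyspark.sql import SparkSession
        return SparkSession.builder.appName(self.app_name).getOrCreate()

    def cleanup(self, spark):
        "Stop the Spark session"
        spark.stop()
```

Inside a flow, the intended usage would follow the Dask example: `with SparkSessionResource(app_name=...) as spark:` and then pass `spark` to the tasks in that block. Whether a Spark session can be shared safely between tasks depends on the executor in use, which this sketch does not address.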