1

我正在尝试使用 Matthew Rocklin 提出的名为 dask-spark 的项目。

在我的项目中添加dask-spark时,出现了一个问题:等待workers,如下图所示。

在这里,我将两个工作节点(dask)作为 dask-worker tcp://ubuntu8:8786 和 tcp://ubuntu9:8786 运行,并在独立模型上运行两个工作节点(spark),作为 worker-20180918112328-ubuntu8-45764和worker-20180918112413-ubuntu9-41972

等待工人

我的python代码如下:

from tpot import TPOTClassifier
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.externals import joblib
from dask.distributed import Client
import distributed.joblib    
from sklearn.externals.joblib import parallel_backend
from dask_spark import spark_to_dask
from pyspark import SparkConf, SparkContext
from dask_spark import dask_to_spark

if __name__ == '__main__':

  sc = SparkContext()    
  #connect to the cluster
  client = spark_to_dask(sc) 
  digits = load_digits()
  X_train, X_test, y_train, y_test = train_test_split(
    digits.data,
    digits.target,
    train_size=0.75,
    test_size=0.25,
  )

  tpot = TPOTClassifier(
  generations=2,
  population_size=10,
  cv=2,
  n_jobs=-1,
  random_state=0,
  verbosity=0  
  )
  with joblib.parallel_backend('dask.distributed', scheduler_host=' ubuntu8:8786'):
  tpot.fit(X_train, y_train)    

  print(tpot.score(X_test, y_test))

如果您能帮我解决这个问题,我将不胜感激。

4

2 回答 2

1

我在core.py中修改了程序,如下:

def spark_to_dask(sc, loop=None):
    """ Launch a Dask cluster from a Spark Context
    """
    cluster = LocalCluster(n_workers=None, loop=loop, threads_per_worker=None)
    rdd = sc.parallelize(range(1000))
    address = cluster.scheduler.address

之后,使用 Standalone 或 Mesos 在 Spark 上运行我的测试用例是成功的。

于 2018-09-25T09:30:58.787 回答
0

如项目的 README 中所述,dask-spark 并不成熟。这是一个周末项目,我不建议使用它。

相反,我建议使用此处描述的一种机制直接启动 Dask:http: //dask.pydata.org/en/latest/setup.html

如果您必须使用 Mesos,那么我不确定我是否会提供很多帮助,但是您可能会对 Marathon 上运行的包daskathon感兴趣。

于 2018-09-20T12:43:32.410 回答