
I have 50 workers, and I want to run my job on all of them.
On master:8080 I can see all the workers,
and on master:4040/executors I can see 50 executors,
but when I run my job, the log output looks like this:

14/10/19 14:57:07 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/10/19 14:57:07 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, slave11, NODE_LOCAL, 1302 bytes)
14/10/19 14:57:07 INFO nio.ConnectionManager: Accepted connection from [slave11/10.10.10.21:42648]
14/10/19 14:57:07 INFO nio.SendingConnection: Initiating connection to [slave11/10.10.10.21:54398]
14/10/19 14:57:07 INFO nio.SendingConnection: Connected to [slave11/10.10.10.21:54398], 1 messages pending
14/10/19 14:57:07 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on slave11:54398 (size: 2.4 KB, free: 267.3 MB)
14/10/19 14:57:08 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on slave11:54398 (size: 18.4 KB, free: 267.2 MB)
14/10/19 14:57:12 INFO storage.BlockManagerInfo: Added rdd_2_0 in memory on slave11:54398 (size: 87.4 MB, free: 179.8 MB)
14/10/19 14:57:12 INFO scheduler.DAGScheduler: Stage 0 (first at GeneralizedLinearAlgorithm.scala:141) finished in 5.473 s
14/10/19 14:57:12 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 5463 ms on slave11 (1/1)
14/10/19 14:57:12 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool


I launch the job like this (command line):

master: $ ./spark-shell --master spark://master:7077 


and then run this Scala code:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils

val fileName = "bc.txt"
val data = sc.textFile(fileName)

val splits = data.randomSplit(Array(0.9, 0.1), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

val training_1 = training.map { line =>
  val parts = line.split(' ')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(x => x.toDouble).toArray))
}

val test_1 = test.map { line =>
  val parts = line.split(' ')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(x => x.toDouble).toArray))
}
val numIterations = 200

val model = SVMWithSGD.train(training_1, numIterations)
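
As a quick check (assuming the same spark-shell session as above): a stage runs one task per partition of the RDD it processes, so printing the partition counts shows how many tasks Spark can run in parallel here.

// Each stage runs one task per partition, so these counts show the
// maximum parallelism available for this input.
println("data partitions: " + data.partitions.length)
println("training_1 partitions: " + training_1.partitions.length)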


My question is: why does only one task (sometimes two) run on my cluster?
Is there a way to configure the number of tasks, or is it decided automatically by the scheduler?
When my job runs as two tasks, it runs on the two executors I can observe on master:4040
and gives a 2x speedup, so I want to run my job on all executors. How can I do that?

Thanks, everyone.


1 Answer


You can use the minPartitions parameter of textFile to set the minimum number of tasks, for example:

val data = sc.textFile(fileName, 10)

However, more partitions usually mean more network traffic, because with more partitions it is harder for Spark to dispatch each task to an executor that holds the data locally. You need to find a balanced value of minPartitions yourself.
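
If bc.txt is small (fewer input splits than workers), textFile will produce only one or two partitions by default, which matches the single task seen in the log. A rough sketch of the two usual ways to spread the work, assuming the same spark-shell session (the value 100 is only an illustrative target, not a recommendation):

// Ask textFile for more partitions up front; minPartitions is a lower bound, not an exact count.
val data = sc.textFile(fileName, 100)

// Or repartition an already-loaded RDD; this triggers a shuffle across the cluster.
val training_repartitioned = training_1.repartition(100)
println("partitions after repartition: " + training_repartitioned.partitions.length)

More partitions only help while each task still has a reasonable amount of data to process; beyond that, as noted above, the extra partitions mostly add scheduling and network overhead.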

Answered on 2014-10-20T06:40:19.320