
TL;DR

Can the Beam Portable Runner be configured with Spark configuration properties? More precisely, is it possible to set spark.driver.host for the Portable Runner?

Motivation

We currently have Airflow deployed in a Kubernetes cluster, and in order to use TensorFlow Extended we need to use Apache Beam. For our use case, Spark would be the appropriate runner, and since Airflow and TensorFlow are coded in Python we need to use Apache Beam's Portable Runner (https://beam.apache.org/documentation/runners/spark/#portability).

The problem

The Portable Runner creates the Spark context inside its own container and leaves no room for configuring the driver DNS, so the executors inside the worker pods are unable to communicate with the driver (the job server).

Setup

  1. Following the Beam documentation, the job server is deployed in the same pod as Airflow so that the two containers can communicate over the pod's local network. Job server configuration:
- name: beam-spark-job-server
  image: apache/beam_spark_job_server:2.27.0
  args: ["--spark-master-url=spark://spark-master:7077"]

Job server / Airflow service:

apiVersion: v1
kind: Service
metadata:
  name: airflow-scheduler
  labels:
    app: airflow-k8s
spec:
  type: ClusterIP
  selector:
    app: airflow-scheduler
  ports:
    - port: 8793
      protocol: TCP
      targetPort: 8793
      name: scheduler
    - port: 8099
      protocol: TCP
      targetPort: 8099
      name: job-server
    - port: 7077
      protocol: TCP
      targetPort: 7077
      name: spark-master
    - port: 8098
      protocol: TCP
      targetPort: 8098
      name: artifact
    - port: 8097
      protocol: TCP
      targetPort: 8097
      name: java-expansion

Ports 8097, 8098 and 8099 belong to the job server, 8793 to Airflow, and 7077 to the Spark master.

Development / Errors

  1. When testing a simple Beam example from the Airflow container with python -m apache_beam.examples.wordcount --output ./data_test/ --runner=PortableRunner --job_endpoint=localhost:8099 --environment_type=LOOPBACK, I get the following response in the Airflow pod:
Defaulting container name to airflow-scheduler.
Use 'kubectl describe pod/airflow-scheduler-local-f685b5bc7-9d7r6 -n airflow-main-local' to see all of the containers in this pod.
airflow@airflow-scheduler-local-f685b5bc7-9d7r6:/opt/airflow$ python -m apache_beam.examples.wordcount --output ./data_test/ --runner=PortableRunner --job_endpoint=localhost:8099 --environment_type=LOOPBACK
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.client:Timeout attempting to reach GCE metadata service.
WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to use: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
Connecting anonymously.
INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:35837
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.27.0
INFO:apache_beam.runners.portability.portable_runner:Environment "LOOPBACK" has started a component necessary for the execution. Be sure to run the pipeline using
  with Pipeline() as p:
    p.apply(..)
This ensures that the pipeline finishes before this program exits.
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING
INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING

And the worker logs:

21/02/19 19:50:00 INFO Worker: Asked to launch executor app-20210219194804-0000/47 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:00 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:00 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:00 INFO SecurityManager: Changing view acls groups to: 
21/02/19 19:50:00 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 19:50:00 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:00 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "47" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
21/02/19 19:50:02 INFO Worker: Executor app-20210219194804-0000/47 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 19:50:02 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 47
21/02/19 19:50:02 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219194804-0000, execId=47)
21/02/19 19:50:02 INFO Worker: Asked to launch executor app-20210219194804-0000/48 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:02 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:02 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:02 INFO SecurityManager: Changing view acls groups to: 
21/02/19 19:50:02 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 19:50:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:02 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "48" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
21/02/19 19:50:04 INFO Worker: Executor app-20210219194804-0000/48 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 19:50:04 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 48
21/02/19 19:50:04 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219194804-0000, execId=48)
21/02/19 19:50:04 INFO Worker: Asked to launch executor app-20210219194804-0000/49 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:04 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:04 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:04 INFO SecurityManager: Changing view acls groups to: 
21/02/19 19:50:04 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 19:50:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:04 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "49" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
.
.
.

As we can see, the executors keep exiting, and as far as I understand the problem is caused by the lack of communication between the executors and the driver (the job server in this case). Also, the "--driver-url" resolves to the name of the driver pod, with a random port set by "-Dspark.driver.port". Since we cannot define the name of the service, the workers try to reach the driver at its original pod name and on that randomly generated port. Because this configuration comes from the driver, changing the default conf files on the workers/master has no effect. Following that answer as an example, I tried setting the SPARK_PUBLIC_DNS environment variable on the job server, but it did not lead to any change in the worker logs.
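For reference, that attempt amounted to adding the environment variable to the job server container along the lines of the snippet below; the value airflow-scheduler is only an assumed example of a name the executors could resolve:

- name: beam-spark-job-server
  image: apache/beam_spark_job_server:2.27.0
  args: ["--spark-master-url=spark://spark-master:7077"]
  env:
    - name: SPARK_PUBLIC_DNS   # had no visible effect on the worker logs
      value: airflow-scheduler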

Observations

Running a Spark job directly in Kubernetes with kubectl run spark-base --rm -it --labels="app=spark-client" --image bde2020/spark-base:2.4.5-hadoop2.7 -- bash ./spark/bin/pyspark --master spark://spark-master:7077 --conf spark.driver.host=spark-client, together with the service:

apiVersion: v1
kind: Service
metadata:
  name: spark-client
spec:
  selector:
    app: spark-client
  clusterIP: None

I get a fully working pyspark shell. If I omit the --conf argument, I get the same behavior as in the first setup (executors exiting indefinitely):

21/02/19 20:21:02 INFO Worker: Executor app-20210219202050-0002/4 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 20:21:02 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 4
21/02/19 20:21:02 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219202050-0002, execId=4)
21/02/19 20:21:02 INFO Worker: Asked to launch executor app-20210219202050-0002/5 for Spark shell
21/02/19 20:21:02 INFO SecurityManager: Changing view acls to: root
21/02/19 20:21:02 INFO SecurityManager: Changing modify acls to: root
21/02/19 20:21:02 INFO SecurityManager: Changing view acls groups to: 
21/02/19 20:21:02 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 20:21:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 20:21:02 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=46161" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@spark-base:46161" "--executor-id" "5" "--hostname" "172.18.0.20" "--cores" "1" "--app-id" "app-20210219202050-0002" "--worker-url" "spark://Worker@172.18.0.20:45151"


2 Answers


I have three alternative solutions to offer, depending on your deployment requirements, in order of difficulty:

  1. Use the Spark "uber jar" job server. This starts an embedded job server inside the Spark master, instead of using a standalone job server in a container. This would greatly simplify your deployment, since you would not need to start the beam_spark_job_server container at all:
python -m apache_beam.examples.wordcount \
--output ./data_test/ \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_master_url=spark://spark-master:7077 \
--environment_type=LOOPBACK
  2. You can pass properties through a Spark configuration file. Create a Spark configuration file and add spark.driver.host plus any other properties you need. In the docker run command for the job server, mount that configuration file into the container and set the SPARK_CONF_DIR environment variable to point to that directory (see the configuration sketch after this list).

  3. If neither of those works for you, you can also build your own customized version of the job server container. Pull the Beam source from GitHub. Check out the release branch you want to use (e.g. git checkout origin/release-2.28.0). Modify the entrypoint spark-job-server.sh and set -Dspark.driver.host=x there. Then build the container with ./gradlew :runners:spark:job-server:container:docker -Pdocker-repository-root="your-repo" -Pdocker-tag="your-tag".
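As a rough sketch of option 2 adapted to the Kubernetes setup from the question (the ConfigMap name, the mount path, and the chosen spark.driver.host value below are all assumptions, not requirements of the job server image):

apiVersion: v1
kind: ConfigMap
metadata:
  name: spark-driver-conf
data:
  spark-defaults.conf: |
    spark.driver.host    airflow-scheduler

The job server container would then mount it and point SPARK_CONF_DIR at the mount path:

- name: beam-spark-job-server
  image: apache/beam_spark_job_server:2.27.0
  args: ["--spark-master-url=spark://spark-master:7077"]
  env:
    - name: SPARK_CONF_DIR
      value: /opt/spark-conf
  volumeMounts:
    - name: spark-driver-conf
      mountPath: /opt/spark-conf

(with a matching volumes entry in the pod spec that references the spark-driver-conf ConfigMap)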

Answered on 2021-02-23T22:28:27.600

Let me revise my answer. The job server needs to be able to communicate with the workers, and vice versa; the executors keep exiting because of this. You need to configure things so that they can reach each other. A Kubernetes headless service can solve this problem; a sketch is shown below.
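A minimal sketch of such a headless service for the pod that hosts the job server, assuming the pod carries the label app: airflow-scheduler as in the question's setup (the service name is arbitrary):

apiVersion: v1
kind: Service
metadata:
  name: beam-job-server
spec:
  clusterIP: None          # headless: gives the driver pod a stable, resolvable DNS entry
  selector:
    app: airflow-scheduler

This mirrors the spark-client headless service that already worked in the question's observation section.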

Refer to https://github.com/cometta/python-apache-beam-spark for a workable example. If it is useful to you, you can help me by starring the repository.

Answered on 2021-08-16T02:29:14.807