0

我是 Python 和 Apache 生态系统的新手。我正在尝试通过 Apache NiFi 中的 ExecuteSparkInteractive 处理器提交 Pyspark 代码。我对这里使用的任何组件都没有详细的了解,我只是在谷歌搜索和试用。

通过这种方式,我已经在 EMR 中成功配置并启动了 Spark、NiFi 和 Livy。而且我可以在交互式会话中通过 Livy 提交 Pyspark 代码。

但是,当我将 ExecuteSparkInteractive 配置为通过 Livy 提交 Pyspark 代码时,什么也没有发生。Livy 会话管理器没有显示任何内容,并且在 ExecuteSparkInteractive 处理器中没有可见的错误。

这是我对 LivySessionController 的配置: LivySessionController

这是我在 ExecuteSparkInteractive 的属性下提交的示例代码。

import random
from pyspark import SparkConf, SparkContext
#create SparkContext using standalone mode
conf = SparkConf().setMaster("local").setAppName("SimpleETL")
sc = SparkContext.getOrCreate(conf)

NUM_SAMPLES = 100000

def sample(p):
  x, y = random.random(), random.random()
  return 1 if x*x + y*y < 1 else 0

count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample).reduce(lambda a, b: a + b)

print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)

这是在交互式会话中对我有用的代码:

import json, pprint, requests, textwrap
host = 'http://localhost:8998'
data = {'kind': 'pyspark'}
headers = {'Content-Type': 'application/json'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)

#Get the session URL
session_url = host + r.headers['Location']
sn_r = requests.get(session_url, headers=headers)

statements_url = session_url + '/statements'

data = {
  'code': textwrap.dedent("""
import random
from pyspark import SparkConf, SparkContext
#create SparkContext using standalone mode
conf = SparkConf().setMaster("local").setAppName("SimpleETL")
sc = SparkContext.getOrCreate(conf)

NUM_SAMPLES = 100000
def sample(p):
  x, y = random.random(), random.random()
  return 1 if x*x + y*y < 1 else 0

count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample).reduce(lambda a, b: a + b)

print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
""")
}

r = requests.post(statements_url, data=json.dumps(data), headers=headers)

这些是 nifi-app.log 的日志摘录:

#After starting the processor
2018-07-18 06:38:11,768 INFO [NiFi Web Server-112] o.a.n.c.s.StandardProcessScheduler Starting ExecuteSparkInteractive[id=ac05cd49-0164-1000-6793-2df960eb8de7]
2018-07-18 06:38:11,770 INFO [Monitor Processore Lifecycle Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled ExecuteSparkInteractive[id=ac05cd49-0164-1000-6793-2df960eb8de7] to run with 1 threads
2018-07-18 06:38:11,883 INFO [Flow Service Tasks Thread-1] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@36fb0996 // Another save pending = false
2018-07-18 06:38:57,106 INFO [Write-Ahead Local State Provider Maintenance] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@12830e23 checkpointed with 0 Records and 0 Swap Files in 7 milliseconds (Stop-the-world time = 2 milliseconds, Clear Edit Logs time = 2 millis), max Transaction ID -1

#After stopping the processor
2018-07-18 06:39:09,835 INFO [NiFi Web Server-106] o.a.n.c.s.StandardProcessScheduler Stopping ExecuteSparkInteractive[id=ac05cd49-0164-1000-6793-2df960eb8de7]
2018-07-18 06:39:09,835 INFO [NiFi Web Server-106] o.a.n.controller.StandardProcessorNode Stopping processor: class org.apache.nifi.processors.livy.ExecuteSparkInteractive
2018-07-18 06:39:09,838 INFO [Timer-Driven Process Thread-9] o.a.n.c.s.TimerDrivenSchedulingAgent Stopped scheduling ExecuteSparkInteractive[id=ac05cd49-0164-1000-6793-2df960eb8de7] to run
2018-07-18 06:39:09,917 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@36fb0996 // Another save pending = false

有趣的是,当我在 NiFi 中启用 LivySessionController 时,Livy UI 显示了两个新会话——第一个创建的会话显示为“空闲”状态,而后一个(会话 ID 较大的会话)即使在几次之后仍保持在“开始”状态刷新。让我们分别给他们 Session Id 1 和 2。有趣的是,Session Id 2 将状态从“starting”变为“shutting_down”再变为“dead”。一旦它死了,就会创建一个状态为“starting”的新会话(Session Id 3),该状态后来变为“idle”。以下是这 3 个会话的日志摘录:

#Livy 1st session:
18/07/18 06:33:58 ERROR YarnClientSchedulerBackend: Yarn application has already exited with state FAILED!
18/07/18 06:33:58 INFO SparkUI: Stopped Spark web UI at http://ip-172-31-84-145.ec2.internal:4040
18/07/18 06:33:58 INFO YarnClientSchedulerBackend: Shutting down all executors
18/07/18 06:33:58 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
18/07/18 06:33:58 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
18/07/18 06:33:58 INFO YarnClientSchedulerBackend: Stopped
18/07/18 06:33:58 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/07/18 06:33:59 INFO MemoryStore: MemoryStore cleared
18/07/18 06:33:59 INFO BlockManager: BlockManager stopped
18/07/18 06:33:59 INFO BlockManagerMaster: BlockManagerMaster stopped
18/07/18 06:33:59 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/07/18 06:33:59 INFO SparkContext: Successfully stopped SparkContext

#Livy 2nd session:
18/07/18 06:34:30 ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.

#Livy 3rd session:
18/07/18 06:36:15 ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.
4

1 回答 1

-1

这里有几件事-

Livy 会话控制器:- 确保在启用控制器服务时每个节点看到 2 个会话,并且 spark UI 上的两个会话必须处于运行状态(但在带有 Nifi 的 python 代码运行之前不执行任何操作)。如果您看到不寻常的行为,请先专注于修复它。可能的操作 - 添加 StandardSSLContextService 控制器并设置 Keystore 和 truststore。并在 LivySessionController 中使用相同的内容(在属性下:SSL 上下文服务)

在 Python 代码中:我认为您不必导入 SparkConf、SparkContext,也无需创建 conf 和 sc。您只需要导入 Sparksession 如下 - from pyspark.sql import SparkSession

并且您可以简单地使用 spark(默认情况下它可用作 spark 会话变量),例如 - spark.sql(s""" ....slq-statement.. """) 或 spark.sparkContext for sc

您提到的最后一件事是“Livy 会话管理器没有显示任何内容,并且在 ExecuteSparkInteractive 处理器中没有可见的错误。” 为此,您可以在 ExecuteSparkInteractive 处理器之后添加一些虚拟处理器,例如 updateAttribute 并将其保持在禁用模式。此外,您必须在所有 3 种状态(成功、失败、等待)中将 spark 交互式处理器的输出定向到 updateAttribute。这样你就可以看到 pyspark 代码在 nifi 中运行后的结果。请参阅下图的示例。

我希望这可以帮助您解决问题。

如果您喜欢答案,请投票

用于测试 PySpark 代码的 Nifi 模板示例

于 2019-06-22T20:32:47.127 回答