So I set up a cluster on GCP using Dataproc: 1 master, 2 workers. My system uses rabbitmq to put items into a queue every hour, and a consumer runs a spark job through pyspark for each item in the queue. Note: I have configured things so that I can import pyspark into a normal python environment and use spark from there.
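(For reference, a minimal sketch of roughly how such an import can be wired up, assuming a Dataproc-style install under /usr/lib/spark; the SPARK_HOME path and the py4j zip name vary by version, so treat both as assumptions:)

import glob
import os
import sys

# Assumed Spark install location on a Dataproc master; adjust as needed.
os.environ.setdefault('SPARK_HOME', '/usr/lib/spark')
SPARK_HOME = os.environ['SPARK_HOME']

# Make pyspark and its bundled py4j importable from a plain interpreter.
sys.path.insert(0, os.path.join(SPARK_HOME, 'python'))
sys.path.insert(0, glob.glob(
    os.path.join(SPARK_HOME, 'python', 'lib', 'py4j-*-src.zip'))[0])

from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setAppName('mq_consumer'))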
After running for a while (actually not that long at all), the system prints that it cannot renew its lease and goes into safe mode:
WARN org.apache.hadoop.hdfs.LeaseRenewer: Failed to renew lease for [DFSClient_NONMAPREDUCE_-1938993080_12] for 304 seconds. Will retry shortly ...
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.SafeModeException): Cannot renew lease for DFSClient_NONMAPREDUCE_-1938993080_12. Name node is in safe mode.
Resources are low on NN. Please add or free up more resources then turn off safe mode manually. NOTE: If you turn off safe mode before adding resources, the NN will immediately return to safe mode.
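(For reference, safe mode can be inspected and cleared manually from the master with the standard hdfs admin commands; per the warning above, though, the NameNode just re-enters safe mode until resources are actually freed:)

hdfs dfsadmin -safemode get
hdfs dfsadmin -safemode leave
hdfs getconf -confKey dfs.namenode.name.dir   # then df -h that directory to see free space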
The system keeps running for a short while after that, then eventually shuts down:
at org.apache.hadoop.hdfs.DFSClient.renewLease(DFSClient.java:892)
at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:423)
at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:448)
at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:71)
at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:304)
at java.lang.Thread.run(Thread.java:745)
16/03/15 15:52:46 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/03/15 15:52:46 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/03/15 15:52:46 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
I tried to start the service again:
python mq_consumer.py
which looks like this:
import json
import pika
from lex_engine import run_engine as re, db_tools as dbt, credentials as creds
# This file consumes form the rabbit mq queue and runs the jobs it gets from
# the queue in spark. It also sets the lock key, to ensure that multiple jobs
# with the same ID don't start at the same time.
ADDR = creds.RABBITMQ_VARS['address']
CONNECTION = pika.BlockingConnection(pika.ConnectionParameters(host=ADDR))
CHANNEL = CONNECTION.channel()
QUEUE = creds.RABBITMQ_VARS['queue']
MSERVER = dbt.get_mongo(creds.MONGO_CREDS)
MONGO_DB = MSERVER[creds.MONGO_DB_VARS['database']]
COLLECTION = MONGO_DB[creds.MONGO_DB_VARS['collection']]
for method_frame, properties, body in CHANNEL.consume(QUEUE):
    # This removes the message from the queue
    CHANNEL.basic_ack(method_frame.delivery_tag)
    json_payload = json.loads(body)
    if (json_payload['id'] is not None and
            json_payload['body'] is not None and
            json_payload['title'] is not None):
        lock_key = 'le-' + creds.ENVIRONMENT + '-' + json_payload['id']
        if COLLECTION.find_one({u'_id': lock_key}) is None:
            COLLECTION.insert_one({u'_id': lock_key, u'in_use': 1})
            re.run_engine(json_payload)
            COLLECTION.delete_one({u'_id': lock_key})
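(An aside on the lock key: the find_one/insert_one pair above is not atomic, and the lock never gets released if run_engine raises. A sketch of a more defensive variant, leaning on MongoDB's unique _id index; run_locked is a hypothetical helper, not part of the original code:)

from pymongo.errors import DuplicateKeyError

def run_locked(payload):
    # Hypothetical helper: acquire the lock atomically via the unique
    # _id index, and release it even when the job fails.
    lock_key = 'le-' + creds.ENVIRONMENT + '-' + payload['id']
    try:
        COLLECTION.insert_one({u'_id': lock_key, u'in_use': 1})
    except DuplicateKeyError:
        return  # another consumer already holds this lock
    try:
        re.run_engine(payload)
    finally:
        COLLECTION.delete_one({u'_id': lock_key})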
But then spark/hadoop would no longer start, printing the following:
16/03/15 20:29:09 INFO akka.event.slf4j.Slf4jLogger: Slf4jLogger started
16/03/15 20:29:09 INFO Remoting: Starting remoting
16/03/15 20:29:09 INFO Remoting: Remoting started; listening on addresses : [akka.tcp://sparkDriverActorSystem@10.128.0.5:35762]
16/03/15 20:29:10 INFO org.spark-project.jetty.server.Server: jetty-8.y.z-SNAPSHOT
16/03/15 20:29:10 INFO org.spark-project.jetty.server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
16/03/15 20:29:10 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at lexe-1-m/10.128.0.5:8032
16/03/15 20:29:11 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
16/03/15 20:29:12 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
16/03/15 20:29:13 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
16/03/15 20:29:14 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 3 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
16/03/15 20:29:15 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 4 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
16/03/15 20:29:16 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 5 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
16/03/15 20:29:17 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 6 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
16/03/15 20:29:18 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 7 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
16/03/15 20:29:19 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 8 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
16/03/15 20:29:20 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 9 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
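(Judging from those retries, the ResourceManager process itself never comes back up after the crash. As a quick check, jps on the master lists the running Hadoop JVMs; on a healthy Dataproc master it should include NameNode and ResourceManager:)

sudo jps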
If I reboot the server (Debian), it works again, but after a while it falls victim to the same problem. Does anyone know how to fix this? I have considered giving the NameNode more resources, but I don't think that should prevent the system from restarting, right? Anyone have any insight?