
So I set up a cluster on GCP using Dataproc: one master and two workers. My system uses RabbitMQ to put items into a queue every hour, and a consumer runs a Spark job via pyspark for each item in the queue. Note: I have configured things so that I can import pyspark into a normal Python environment and use Spark from there.
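Roughly, the kind of import shim I mean (a minimal sketch using the findspark helper; /usr/lib/spark is an assumed Dataproc install path, and this is not necessarily the exact bootstrap my setup uses):

    # Minimal sketch: make pyspark importable from a plain Python process.
    # Assumes `pip install findspark`; /usr/lib/spark is where Dataproc
    # normally installs Spark, but treat both as assumptions.
    import findspark
    findspark.init('/usr/lib/spark')

    from pyspark import SparkConf, SparkContext

    sc = SparkContext(conf=SparkConf().setAppName('mq_consumer'))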

After running for a while (not actually that long), the system prints that it cannot renew its lease, and HDFS goes into safe mode:

    WARN org.apache.hadoop.hdfs.LeaseRenewer: Failed to renew lease for [DFSClient_NONMAPREDUCE_-1938993080_12] for 304 seconds.  Will retry shortly ...
    org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.SafeModeException): Cannot renew lease for DFSClient_NONMAPREDUCE_-1938993080_12. Name node is in safe mode.
    Resources are low on NN. Please add or free up more resources then turn off safe mode manually. NOTE:  If you turn off safe mode before adding resources, the NN will immediately return to safe mode.
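The message says to free up resources and then leave safe mode manually; for reference, the standard HDFS admin commands for checking and clearing that state are:

    hdfs dfsadmin -safemode get
    hdfs dfsadmin -safemode leave

(Per the note in the log itself, leaving safe mode without first freeing space just sends the NameNode straight back into it.)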

The system keeps running for a short while and then eventually shuts down:

    at org.apache.hadoop.hdfs.DFSClient.renewLease(DFSClient.java:892)
    at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:423)
    at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:448)
    at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:71)
    at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:304)
    at java.lang.Thread.run(Thread.java:745)
16/03/15 15:52:46 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/03/15 15:52:46 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/03/15 15:52:46 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.

I tried to start the service again:

    python mq_consumer.py

which looks like this:

    import json
    import pika
    from lex_engine import run_engine as re, db_tools as dbt, credentials as creds

    # This file consumes from the RabbitMQ queue and runs the jobs it gets from
    # the queue in Spark. It also sets the lock key, to ensure that multiple jobs
    # with the same ID don't start at the same time.
    ADDR = creds.RABBITMQ_VARS['address']
    CONNECTION = pika.BlockingConnection(pika.ConnectionParameters(host=ADDR))
    CHANNEL = CONNECTION.channel()
    QUEUE = creds.RABBITMQ_VARS['queue']
    MSERVER = dbt.get_mongo(creds.MONGO_CREDS)
    MONGO_DB = MSERVER[creds.MONGO_DB_VARS['database']]
    COLLECTION = MONGO_DB[creds.MONGO_DB_VARS['collection']]


    for method_frame, properties, body in CHANNEL.consume(QUEUE):
        # This removes the message from the queue
        CHANNEL.basic_ack(method_frame.delivery_tag)
        json_payload = json.loads(body)
        if (json_payload['id'] is not None and
                json_payload['body'] is not None and
                json_payload['title'] is not None):
            lock_key = 'le-' + creds.ENVIRONMENT + '-' + json_payload['id']
            if COLLECTION.find_one({u'_id': lock_key}) is None:
                COLLECTION.insert_one({u'_id': lock_key, u'in_use': 1})
                re.run_engine(json_payload)
                COLLECTION.delete_one({u'_id': lock_key})
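(As an aside on the locking: the find_one/insert_one pair above is not atomic, so two consumers could both pass the check. A race-free sketch that leans on the unique index MongoDB keeps on _id, where try_acquire_lock is an illustrative name and not part of my code:)

    from pymongo.errors import DuplicateKeyError

    def try_acquire_lock(collection, lock_key):
        """Claim the lock atomically; only the first inserter succeeds."""
        try:
            # _id is backed by a unique index, so a concurrent second
            # insert with the same key raises instead of racing.
            collection.insert_one({u'_id': lock_key, u'in_use': 1})
            return True
        except DuplicateKeyError:
            return False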

But then Spark/Hadoop no longer starts, printing the following:

    16/03/15 20:29:09 INFO akka.event.slf4j.Slf4jLogger: Slf4jLogger started
    16/03/15 20:29:09 INFO Remoting: Starting remoting
    16/03/15 20:29:09 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@10.128.0.5:35762]
    16/03/15 20:29:10 INFO org.spark-project.jetty.server.Server: jetty-8.y.z-SNAPSHOT
    16/03/15 20:29:10 INFO org.spark-project.jetty.server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
    16/03/15 20:29:10 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at lexe-1-m/10.128.0.5:8032
    16/03/15 20:29:11 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
    16/03/15 20:29:12 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
    16/03/15 20:29:13 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
    16/03/15 20:29:14 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 3 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
    16/03/15 20:29:15 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 4 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
    16/03/15 20:29:16 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 5 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
    16/03/15 20:29:17 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 6 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
    16/03/15 20:29:18 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 7 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
    16/03/15 20:29:19 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 8 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
    16/03/15 20:29:20 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: lexe-1-m/10.128.0.5:8032. Already tried 9 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
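To tell whether the ResourceManager is actually down rather than just unreachable from my process, a quick port probe (a sketch; the hostname and port are taken from the log above):

    # Probe the YARN ResourceManager port seen in the log (lexe-1-m:8032).
    import socket

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(2)
    result = sock.connect_ex(('lexe-1-m', 8032))
    print('RM reachable' if result == 0 else 'nothing listening, errno %d' % result)
    sock.close()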

If I restart the server (Debian), it works again, but after a while it falls prey to the same problem. Does anyone know how to fix this? I have considered giving the NameNode more resources, but I don't think low resources should stop the system from restarting, right? Does anyone have any insight?
