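I'm currently playing around with multiprocessing and queues. I wrote a piece of code that exports data from MongoDB, maps it into a relational (flat) structure, converts all values to strings, and inserts them into MySQL.
Each of these steps is submitted as a process and given import/export queues, except for the MongoDB export, which is handled in the parent.
As you will see below, I use queues, and the child processes terminate themselves when they read `None` from the queue. The problem I currently have is that if a child process runs into an unhandled exception, the parent does not notice, and the remaining processes keep running. What I want is for the whole shebang to exit and, at best, re-raise the child's error.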
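A minimal sketch of the sentinel-terminated stage described above, assuming each stage simply applies one function to every item it reads. The names `run_stage`, `start_stage`, `stop_stage` and `transform` are hypothetical and are not the question's `MongoRelationalMapper`/`DatatypeConverter` classes, whose internals are not shown. The `multiprocessing` documentation's own worker-pool example stops its workers with a `'STOP'` sentinel in the same way, so the pattern itself is common.

```python
import multiprocessing

SENTINEL = None  # the "stop" marker used in the question


def run_stage(in_q, out_q, transform):
    """Generic pipeline stage: read, transform, forward, until SENTINEL.

    Returns the number of items processed (handy for logging).
    """
    processed = 0
    # iter(callable, sentinel) keeps calling in_q.get() until it returns SENTINEL
    for item in iter(in_q.get, SENTINEL):
        out_q.put(transform(item))
        processed += 1
    return processed


def start_stage(n, in_q, out_q, transform):
    """Start n worker processes that all run run_stage on the same queues.

    transform must be picklable (a builtin or module-level function)
    if the platform uses the 'spawn' start method.
    """
    workers = [multiprocessing.Process(target=run_stage,
                                       args=(in_q, out_q, transform))
               for _ in range(n)]
    for w in workers:
        w.start()
    return workers


def stop_stage(workers, in_q):
    """Send one SENTINEL per worker (each consumes exactly one), then join."""
    for _ in workers:
        in_q.put(SENTINEL)
    for w in workers:
        w.join()
```

One sentinel is sent per worker because each worker exits after consuming a single sentinel, which is exactly what the question's shutdown loops do.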
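I have two questions:
- How do I detect a child's error in the parent?
- After detecting the error, how do I kill my child processes (best practice)? I realize that putting `None` on the queue to kill the children is pretty dirty.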
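One way to make a child's error visible, sketched under assumptions: every stage subclasses a hypothetical base class `ErrorReportingProcess` that implements its work in `do_work()` instead of `run()`, and all stages share an extra `error_q` that the question's code does not have. Traceback objects cannot be pickled, so the child sends the formatted text, then re-raises so that its `exitcode` is still non-zero.

```python
import multiprocessing
import traceback


class ErrorReportingProcess(multiprocessing.Process):
    """Base class sketch: subclasses implement do_work() instead of run().

    Any exception escaping do_work() is formatted and sent to error_q so
    the parent can show it; it is then re-raised so the child still exits
    with a non-zero exitcode.
    """

    def __init__(self, error_q, *args, **kwargs):
        super(ErrorReportingProcess, self).__init__(*args, **kwargs)
        self.error_q = error_q

    def do_work(self):
        raise NotImplementedError

    def run(self):
        try:
            self.do_work()
        except Exception:
            # send (process name, traceback text); the text is picklable
            self.error_q.put((self.name, traceback.format_exc()))
            raise
```

The parent can then read `error_q` once it sees a non-zero `exitcode` and re-raise the child's error with the original traceback text attached.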
I'm using Python 2.7.
Here are the essential parts of my code:
```python
import multiprocessing
import time

# Establish communication queues
mongo_input_result_q = multiprocessing.Queue()
mapper_result_q = multiprocessing.Queue()
converter_result_q = multiprocessing.Queue()
# [...]

# create child processes
# all processes generated here are subclasses of "multiprocessing.Process"

# create mapper
mappers = [mongo_relational_mapper.MongoRelationalMapper(mongo_input_result_q, mapper_result_q, columns, 1000)
           for i in range(10)]

# create datatype converter, converts everything to str
converters = [datatype_converter.DatatypeConverter(mapper_result_q, converter_result_q, 'str', 1000)
              for i in range(10)]

# create mysql writer
# I create a list of writers. currently only one,
# but I have the option to parallelize it further
writers = [mysql_inserter.MySqlWriter(mysql_host, mysql_user, mysql_passwd, mysql_schema, converter_result_q,
                                      columns, 'w_' + mysql_table, 1000)
           for i in range(1)]

# starting mapper
for mapper in mappers:
    mapper.start()
time.sleep(1)

# starting converter
for converter in converters:
    converter.start()

# starting writer
for writer in writers:
    writer.start()

# [... initialize mongo database connection ...]

# put each dataset read to queue for the mapper
for row in mongo_collection.find({inc_column: {"$gte": start}}):
    mongo_input_result_q.put(row)
    count += 1
    if count % log_counter == 0:
        print 'Mongo Reader' + " " + str(count)
print "MongoReader done"

# Processes are terminated when they read "None" object from queue
# now that reading is finished, put None for each mapper in the queue so they terminate themselves
# the same for all followup processes
for mapper in mappers:
    mongo_input_result_q.put(None)
for mapper in mappers:
    mapper.join()

for converter in converters:
    mapper_result_q.put(None)
for converter in converters:
    converter.join()

for writer in writers:
    converter_result_q.put(None)
for writer in writers:
    writer.join()
```
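On the parent side, a plain `join()` waits forever on healthy processes even after a sibling has died. A possible replacement, sketched here as a hypothetical helper `wait_for_stage`, polls `exitcode` (which is `None` while a process is running) across *all* pipeline processes while waiting for one stage, and on the first non-zero code terminates the rest and raises. `PipelineError` and the optional `error_q` (a queue children put `(name, traceback_text)` on) are assumptions, not part of the question's code.

```python
import time

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2.7


class PipelineError(RuntimeError):
    """Raised in the parent when any child process exits with an error."""


def wait_for_stage(stage, everyone, error_q=None, poll_interval=0.5):
    """Block until every process in `stage` has exited cleanly.

    While waiting, watch all pipeline processes (`everyone`). As soon as
    one of them has exited with a non-zero exitcode, terminate and join the
    rest, then raise PipelineError (with the child's traceback text if the
    child put one on `error_q`).
    """
    while True:
        # exitcode is None while a process is still running
        failed = [p for p in everyone if p.exitcode not in (None, 0)]
        if failed:
            for p in everyone:
                if p.is_alive():
                    p.terminate()          # SIGTERM on Unix
            for p in everyone:
                p.join()
            detail = ""
            if error_q is not None:
                try:
                    name, tb = error_q.get(timeout=1)
                    detail = "\n%s reported:\n%s" % (name, tb)
                except queue.Empty:
                    pass
            raise PipelineError("%s exited with code %s%s"
                                % (failed[0].name, failed[0].exitcode, detail))
        if all(p.exitcode == 0 for p in stage):
            return
        time.sleep(poll_interval)
```

With `everyone = mappers + converters + writers`, each `join()` loop in the shutdown sequence could become `wait_for_stage(mappers, everyone, error_q)` (then converters, then writers), keeping the sentinel puts in between. Two caveats: a failure is only noticed once the parent reaches one of these waits, so rows keep being queued during the Mongo read loop; and the `multiprocessing` docs warn that `terminate()` can leave a queue the process was using corrupted, which is acceptable here only because the whole pipeline is being abandoned.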