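I'm trying to run several CypherQuery() calls in parallel using multiprocessing.Pool.
When I run neo4j.CypherQuery() without parallelism, it works fine. When I run a single neo4j.CypherQuery() on a multiprocessing.Pool, it also works fine. As soon as I start two or more neo4j.CypherQuery() processes, it fails with the error message below.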
import traceback
from multiprocessing import Pool
from py2neo import neo4j
db = neo4j.GraphDatabaseService("http://localhost:7474/db/data/")
def cypher_query(db):
    try:
        # very simple cypher query
        query_string = "MATCH (n:Label) RETURN n.name, n"
        query = neo4j.CypherQuery(db, query_string)
        result = query.execute()
        return_dict = {}
        for r in result:
            return_dict[r[0]] = r[1]
        return return_dict
    except Exception:
        # print stack trace
        print('%s' % (traceback.format_exc()))
# create the pool after cypher_query is defined so the worker processes can find it
pool = Pool(processes=4)
result1 = pool.apply_async(cypher_query, [db])
result2 = pool.apply_async(cypher_query, [db])
# close pool and wait for all processes to finish
pool.close()
pool.join()
# collect the results here (the failure happens before this point)
result1.get()
result2.get()
Error message:
Traceback (most recent call last):
  File "/path/to/my/script.py", line 237, in my_function
    query = neo4j.CypherQuery(db, query_string)
  File "build/bdist.linux-x86_64/egg/py2neo/neo4j.py", line 976, in __init__
    self._cypher = Resource(graph_db.__metadata__["cypher"])
  File "build/bdist.linux-x86_64/egg/py2neo/neo4j.py", line 320, in __metadata__
    self.refresh()
  File "build/bdist.linux-x86_64/egg/py2neo/neo4j.py", line 342, in refresh
    self._metadata = ResourceMetadata(self._get().content)
  File "build/bdist.linux-x86_64/egg/py2neo/packages/httpstream/http.py", line 532, in content
    elif self.is_text:
  File "build/bdist.linux-x86_64/egg/py2neo/packages/httpstream/http.py", line 513, in is_text
    return self.content_type.partition("/")[0] == "text"
AttributeError: 'NoneType' object has no attribute 'partition'
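I don't really understand the error message. I've tried different Cypher queries, and both execute() and stream(), but it always fails. Every query runs fine when it is not run in parallel. Obviously I'm missing something that breaks the parallelization of my function, but I don't know how to fix it.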
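One pattern worth trying here is to give each worker process its own connection instead of passing the single `db` object created in the parent. This is a sketch under an assumption the traceback does not confirm: that the failure comes from sharing one connection object across processes. It uses the standard `multiprocessing.Pool` `initializer`/`initargs` arguments; `Connection`, `init_worker` and `run_parallel` are hypothetical names, and `Connection` is only a stand-in for `neo4j.GraphDatabaseService` so the sketch runs without py2neo.

```python
import os
from multiprocessing import Pool

class Connection:
    """Hypothetical stand-in for neo4j.GraphDatabaseService (assumption:
    a real script would construct the py2neo object here instead)."""

    def __init__(self, url):
        self.url = url
        self.pid = os.getpid()  # process that opened this connection

    def run(self, query_string):
        # Placeholder for executing a Cypher query over this connection.
        return {"url": self.url, "query": query_string, "pid": self.pid}

_conn = None  # per-process connection, set by init_worker in each worker

def init_worker(url):
    """Called once in every worker process when the pool starts it."""
    global _conn
    _conn = Connection(url)

def cypher_query(query_string):
    # Only the query string is sent to the worker; the connection is
    # created inside the worker, so no connection object is pickled.
    return _conn.run(query_string)

def run_parallel(url, queries, processes=4):
    """Runs each query on the pool and returns results in input order."""
    with Pool(processes=processes, initializer=init_worker,
              initargs=(url,)) as pool:
        return pool.map(cypher_query, queries)
```

From the main script, a call such as `run_parallel("http://localhost:7474/db/data/", [query_string, query_string])` would go under an `if __name__ == "__main__":` guard, which `multiprocessing` needs on platforms that spawn rather than fork worker processes. Opening the connection in the initializer means each worker connects once, not once per query.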