0

我有一个插入许多节点和关系的代码:

from neo4jrestclient.client import GraphDatabase
from neo4jrestclient import client
import psycopg2

db = GraphDatabase("http://127.0.0.1:7474",username="neo4j", password="1234")

conn = psycopg2.connect("\
    dbname='bdTrmmTest'\
    user='postgres'\
    host='127.0.0.1'\
    password='1234'\
    ");

inicio = 0

while(inicio <= 4429640):
    c = conn.cursor()
    c.execute("SELECT p.latitude, p.longitude, h.precipitacaoh, h.datah, h.horah FROM pontos AS p, historico AS h WHERE p.gid = h.gidgeo_fk LIMIT 1640 OFFSET %d"%(inicio))

    sensorlatlong = db.labels.create("LaLo")
    sensorprecip = db.labels.create("Precipitacao")
    sensordata = db.labels.create("Data")
    sensorhora = db.labels.create("Hora")

    records = c.fetchall()

    for i in records:
        s2 = db.nodes.create(precipitacao=i[2])
        sensorprecip.add(s2)
        s5 = db.nodes.create(horah=i[4])
        sensorhora.add(s5)
        s5.relationships.create("REGISTROU",s2)
        q = 'MATCH (s:LaLo) WHERE s.latitude = "%s" AND s.longitude = "%s" RETURN s'%(str(i[0]),str(i[1]))
        results = db.query(q, returns=(client.Node))
        q2 = 'MATCH (s:LaLo)-->(d:Data)-->(h:Hora)-->(p:Precipitacao) WHERE s.latitude = "%s" AND s.longitude = "%s" AND d.datah = "%s" RETURN d'%(str(i[0]), str(i[1]), str(i[3]))
        results1 = db.query(q2, returns=(client.Node))   
        if (len(results) > 0):
            n = results[0].pop()
            if(len(results1) > 0):
                n1 = results1[0].pop()
                n1.relationships.create("AS", s5)
            else:
                s4 = db.nodes.create(datah=i[3])
                sensordata.add(s4)
                n.relationships.create("EM", s4)
                s4.relationships.create("AS",s5)
        else:
            s3 = db.nodes.create(latitude=i[0],longitude=i[1])
            sensorlatlong.add(s3)
            if(len(results1) > 0):
                n1 = results1[0].pop()
                n1.relationships.create("AS", s5)
            else:
                s4 = db.nodes.create(datah=i[3])
                sensordata.add(s4)
                s3.relationships.create("EM", s4)
                s4.relationships.create("AS",s5)

    inicio = inicio+1640

但是插入需要这么多天。如何在此代码中批量插入以减少插入时间?我读了这篇文章http://jexp.de/blog/2012/10/parallel-batch-inserter-with-neo4j/但它是用 Java 编写的。

4

1 回答 1

0

我没有使用 Python 中的 Neo4j,但我很确定客户端的工作方式与其他语言相同,这意味着您的代码将生成许多不同的 HTTP 连接,操作低级节点和关系端点。这意味着很多延迟。

它还会生成许多不同的查询,因为它使用字符串替换而不是使用参数化查询,并且 Neo4j 将不得不解析它们中的每一个。

使用少量参数化的 Cypher 查询,甚至一个,你会好得多。

如果我正确阅读了neo4jrestclient 的文档,我认为它看起来像这样:

c.execute("SELECT p.latitude, p.longitude, h.precipitacaoh, h.datah, h.horah FROM pontos AS p, historico AS h WHERE p.gid = h.gidgeo_fk LIMIT 1640 OFFSET %d"%(inicio))

records = c.fetchall()

q = """
MERGE (lalo:LaLo {latitude: {latitude}, longitude: {longitude}})
WITH lalo
MERGE (lalo)-[:EM]->(data:Data {datah: {datah}})
WITH data
CREATE (data)-[:AS]->(hora:Hora {horah: {horah}})
CREATE (hora)-[:REGISTROU]->(:Precipitacao {precipitacao: {precipitacao}})
"""

for i in records:
    params = {
        "latitude": str(i[0]),
        "longitude": str(i[1]),
        "precipitacao": i[2],
        "datah": i[3],
        "horah": i[4],
    }
    db.query(q=q, params=params)

当然,如果您有索引,它会运行得更快,因此您需要先创建那些(至少是前 2 个),例如在循环之前或进程之外:

CREATE INDEX ON :LaLo(latitude)
CREATE INDEX ON :LaLo(longitude)
CREATE INDEX ON :Data(datah)

您可以做的最后一件事是使用 transactions来加快速度,因此写入是分批进行的。

  1. 打开交易

    tx = db.transaction(for_query=True)
    
  2. 追加(例如)最多一千个查询(如果到达行尾则更少)

    params = // ...
    tx.append(q=q, params=params)
    
  3. 提交事务

    tx.execute()
    
  4. 重复直到你用完 SQL 数据库中的行

于 2016-07-08T14:18:36.577 回答