
I am trying to create an edge between two vertices that already exist in OrientDB. My edge data is in a MySQL table.

Here is my oetl JSON.

    {
      "config": {
        "log": "info"
      },
      "source": { "file": { "path": "/Users/RP/user_invited_data.csv" } },
      "extractor": { "csv": { "columnsOnFirstLine": true, "columns": ["user_id:string", "invited_by:string", "invited_date:datetime"] } },
      "transformers": [
        { "vertex": { "class": "User", "skipDuplicates": true } },
        { "edge": { "class": "INVITED", "direction": "in",
                    "joinFieldName": "invited_by",
                    "lookup": "select expand(u) from (match {class: User, as: u} return u) where u.user_id = ?;",
                    "unresolvedLinkAction": "NOTHING",
                    "edgeFields": { "invited_date": "${input.invited_date}" },
                    "skipDuplicates": true
                  }
        },
        { "field":
          { "fieldNames":
            [ "invited_by", "invited_date" ],
            "operation": "remove"
          }
        }
      ],
      "loader" : {
        "orientdb": {
          "dbURL": "remote:localhost/abcd_graph",
          "dbUser": "root",
          "dbPassword": "root",
          "dbType": "graph",
          "dbAutoCreate": false,
          "batchCommit": 1000
        }
      }
    }

When I run the JSON above, it throws ORecordDuplicatedException for the User vertex. I created a unique index on user_id and set skipDuplicates = true. Any suggestions would be appreciated.

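Update: A gem of OrientDB: skipDuplicates apparently has no effect when the log level is not DEBUG. But the problem is still not solved. There is no error now, but no edges are created. I will keep debugging and see whether I can fix it tonight.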

Update: After debugging further, I found a deeper exception at the storage level.

com.orientechnologies.orient.core.exception.ODatabaseException: Impossible to serialize invalid link #-1:-1
    DB name="abcd_graph"
    at com.orientechnologies.orient.core.serialization.serializer.record.binary.ORecordSerializerBinaryV0.writeOptimizedLink(ORecordSerializerBinaryV0.java:867)
    at com.orientechnologies.orient.core.serialization.serializer.record.binary.ORecordSerializerBinaryV0.serializeValue(ORecordSerializerBinaryV0.java:754)
    at com.orientechnologies.orient.core.serialization.serializer.record.binary.ORecordSerializerBinaryV0.serialize(ORecordSerializerBinaryV0.java:385)
    at com.orientechnologies.orient.core.serialization.serializer.record.binary.ORecordSerializerBinary.toStream(ORecordSerializerBinary.java:99)
    at com.orientechnologies.orient.core.record.impl.ODocument.toStream(ODocument.java:2381)
    at com.orientechnologies.orient.core.record.impl.ODocument.toStream(ODocument.java:664)
    at com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx.executeSaveRecord(ODatabaseDocumentTx.java:2183)
    at com.orientechnologies.orient.core.tx.OTransactionNoTx.saveRecord(OTransactionNoTx.java:191)
    at com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx.save(ODatabaseDocumentTx.java:2758)
    at com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx.save(ODatabaseDocumentTx.java:102)
    at com.orientechnologies.orient.core.record.impl.ODocument.save(ODocument.java:1805)
    at com.orientechnologies.orient.core.record.impl.ODocument.save(ODocument.java:1801)
    at com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx.addEdgeInternal(OrientGraphNoTx.java:242)
    at com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx.addEdgeInternal(OrientGraphNoTx.java:137)
    at com.tinkerpop.blueprints.impls.orient.OrientVertex.addEdge(OrientVertex.java:741)
    at com.tinkerpop.blueprints.impls.orient.OrientVertex.addEdge(OrientVertex.java:688)
    at com.orientechnologies.orient.etl.transformer.OEdgeTransformer.createEdge(OEdgeTransformer.java:203)
    at com.orientechnologies.orient.etl.transformer.OEdgeTransformer.executeTransform(OEdgeTransformer.java:123)
    at com.orientechnologies.orient.etl.transformer.OAbstractTransformer.transform(OAbstractTransformer.java:39)
    at com.orientechnologies.orient.etl.OETLPipeline.execute(OETLPipeline.java:110)
    at com.orientechnologies.orient.etl.OETLProcessor$OETLPipelineWorker.call(OETLProcessor.java:620)
    at com.orientechnologies.orient.etl.OETLProcessor$OETLPipelineWorker.call(OETLProcessor.java:601)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Update: I have changed the extractor from DB to CSV so that the problem is easier to reproduce.

Schema creation:

CREATE class User IF NOT EXISTS extends V;
create property User.user_id IF NOT EXISTS String;
create property User.name IF NOT EXISTS String;
create index user_idx on User(user_id) unique;

insert into User set user_id = '1000_USER1', name = 'Bob';
insert into User set user_id = '1001_USER2', name = 'Robert';

CSV sample:

user_id, invited_by, invited_date
1001_USER2, 1000_USER1,
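
The edge transformer's joinFieldName has to match a CSV column name exactly, so a quick header check before running the ETL can catch a misspelled column. The following is a minimal Python sketch, not part of OrientDB; the function name missing_columns and the sample data with its deliberately misspelled header are made up for illustration.

```python
import csv
import io


def missing_columns(csv_text, required):
    """Return the required column names that are absent from the CSV header.

    Hypothetical helper for illustration; it only inspects the first line.
    """
    # skipinitialspace drops the blank that follows each comma in the header.
    reader = csv.reader(io.StringIO(csv_text), skipinitialspace=True)
    header = [name.strip() for name in next(reader)]
    return [name for name in required if name not in header]


# Illustrative sample whose second column name is misspelled on purpose.
sample = "user_id, invted_by, invited_date\n1001_USER2, 1000_USER1,\n"
required = ["user_id", "invited_by", "invited_date"]

print(missing_columns(sample, required))  # prints ['invited_by']
```

An empty list means every field name referenced in the ETL config is present in the header.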

1 Answer


After some struggle, re-reading the entire ETL documentation, and some debugging, I figured it out.

We need to use the MERGE transformer instead of VERTEX. The merge transformer looks up an existing vertex instead of creating it.
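
To make the difference concrete, here is a toy in-memory model in Python. It is only a sketch of the idea, not OrientDB's implementation: UniqueIndex, vertex_step, merge_step, and DuplicateKeyError are hypothetical names, and DuplicateKeyError merely stands in for ORecordDuplicatedException.

```python
class DuplicateKeyError(Exception):
    """Stands in for ORecordDuplicatedException in this toy model."""


class UniqueIndex:
    """A dict-backed stand-in for a unique index on User.user_id."""

    def __init__(self):
        self._records = {}

    def insert(self, key, record):
        if key in self._records:
            raise DuplicateKeyError(key)
        self._records[key] = record
        return record

    def lookup(self, key):
        return self._records.get(key)


def vertex_step(index, row):
    # Models "vertex": always tries to create a new record.
    return index.insert(row["user_id"], dict(row))


def merge_step(index, row):
    # Models "merge": load the existing record and fold the input into it.
    existing = index.lookup(row["user_id"])
    if existing is None:
        # Assumption of this toy: an unresolved lookup passes the input through.
        return dict(row)
    existing.update(row)
    return existing


index = UniqueIndex()
index.insert("1001_USER2", {"user_id": "1001_USER2", "name": "Robert"})
row = {"user_id": "1001_USER2", "invited_by": "1000_USER1"}

try:
    vertex_step(index, row)
    vertex_failed = False
except DuplicateKeyError:
    vertex_failed = True

merged = merge_step(index, row)
print(vertex_failed, merged["name"], merged["invited_by"])  # prints True Robert 1000_USER1
```

In this model the create step collides with the unique key, while the lookup step returns the stored record, so a later edge step has a real record to attach to.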

Here is what my JSON looks like:

"transformers": [
  { "merge": { "joinFieldName": "user_id", "lookup": "User.user_id" } },
  { "edge": { "class": "INVITED", "direction": "out",
              "joinFieldName": "invited_by",
              "lookup": "SELECT expand(u) from (match {class: User, as: u} return u) where u.user_id = ?",
              "unresolvedLinkAction": "NOTHING",
              "edgeFields": { "invited_date": "${input.invited_date}" },
              "skipDuplicates": true
            }
  },
  { "field":
    { "fieldNames":
      [ "invited_by", "invited_date" ],
      "operation": "remove"
    }
  }
]

There is one remaining problem: this creates duplicate edges between the same two vertices. I will treat that as a separate question and look into it on its own.

One thing I have consistently observed while using OrientDB is that the features are there, but they are hard to figure out.

answered 2017-08-23T19:11:30.160