
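I have a long list of edge IDs (about 12 billion) that I want to remove from my Titan graph (hosted on an HBase backend).

How can I do this quickly and efficiently?

I tried removing the edges through Gremlin, but that is far too slow for this many edges.

Is it possible to issue Delete commands directly against HBase? How would I do that? (How do I assemble the keys to delete?)

Thanks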


1 Answer


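After two days of research, I came up with a solution.

The main goal: given a very large collection of edge ID strings (edgeIds), implement logic that removes them from the graph. The implementation must support deleting billions of edges, so it has to be efficient in both memory and time.

Using Titan directly is not an option, because Titan performs a lot of unnecessary object instantiation. We don't want to load the edges at all; we just want to delete them from HBase.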

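// Imports below assume Titan 1.0 package names, the HBase 1.x client and Guava;
// adjust them if your versions differ.
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.thinkaurelius.titan.diskstorage.WriteBuffer;
import com.thinkaurelius.titan.diskstorage.util.WriteByteBuffer;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
import com.thinkaurelius.titan.graphdb.database.idhandling.IDHandler;
import com.thinkaurelius.titan.graphdb.database.idhandling.VariableLong;
import com.thinkaurelius.titan.graphdb.idmanagement.IDManager;
import com.thinkaurelius.titan.graphdb.relations.RelationIdentifier;
import com.thinkaurelius.titan.util.stats.NumberUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Table;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

/**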
 * Deletes the given edge IDs, splitting them into chunks of 100,000
 * @param edgeIds Collection of edge IDs to delete
 * @throws IOException
 */
public static void deleteEdges(Iterator<String> edgeIds) throws IOException {
    IDManager idManager = new IDManager(NumberUtil.getPowerOf2(GraphDatabaseConfiguration.CLUSTER_MAX_PARTITIONS.getDefaultValue()));
    byte[] columnFamilyName = "e".getBytes(); // 'e' is your edgestore column-family name
    long deletionTimestamp =  System.currentTimeMillis();
    int chunkSize = 100000; // Edge IDs per HBase round trip; each edge yields two Deletes (one IN, one OUT), so up to 200,000 Deletes per call

    org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();
    config.set("hbase.zookeeper.quorum", "YOUR-ZOOKEEPER-HOSTNAME");
    config.set("hbase.table", "YOUR-HBASE-TABLE");
    List<Delete> deletions = Lists.newArrayListWithCapacity(chunkSize * 2); // two Deletes per edge

    // try-with-resources closes the Table and the Connection once all chunks are processed
    try (Connection connection = ConnectionFactory.createConnection(config);
         Table table = connection.getTable(TableName.valueOf(config.get("hbase.table")))) {
        Iterators.partition(edgeIds, chunkSize)
                .forEachRemaining(edgeIdsChunk -> deleteEdgesChunk(edgeIdsChunk, deletions, table, idManager,
                        columnFamilyName, deletionTimestamp));
    }
}

/**
 * Given a collection of edge IDs and a list of Delete objects (cleared on entry),
 * creates two Delete objects for each edge (one for IN and one for OUT)
 * and submits them through the given Table instance
 */
public static void deleteEdgesChunk(List<String> edgeIds, List<Delete> deletions, Table table, IDManager idManager,
                                    byte[] columnFamilyName, long deletionTimestamp) {
    deletions.clear();

    for (String edgeId : edgeIds)
    {
        RelationIdentifier identifier = RelationIdentifier.parse(edgeId);

        deletions.add(createEdgeDelete(idManager, columnFamilyName, deletionTimestamp, identifier.getRelationId(),
                identifier.getTypeId(), identifier.getInVertexId(), identifier.getOutVertexId(),
                IDHandler.DirectionID.EDGE_IN_DIR));

        deletions.add(createEdgeDelete(idManager, columnFamilyName, deletionTimestamp, identifier.getRelationId(),
                identifier.getTypeId(), identifier.getOutVertexId(), identifier.getInVertexId(),
                IDHandler.DirectionID.EDGE_OUT_DIR));
    }

    try {
        table.delete(deletions);
    }
    catch (IOException e)
    {
        logger.error("Failed to delete a chunk due to inner exception", e); // pass the exception itself so the stack trace is logged
    }
}

/**
 * Creates an HBase Delete object for a specific edge
 * @return HBase Delete object to be used against HBase
 */
private static Delete createEdgeDelete(IDManager idManager, byte[] columnFamilyName, long deletionTimestamp,
                                       long relationId, long typeId, long vertexId, long otherVertexId,
                                       IDHandler.DirectionID directionID) {

    byte[] vertexKey = idManager.getKey(vertexId).getBytes(0, 8); // Size of a long
    byte[] edgeQualifier = makeQualifier(relationId, otherVertexId, directionID, typeId);

    return new Delete(vertexKey)
            .addColumn(columnFamilyName, edgeQualifier, deletionTimestamp);
}

/**
 * Cell Qualifier for a specific edge
 */
private static byte[] makeQualifier(long relationId, long otherVertexId, IDHandler.DirectionID directionID, long typeId) {
    WriteBuffer out = new WriteByteBuffer(32); // Default length of array is 32, feel free to increase

    IDHandler.writeRelationType(out, typeId, directionID, false);
    VariableLong.writePositiveBackward(out, otherVertexId);
    VariableLong.writePositiveBackward(out, relationId);

    return out.getStaticBuffer().getBytes(0, out.getPosition());
}
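
One thing the code above does not handle is retrying: a chunk that fails is only logged, so its edge IDs are lost. Below is a minimal, dependency-free sketch of the same chunking idea that also returns the failed chunks. `ChunkedDeleter` and `BatchSink` are hypothetical names introduced here for illustration; in practice the sink would wrap something like `deleteEdgesChunk`, changed to rethrow instead of swallowing the exception.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Sketch only: streams IDs in fixed-size chunks and remembers which chunks failed. */
final class ChunkedDeleter {

    /** Hypothetical stand-in for the call that sends one batch of deletes to HBase. */
    interface BatchSink {
        void deleteBatch(List<String> edgeIds) throws Exception;
    }

    /**
     * Hands the IDs to the sink in chunks of at most chunkSize.
     * Only one chunk is held in memory at a time (plus any failed chunks).
     * The sink must not keep a reference to the list it receives, because
     * the same buffer is cleared and reused for the next chunk.
     *
     * @return the chunks whose delete threw, so the caller can retry them
     */
    static List<List<String>> deleteInChunks(Iterator<String> edgeIds, int chunkSize, BatchSink sink) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<String>> failed = new ArrayList<>();
        List<String> chunk = new ArrayList<>(chunkSize);
        while (edgeIds.hasNext()) {
            chunk.add(edgeIds.next());
            // Flush when the buffer is full or the input is exhausted
            if (chunk.size() == chunkSize || !edgeIds.hasNext()) {
                try {
                    sink.deleteBatch(chunk);
                } catch (Exception e) {
                    failed.add(new ArrayList<>(chunk)); // copy, since the buffer is reused
                }
                chunk.clear();
            }
        }
        return failed;
    }
}
```

The returned list can be fed back through `deleteInChunks` a bounded number of times. Because the deletes target specific cells, re-sending a chunk that partially succeeded should be harmless.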

Keep in mind that I don't take system types and the like into account; I assume the given edge IDs belong to user edges.
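
Since the code trusts its input, a cheap shape check before building deletes may be worthwhile. The sketch below assumes that an edge ID is rendered as four lowercase base-36 fields joined by `-`, which is how Titan 1.0 appears to print them; please verify this against your own IDs. `EdgeIdShape` is a hypothetical helper, not part of Titan, and it does not detect system edges. It only screens out strings that are malformed or have too few fields to carry both vertex IDs.

```java
import java.util.regex.Pattern;

/** Sketch only: screens strings that do not look like Titan edge IDs. */
final class EdgeIdShape {

    // ASSUMPTION: four lowercase base-36 fields separated by '-'.
    // Check this against real IDs from your graph before relying on it.
    private static final Pattern FOUR_FIELDS =
            Pattern.compile("[0-9a-z]+(-[0-9a-z]+){3}");

    /** Returns true if the string has the assumed four-field shape. */
    static boolean looksLikeEdgeId(String id) {
        return id != null && FOUR_FIELDS.matcher(id).matches();
    }
}
```

IDs that fail the check can be written to a reject file for inspection rather than being passed to `RelationIdentifier.parse`.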

With this implementation I was able to remove 20 million edges in about 2 minutes.
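
To put that number in perspective for the 12 billion edges in the question, here is a rough extrapolation. It assumes the rate stays constant for the whole run and that a single client does all the work, neither of which is guaranteed. `DeletionEstimate` is a hypothetical name used only for this sketch.

```java
/** Sketch only: linear extrapolation from a measured sample run. */
final class DeletionEstimate {

    /** Seconds needed for totalEdges at the rate observed in the sample. */
    static double estimateSeconds(double totalEdges, double sampleEdges, double sampleSeconds) {
        double edgesPerSecond = sampleEdges / sampleSeconds;
        return totalEdges / edgesPerSecond;
    }

    public static void main(String[] args) {
        // 20 million edges in 120 seconds, scaled up to 12 billion edges
        double seconds = estimateSeconds(12e9, 20e6, 120);
        System.out.printf("about %.0f hours%n", seconds / 3600); // prints "about 20 hours"
    }
}
```

Under those assumptions the full job takes roughly 20 hours of wall-clock time.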

Answered 2016-02-22T20:36:08.933