3

当尝试使用 SparkGraphComputer 计算集群上泰坦图上的顶点数时,我收到一个错误,我不知道如何处理。我在我的代码和集群上使用 tinkerpop 3.1.1-incubating 和 Titan 1.1.0-SNAPSHOT 我安装了 datastax community edition 2.1.11 和 spark 1.5.2-bin-hadoop2.6

我整理了一个最小的 Java 示例来重现我的问题:

private void strippedDown() {
    // a normal titan cluster
    String titanClusterConfig = "titan-cassandra-test-cluster.properties";

    // a hadoop graph with cassandra as input and gryo as output
    String sparkClusterConfig = "titan-cassandra-test-spark.properties";

    String edgeLabel = "blank";

    // add a graph
    int n = 100;
    Graph titanGraph = GraphFactory.open(titanClusterConfig);
    Vertex superNode = titanGraph.addVertex(T.label, String.valueOf(0));
    for (int i=1;i<n;i++) {
        Vertex currentNode = titanGraph.addVertex(T.label, String.valueOf(i));
        currentNode.addEdge(edgeLabel,superNode);
    }
    titanGraph.tx().commit();

    //count with titan
    Long count = titanGraph.traversal().V().count().next();
    System.out.println("The number of vertices in the graph is: "+count);

    // count the graph using titan graph computer
    count = titanGraph.traversal(GraphTraversalSource.computer(FulgoraGraphComputer.class)).V().count().next();
    System.out.println("The number of vertices in the graph is: "+count);

    // count the graph using spark graph computer
    Graph sparkGraph = GraphFactory.open(sparkClusterConfig);
    count = sparkGraph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().count().next();
    System.out.println("The number of vertices in the graph is: "+count);
}

使用 OLTP 和使用带有 FulgoraGraphComputer 的 OLAP 的计数返回正确答案。然而,使用 SparkGraphComputer 的 OLAP 计数会抛出 org.apache.spark.SparkException: Job aborted due to stage failure:

有趣的是,如果我通过与 Titan 打包在一起的 gremlin 控制台运行类似的脚本,我会因为似乎是相同的算法而得到不同的错误:

graph = GraphFactory.open('titan-cassandra-test-cluster.properties')
graph.addVertex(T.label,"0")
graph.addVertex(T.label,"1")
graph.addVertex(T.label,"2")
graph.tx().commit()
sparkGraph = GraphFactory.open('titan-cassandra-test-spark.properties')
sparkGraph.traversal(computer(SparkGraphComputer)).V().count()

这会抛出org.apache.thrift.protocol.TProtocolException: Required field 'keyspace' was not present! Struct: set_keyspace_args(keyspace:null)两次,但会完成并返回 0,这是不正确的。

我知道邮件列表中有这篇文章,但我无法理解它或解决问题。谁能向我解释发生了什么以及如何解决这个问题?我在下面粘贴了我的配置。

gremlin.graph=com.thinkaurelius.titan.core.TitanFactory
storage.backend=cassandrathrift
storage.hostname=node1
storage.cassandra.keyspace=mindmapstest
storage.cassandra.replication-factor=3
cache.db-cache = true
cache.db-cache-clean-wait = 20
cache.db-cache-time = 180000
cache.db-cache-size = 0.5

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=com.thinkaurelius.titan.hadoop.formats.cassandra.CassandraInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=none
####################################
# Cassandra Cluster Config         #
####################################
titanmr.ioformat.conf.storage.backend=cassandrathrift
titanmr.ioformat.conf.storage.cassandra.keyspace=mindmapstest
titanmr.ioformat.conf.storage.hostname=node1,node2,node3
####################################
# SparkGraphComputer Configuration #
####################################
spark.master=spark://node1:7077
spark.executor.memory=250m
spark.serializer=org.apache.spark.serializer.KryoSerializer
####################################
# Apache Cassandra InputFormat configuration
####################################
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner

编辑:此脚本将重现错误

graph = TitanFactory.open('titan-cassandra-test-cluster.properties')
superNode = graph.addVertex(T.label,"0")
for(i in 1..100) {
  currentNode = graph.addVertex(T.label,i.toString())
  currentNode.addEdge("blank",superNode)
}
graph.tx().commit()

graph.traversal().V().count()

graph.traversal(computer()).V().count()

sparkGraph = GraphFactory.open('titan-cassandra-test-spark.properties')
sparkGraph.traversal(computer(SparkGraphComputer)).V().count()
4

2 回答 2

3

尝试将这些参数添加到您的 HadoopGraph 配置中。

#
# Titan Cassandra InputFormat configuration
# see https://github.com/thinkaurelius/titan/blob/titan11/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/cassandra/CassandraBinaryInputFormat.java
titanmr.ioformat.conf.storage.backend=cassandrathrift
titanmr.ioformat.conf.storage.hostname=node1,node2,node3
titanmr.ioformat.conf.storage.cassandra.keyspace=titan
titanmr.ioformat.cf-name=edgestore

#
# Apache Cassandra InputFormat configuration
# see https://github.com/apache/cassandra/blob/cassandra-2.2.3/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
# see https://github.com/thinkaurelius/titan/blob/titan11/titan-hadoop-parent/titan-hadoop-core/src/main/java/com/thinkaurelius/titan/hadoop/formats/cassandra/CassandraBinaryInputFormat.java
# not clear why these need to be set manually when using cassandrathrift
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
cassandra.input.keyspace=titan
cassandra.input.predicate=0c00020b0001000000000b000200000000020003000800047fffffff0000
cassandra.input.columnfamily=edgestore
cassandra.range.batch.size=2147483647
于 2016-07-22T14:32:58.000 回答
1

我不能 100% 确定我给出的理由是正确的,但我已经设法解决了这个问题。我的问题源于 3 个潜在问题,所有这些都与配置有关。

1) 第一个问题由 Jason 很好地解决了,这与连接到 Cassandra 的正确配置选项有关——我仍然很好奇他们甚至做了什么。

2)我无法让java代码成功运行的原因是我没有正确设置HADOOP_GREMLIN_LIBS环境变量。由于这个原因,jars 没有被部署到集群中以供图形计算机使用。一旦设置好,gremlin 控制台和 java 示例就会出现同样的问题 - 返回计数为零。

3) 零的计数是最难解决的。又是看不懂说明书的情况。有很多关于在我的集群上安装 hadoop 的参考资料,但没有说明如何连接到集群上的 hadoop。为了做到这一点,需要一个额外的配置选项,fs.defaultFS它告诉 gremlin 在哪里可以找到集群上的 hadoop 文件系统。一旦设置正确,顶点数就正确了。

我的理论是计算正确执行,但是当需要减少来自 spark 工作人员的计数时,它们会保留在集群的某个地方,然后当将答案返回到控制台时,查看本地文件系统并没有找到任何东西,因此返回零。这也许是一个错误?

无论如何,我需要的最终配置文件是这样的:

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=com.thinkaurelius.titan.hadoop.formats.cassandra.CassandraInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=/test/output
####################################
# Cassandra Cluster Config         #
####################################
titanmr.ioformat.conf.storage.backend=cassandrathrift
titanmr.ioformat.conf.storage.cassandra.keyspace=mindmapstest
titanmr.ioformat.conf.storage.hostname=node1,node2,node3
titanmr.ioformat.cf-name=edgestore
####################################
# SparkGraphComputer Configuration #
####################################
spark.master=spark://node1:7077
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer
####################################
# Apache Cassandra InputFormat configuration
####################################
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
cassandra.input.keyspace=mindmapstest
cassandra.input.predicate=0c00020b0001000000000b000200000000020003000800047fffffff0000
cassandra.input.columnfamily=edgestore
cassandra.range.batch.size=2147483647
spark.eventLog.enabled=true
####################################
# Hadoop Cluster configuration     #
####################################
fs.defaultFS=hdfs://node1:9000
于 2016-07-27T16:04:12.110 回答