1

我正在尝试使用 hbase 作为后端存储的 spark/scala/janusgraph 测试 graphofgodsfactory 示例。

我的代码:samplejanusloading.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.graphx._
import org.apache.tinkerpop.gremlin.groovy.plugin.AbstractGremlinPlugin;
import org.apache.tinkerpop.gremlin.groovy.plugin.IllegalEnvironmentException;
import org.apache.tinkerpop.gremlin.groovy.plugin.PluginAcceptor;
import org.apache.tinkerpop.gremlin.groovy.plugin.PluginInitializationException;
import org.apache.tinkerpop.gremlin.tinkergraph.process.computer.TinkerGraphComputer;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.configuration.FileConfiguration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.storage.StorageLevel;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.ComputerSubmissionHelper;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.SparkVertexProgramInterceptor;
import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.SparkInterceptorStrategy;
import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.SparkSingleIterationStrategy;
import org.apache.tinkerpop.gremlin.spark.structure.Spark;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputOutputHelper;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.OutputFormatRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
import org.apache.tinkerpop.gremlin.structure.io.Storage;
import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
import org.janusgraph.core.JanusGraph._;
import org.janusgraph.core.JanusGraph;
import org.janusgraph.diskstorage;
import org.janusgraph.graphdb;
import org.janusgraph.util;
import org.janusgraph.core.JanusGraphFactory._;
import org.janusgraph.core.JanusGraphFactory;
import org.janusgraph.core.attribute.Geo;
import org.janusgraph.core.attribute.Geoshape;
import org.janusgraph.example.GraphOfTheGodsFactory;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HBaseAdmin,HTable,Put,Get}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.{Level, Logger}
object samplejanusloading {
  def main(args: Array[String]) = {

/*    Logger.getLogger("org").setLevel(Level.OFF)     
    Logger.getLogger("akka").setLevel(Level.OFF)*/
    //Start the Spark context
    val conf = new SparkConf()
      .setAppName("Janusgraph")
      .setMaster("local")
    val sc = new SparkContext(conf)
val sqlcontext = new SQLContext(sc)     


val confg = new HBaseConfiguration()
val admin = new HBaseAdmin(confg)



val grap = JanusGraphFactory.open("c:\\janusgraph\\conf\\janusgraph-hbase.properties")


println("factry open")
val mgmt = grap.openManagement()
println("factry opened")
val name = mgmt.makePropertyKey("name")
println("first key property ")

val age = mgmt.makePropertyKey("age").make();
val reason = mgmt.makePropertyKey("reason").make();
val place = mgmt.makePropertyKey("place").make();


println("first edges open")
mgmt.makeEdgeLabel("father").make()
println("first edges open")
        mgmt.makeEdgeLabel("mother").make()
        mgmt.makeEdgeLabel("battled").make();
       //   mgmt.buildEdgeIndex(battled, "battlesByTime")
        mgmt.makeEdgeLabel("lives").make();
        mgmt.makeEdgeLabel("pet").make();
        mgmt.makeEdgeLabel("brother").make();


println("edges open")
        mgmt.makeVertexLabel("titan").make();
        mgmt.makeVertexLabel("location").make();
        mgmt.makeVertexLabel("god").make();
        mgmt.makeVertexLabel("demigod").make();
        mgmt.makeVertexLabel("human").make();
        mgmt.makeVertexLabel("monster").make();
println("vrtx open")
        mgmt.commit();
        println("cmt open")
        val tx = grap.newTransaction();
 val saturn = tx.addVertex(T.label, "titan", "name", "saturn", "age", "100");
        val sky = tx.addVertex(T.label, "location", "name", "sky");
        val sea = tx.addVertex(T.label, "location", "name", "sea");
        val jupiter = tx.addVertex(T.label, "god", "name", "jupiter", "age", "5000");
        val neptune = tx.addVertex(T.label, "god", "name", "neptune", "age", "4500");
        val hercules = tx.addVertex(T.label, "demigod", "name", "hercules", "age", "30");
        val alcmene = tx.addVertex(T.label, "human", "name", "alcmene", "age", "45");
        val pluto = tx.addVertex(T.label, "god", "name", "pluto", "age", "4000");
        val nemean = tx.addVertex(T.label, "monster", "name", "nemean");
        val hydra = tx.addVertex(T.label, "monster", "name", "hydra");
        val cerberus = tx.addVertex(T.label, "monster", "name", "cerberus");
        val tartarus = tx.addVertex(T.label, "location", "name", "tartarus");

         jupiter.addEdge("father", saturn);
        jupiter.addEdge("lives", sky, "reason", "loves fresh breezes");
        jupiter.addEdge("brother", neptune);
        jupiter.addEdge("brother", pluto);

        neptune.addEdge("lives", sea).property("reason", "loves waves");
        neptune.addEdge("brother", jupiter);
        neptune.addEdge("brother", pluto);

        hercules.addEdge("father", jupiter);
        hercules.addEdge("mother", alcmene);
        hercules.addEdge("battled", nemean, "time", "1", "place", Geoshape.point(38.1f, 23.7f));
        hercules.addEdge("battled", hydra, "time", "2", "place", Geoshape.point(37.7f, 23.9f));
        hercules.addEdge("battled", cerberus, "time", "12", "place", Geoshape.point(39f, 22f));

        pluto.addEdge("brother", jupiter);
        pluto.addEdge("brother", neptune);
        pluto.addEdge("lives", tartarus, "reason", "no fear of death");
        pluto.addEdge("pet", cerberus);

        cerberus.addEdge("lives", tartarus);
println("done sir");

        // commit the transaction to disk
        tx.commit();
tx.close()




    sc.stop
  }
}

c:\janusgraph\conf\janusgraph-hbase.properties 文件:

    storage.backend=hbase
gremlin.graph=org.janusgraph.core.JanusGraphFactory
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.outputLocation=output
gremlin.hadoop.jarsInDistributedCache=true
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer
storage.hostname=127.0.0.1
cache.db-cache = true
cache.db-cache-clean-wait = 20
cache.db-cache-time = 180000
cache.db-cache-size = 0.5

但是在 janusgraphfactory 中创建第二行时它的错误。

IEval name = mgmt.makePropertyKey("name")

输出:工厂开放工厂开放

我可以看到在 hbase 中创建的表名 janusgraph。但只加载了几行图形配置。

hbase(main):043:0> scan 'janusgraph'
ROW                        COLUMN+CELL                                                                
 configuration             column=s:cache.db-cache, timestamp=1507455998304001, value=\x8F\x01        
 configuration             column=s:cache.db-cache-clean-wait, timestamp=1507455998311001, value=\x8C\
                           xA8                                                                        
 configuration             column=s:cache.db-cache-size, timestamp=1507455998138001, value=\x94?\xE0\x
                           00\x00\x00\x00\x00\x00                                                     
 configuration             column=s:cache.db-cache-time, timestamp=1507455998308001, value=\x8D\x80\x0
                           0\x00\x00\x00\x02\xBF                                                      
 configuration             column=s:graph.janusgraph-version, timestamp=1507455998362001, value=\x92\x
                           A00.1.\xB1                                                                 
 configuration             column=s:graph.timestamps, timestamp=1507455998395001, value=\xB6\x81      
 configuration             column=s:hidden.frozen, timestamp=1507455998404001, value=\x8F\x01         
 configuration             column=s:system-registration.c0a8ef013936-Praddy1.startup-time, timestamp=1
                           507456041876001, value=\xC1\x80\x00\x00\x00Y\xD9\xF4)\x06C5L\x80           
1 row(s) in 6.1320 seconds

hbase(main):044:0>

错误是:

线程“main”中的异常java.lang.IllegalArgumentException:需要指定数据类型。

但是当我尝试

val name = mgmt.makePropertyKey("name").dataType(String).make(). 它没有做任何事情。但抛出同样的错误。

需要你帮助我做错了什么。基本上我正在尝试开发火花程序来创建关系并将其保存到我本地机器中的janusgraph。

4

1 回答 1

0

您应该classOf在 Scala 中使用(参考):

val name = mgmt.makePropertyKey("name").dataType(classOf[String]).make()

我还会注意到您的janusgraph-hbase.properties. 您应该从最小的一组属性开始,例如在分发的属性文件中找到的属性。

gremlin.graph=org.janusgraph.core.JanusGraphFactory
storage.backend=hbase
storage.hostname=127.0.0.1
storage.hbase.table=janusgraph
cache.db-cache=true
cache.db-cache-clean-wait=20
cache.db-cache-time=180000
cache.db-cache-size=0.5
于 2017-10-08T19:20:10.123 回答