前奏:几个月的 FaunusGraph 和 TitanGraph 使用 Gremlin“方言”的经验,非常了解功能和句法差异。已成功使用 Faunus 脚本步骤 ( http://architects.dzone.com/articles/distributed-graph-computing , https://github.com/thinkaurelius/faunus/blob/master/src/main/java/com/thinkaurelius /faunus/mapreduce/sideeffect/ScriptMap.java)用于相对简单的子图删除和变异。
问题:实现了一个复杂的变异脚本映射,以按照面向方向的命名属性约定将边缘属性“移动”到外顶点或内顶点。我的 TitanGraph Gremlin 原型适用于小图,但我无法让按比例放大的实现工作:地图成功完成但图表没有更改(我正在提交更改)。注意:我的 Logger 对象仅输出显示前缀 args 的第一条 INFO 消息,表明我没有满足边缘命名空间保护条件(我在没有条件的情况下运行,但没有更改)。以下是我的代码(来自内部网络的胖手指,因此可能存在拼写错误)
//faunus pipe driver - usage gremlin -e faunus.ns.set-props.grm
import java.io.Console
//get args
console=System.console()
arg=console.readLine('> type <namespace>;<faunus.ns.set-props.mapper_path>;<from_prefix>;<to_prefix>
inargs=arg.split(";")
//establish FaunusGraph connection
f=FaunusFactory.open('titan-client.properties')
f.getConf().set("faunus.graph.input.titan.storage.read-consistency-level", "ALL")
f.getConf().set("faunus.graph.input.titan.storage.write-consistency-level", "ALL")
//Faunus pipe incl. script step
f.V().has("_namespace", inargs[0]).script(inargs[1], inargs[2], inargs[3]
//script map - usage f.V().has("_namespace", <namespace_string>).script(<this_script_path>, <outV_key_prefix_string>, <inV_key_prefix_string>)
def g
def mylog
def setup(args) {
mylog=java.util.logging.Logger.getLogger("script_map")
println("configuring graph ...")
conf=new BaseConfiguration()
conf.setProperty("storage.backend", "cassandra")
conf.setProperty("storage.keyspace", "titan")
conf.setProperty("storage.index.index-name", "titan")
conf.setProperty("storage.hostname", "localhost")
g=TitanFactory.open(conf)
}
def map(v, args) {
mylog.info("*****READ***** args: "+args[0].toString()+", "+args[1].toString())
//fetch all edges incident on Titan vertex corresponding to incoming Faunus vertex
gv=g.v(v.id)
edges=gv.bothE();null
//iterate through incident edges
while(edges.hasNext()) {
e=edges.next()
if (e.hasProperty("_namespace")) { //_namespace removed from previously processed edges
/*fetch terminal vertices of current edge, add incidence & adjacency props
to support metrics and analytics
*/
from=e.getVertex(OUT)
from.setProperty("inV_degree", from.in().count())
from.setProperty("inE_degree", from.inE().count())
from.setProperty("outV_degree" from.out().count())
from.setProperty("outE_degree", from.outE().count())
to=e.getVertex(IN)
to.setProperty("inV_degree", from.in().count())
to.setProperty("inE_degree", from.inE().count())
to.setProperty("outV_degree" from.out().count())
to.setProperty("outE_degree", from.outE().count())
mylog.info("*****READ*****edge id: "+e.id)
mylog.info("*****READ*****edge vertices: from id"+fromid+"; to id: "+to.id)
//fetch property keys of current edge
ekeys=e.getPropertyKeys()
//iterate through edge property keys
for(String ekey:ekeys)
eprop=e.getProperty(ekey) //get value of current property key
goodprop=!(eprop == "" || eprop == null)
mylog.info("*****READ*****edge key/value: "+ekey+"="eprop)
/*determine placement of current key/value on one or neither of the
terminal vertices based on key prefix arges and property value,
remove prefix from re-assigned key/value
*/
if(ekey.startsWith(args[0]) && goodprop) {
vkey=ekey.split(args[0])[1]
if(!from.hasProperty(vkey)) from.setProperty(vkey, eprop)
else {
vprop=from.getProperty(vkey)
if(!vprop.equal(eprop) from.setProperty(vkey, vprop+";"+eprop)
}
mylog.info("*****READ*****from vertex key/value: "+vkey+"="+from.getProperty(vkey)
}
else if(ekey.startsWith(args[1]) && goodprop) {
vkey=ekey.split(args[1])[1]
if(!to.hasProperty(vkey)) to.setProperty(vkey, eprop)
else {
vprop=to.getProperty(vkey)
if(!vprop.equal(eprop) to.setProperty(vkey, vprop+";"+eprop)
}
mylog.info("*****READ*****tovertex key/value: "+vkey+"="+to.getProperty(vkey)
}
//if current edge property key is re-assigned, remove it from the edge
if(ekey.startsWith(args[0]) || ekey.startsWith(args[1])) {
e.removeProperty(ekey)
if(e.hasProperty(ekey) println(ekey+" NOT remvoded from edge")
else println(ekey+ "removed from edge")
}
e.removeProperty("_namespace") // marks edge as processed per outer loop guard
}
}
}
g.commit()
}
def cleanup(args) {
g.shutdown()
}