背景:我有几个月使用 Gremlin 和 Faunus 的经验,其中包括 ScriptMap 步骤。
问题:当作为脚本的一部分加载到 shell 中时,用户定义的 Gremlin 步骤可以正常工作。但是,当在 Faunus ScriptMap 脚本中定义时,相同的步骤显然无效。
/***********Faunus Driver*************/
// Usage: gremlin -e <this file>
// NOTE: to run in plain Gremlin, remove .submit() at the end of the pipe.
import java.io.Console;

// Collect the arguments interactively; they are forwarded to the map script.
console = System.console()
if (console == null) {
    // System.console() returns null when no interactive terminal is attached.
    throw new IllegalStateException('No interactive console available to read the driver arguments')
}
mapperpath = console.readLine('> <map script path>: ')
refns = console.readLine('> <reference namespace>: ')
refinterestkey = console.readLine('> <interest field>: ')
// currently not in use
refinterestval = console.readLine('> <interest value>: ')
mainpropkey = console.readLine('> <main field>: ')
delim = console.readLine('> <main delimiter>: ')

// Argument order must match the indices read by setup(args) in the map script:
// [0] reference namespace, [1] interest field, [2] interest value,
// [3] main field, [4] delimiter.
args = [refns, refinterestkey, refinterestval, mainpropkey, delim] as String[]

f = FaunusFactory.open('propertyfile')
// Select the streaming selector vertices and hand each one to the map script.
f.V().filter('{it.getProperty("_namespace") == "streamernamespace" && it.getProperty("_entity") == "selector"}').script(mapperpath, args).submit()
f.shutdown()
/***********Script Mapper*************/
// Custom step "findMatch": keeps only the elements whose `interestindicator`
// property is true and whose `fieldofinterest` property equals the value that
// `fun` computes from the streaming vertex.
//   streamer          - the streaming vertex being matched
//   interestindicator - property key that must be true on a candidate
//   fieldofinterest   - property key compared against fun(streamer)
//   fun               - closure that derives the comparison value from streamer
Gremlin.defineStep("findMatch", [Vertex, Pipe],
    { streamer, interestindicator, fieldofinterest, fun ->
        _().has(interestindicator, true).has(fieldofinterest, fun(streamer))
    }
)
// Custom step "connectMatch": for every matching vertex flowing through the
// pipe, clones the streaming vertex into the reference graph `g` and links the
// match to the clone.
//   streamer - the streaming vertex whose properties are copied
// Relies on the script-wide names `g`, `mylog` and `mainpropkey`, which
// setup(args) assigns before map() uses this step.
Gremlin.defineStep("connectMatch", [Vertex, Pipe], { streamer ->
    // copy and link streaming vertices to matching vertices in main graph
    _().transform({ main ->
        if (main != null) {
            // mainpropkey is the field map() passes to findMatch as the matching key.
            mylog.info("reference vertex " + main.id +
                " & streaming vertex" + streamer.id + " match on main " + main.getProperty(mainpropkey));
            def clone = g.addVertex(null);
            ElementHelper.copyProperties(streamer, clone);
            // The clone takes the namespace of the matched reference vertex
            // (same "_namespace" key used everywhere else in these scripts).
            clone.setProperty("_namespace", main.getProperty("_namespace"));
            mylog.info("create clone " + clone.id + " in " + clone.getProperty("_namespace"));
            // Edge label comes from the streaming vertex's "source" property.
            def e = g.addEdge(main, clone, streamer.getProperty("source"));
            mylog.info("created edge " + e);
            g.commit()
        }
    })
})
// Script-level declarations for the names that setup(args) assigns and that
// map()/cleanup() and the custom steps read afterwards.
// NOTE(review): in a Groovy script, top-level `def` variables are locals of the
// script body and are not visible inside methods, so the assignments in
// setup() presumably land in the script binding instead — confirm.
def g               // graph handle; setup() assigns TitanFactory.open(conf)
def refns           // setup(): args[0]
def refinterestkey  // setup(): args[1]
def refinterestval  // setup(): args[2]
def mainpropkey     // setup(): args[3]
def delim           // setup(): args[4]
def normValue       // setup(): closure that builds the normalized selector string
// Prepares the state used by map(): unpacks the script arguments, builds the
// selector-normalizing closure, opens the reference graph, and logs whether
// the custom steps are registered.
//   args - positional arguments: [0] reference namespace, [1] interest field,
//          [2] interest value, [3] main field, [4] delimiter
def setup(args) {
    refns = args[0]
    refinterestkey = args[1]
    refinterestval = args[2]
    mainpropkey = args[3]
    delim = args[4]

    // Builds "<TYPE><delim><number>" from a vertex's "type" and "description"
    // properties; the number is the part of the description after the first
    // delimiter, or the whole description when it has no delimiter.
    normValue = { obj ->
        def seltype = obj.getProperty("type")
        def seltypenorm = seltype.trim().toUpperCase()
        def desc = obj.getProperty("description")
        def selnum
        if (desc.contains(delim)) {
            // split() takes a regex; quote the delimiter so that it is matched
            // literally, the same way contains() matches it above.
            selnum = desc.split(java.util.regex.Pattern.quote(delim))[1].trim()
        } else {
            selnum = desc.trim()
        }
        def selnorm = seltypenorm.concat(delim).concat(selnum)
        mylog.info("streamer selector (" + seltype + ", " + desc + ") normalized as " + selnorm)
        return selnorm
    }

    mylog = java.util.logging.Logger.getLogger("script_map")
    mylog.info("configuring connection to reference graph")
    conf = new BaseConfiguration()
    conf.setProperty("storage.backend", "cassandra")
    conf.setProperty("storage.keyspace", "titan")
    conf.setProperty("storage.index.index-name", "titan")
    conf.setProperty("storage.hostname", "localhost")
    g = TitanFactory.open(conf)

    // Diagnostic only: report whether both custom steps are registered.
    isstepsloaded = Gremlin.getStepNames().contains("findMatch") &&
        Gremlin.getStepNames().contains("connectMatch")
    mylog.info("custom steps available?: " + isstepsloaded)
}
// Processes one streaming vertex: looks it up in graph `g` by id and, when it
// is a "selector" entity, runs findMatch/connectMatch against the vertices of
// the reference namespace. Exceptions are logged rather than rethrown, and the
// graph is committed in either case.
//   v    - the vertex handed in for this map call
//   args - script arguments (not read here; setup() has already unpacked them)
def map(v, args) {
    try {
        incoming = g.v(v.id)
        mylog.info("current streamer id: " + incoming.id)
        if (incoming.getProperty("_entity") == "selector") {
            mylog.info("process incoming vertex " + incoming.id)
            g.V("_namespace", refns).findMatch(incoming, refinterestkey, mainpropkey, normValue).connectMatch(incoming).iterate()
        }
    } catch (Exception e) {
        mylog.info("map method exception raised");
        // Attach the exception itself so the stack trace is not lost.
        mylog.log(java.util.logging.Level.SEVERE, e.getMessage(), e)
    }
    g.commit()
}
// Shuts down the graph handle `g`.
//   args - script arguments (unused here)
// NOTE(review): presumably invoked once after the last map() call — confirm
// against the Faunus ScriptMap contract.
def cleanup(args) {
    g.shutdown()
}