我正在尝试实现 Trident+DRPC。我以一种不会无限期运行的方式设计了拓扑。我有两个单独的类,一个用于 spout 实现,另一个用于实现 DRPC 和 Trident。我的 spout 类(扩展 IRichSpout 的 spout)发出客户的 id。IE
public class TriSpout implements IRichSpout{
//some logic here
spoutOutputCollector.emit(new Values(id))
}
现在我从另一个使用 DRPC 实现 Trident 的类中的输出收集器中获取了值。
public class TriDrpc{
.....
TriSpout spout=new TriSpout1();
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1",spout)
.parallelismHint(1)
.each(new Fields("id"), new Compute(), new Fields("value"))
.persistentAggregate(new MemoryMapState.Factory(),
new Count(), new Fields("count"))
而drpc拓扑def如下
topology.newDRPCStream("Calc", drpc)
.each(new Fields("args"), new Split(), new Fields("word"))
.stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"));
DRPC请求如下
public static void main(String[] args) throws Exception {
Config conf = new Config();
if (args.length == 0) {
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Calculator", conf, buildTopology(drpc));
System.out.println("DRPC RESULT: "
+ drpc.execute("Calc", "id"));
Thread.sleep(1000);
} else {
conf.setNumWorkers(8);
StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
}
}
现在在上面的代码中,在 DRPC 请求中,即
System.out.println("DRPC RESULT: " + drpc.execute("Calc", "id"));
应该与 spout 发出的"id"
id 相同,即我想知道哪个客户有使用此 id 的活动帐户,所以我需要为 spout 发出的所有 id 发送 DRPC 请求。现在 DRPC 在主类中,如何在不手动指定 id 的情况下将 spout 发出的值传递给 DRPC 请求?
有人可以帮忙吗
用新信息编辑