1

我正在尝试实现 Trident+DRPC。我以一种不会无限期运行的方式设计了拓扑。我有两个单独的类,一个用于 spout 实现,另一个用于实现 DRPC 和 Trident。我的 spout 类(扩展 IRichSpout 的 spout)发出客户的 id。IE

public class TriSpout implements IRichSpout{
  //some logic here
    spoutOutputCollector.emit(new Values(id))
  }

现在我从另一个使用 DRPC 实现 Trident 的类中的输出收集器中获取了值。

public class TriDrpc{

    .....
    TriSpout spout=new TriSpout1();        
    TridentTopology topology = new TridentTopology();  
    TridentState wordCounts =
          topology.newStream("spout1",spout)
            .parallelismHint(1)
            .each(new Fields("id"), new Compute(), new Fields("value"))
            .persistentAggregate(new MemoryMapState.Factory(),
                                 new Count(), new Fields("count"))   

而drpc拓扑def如下

topology.newDRPCStream("Calc", drpc)
         .each(new Fields("args"), new Split(), new Fields("word"))                
         .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"));         

DRPC请求如下

public static void main(String[] args) throws Exception {
    Config conf = new Config(); 

    if (args.length == 0) {
    LocalDRPC drpc = new LocalDRPC();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("Calculator", conf,   buildTopology(drpc));          
    System.out.println("DRPC RESULT: "
                + drpc.execute("Calc", "id"));
    Thread.sleep(1000);

    } else {
        conf.setNumWorkers(8);
        StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
    }
}

现在在上面的代码中,在 DRPC 请求中,即

System.out.println("DRPC RESULT: " + drpc.execute("Calc", "id"));

应该与 spout 发出的"id"id 相同,即我想知道哪个客户有使用此 id 的活动帐户,所以我需要为 spout 发出的所有 id 发送 DRPC 请求。现在 DRPC 在主类中,如何在不手动指定 id 的情况下将 spout 发出的值传递给 DRPC 请求?

有人可以帮忙吗

用新信息编辑

4

1 回答 1

3

更新

嗯,现在更清楚你的问题是什么了,谢谢。

因此,您需要处理相同 DRPC 的拓扑 spout 发出的相同 ID 的 DRPC 请求。

实现此目的的唯一方法是将您从 spout 发出的 ID 持久化到 Storm 外部的持久存储(例如,RDMS 或分布式哈希图)。

这样,在您提交拓扑以在 Storm 集群上执行后,您可以轮询持久存储以获取新 ID,并为每个新 ID 执行 DRPC 请求。

原始答案

我不认为我理解这个问题。您是否尝试使用从同一个 DRPC 拓扑的 spout 输出中获取的请求 ID 参数来执行 Storm DRPC 请求?我认为这不是 DRPC 拓扑的有效和有意使用。您最好使用普通拓扑。

DRPC 拓扑用于有限计算,而普通拓扑用于连续计算。DRPC 调用采用 DRPC 拓扑的名称,以及一组用于计算 DRPC 调用结果的输入参数。普通的 Storm(或 Trident)拓扑只是无限期地运行,计算某种结果并将其持久化。

我希望这有帮助。如果不是,请更好地重新表述您的问题,因为目前还不清楚您的问题是什么。

于 2013-09-20T08:43:38.100 回答