1

I am having two separate topologies in my main() method. I am using 'for loop' to set its corresponding spout and bolts and submit that topology aswell. each topology is having separate set of spout and bolts with different logic in them.

Now I want to call different spouts and bolts dynamically. How can I do that ??

code in main method:

    public static void main(String[] args) throws InterruptedException,Exception            
    {
    LocalCluster cluster = new LocalCluster();



    try{
        BufferedReader in=new BufferedReader(new FileReader("/home/praveen /workspace/OfferEngine/OfferListJSON.json"));
        //ArrayList<String> content = new ArrayList<String>();
        String str="";
        String str1="";
        while((str1=in.readLine())!=null)
        {
            str = str + str1;
        }

         JsonParser parser = new JsonParser();
         JsonObject o = (JsonObject)parser.parse(str);
         JsonObject o2 = (JsonObject)o.get("OfferData");

         ArrayList<String> ol = new ArrayList<String>();
         ol.add(o2.get("strOfferId").toString().replaceAll("^\"|\"$", ""));
         ol.add(o2.get("strOfferId1").toString().replaceAll("^\"|\"$", ""));

         TopologyBuilder bu = null;
         Config config = new Config();
         config.setDebug(false);

         for(String x : ol) {
                System.out.println(x); //prints element x
                String y="", z="";
                bu = new TopologyBuilder();         
                bu.setSpout(x, new Off1Spout(x).ks(), 2);
                y = x+"2";
                bu.setBolt(y, new main.java.bolts.Off1Bolt()).shuffleGrouping(x);
                z = x+"offerlimit";
                bu.setBolt(z, new OfferLimit()).shuffleGrouping(y);
cluster.submitTopology(x,config,bu.createTopology());
              }
    }

    catch (IOException e) 
    {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

Any kind of suggestion is more helpfull... thanks regards

4

1 回答 1

1

您可以在 for 循环中尝试 if() 条件以获取不同的优惠...请参阅以下代码以供参考

for(String x : ol) {
     System.out.println(x); 
//  Offer 1 related Topology
    if(x.equalsIgnoreCase("offer1")){                   
       String y="", z="";
       bu = new TopologyBuilder();          
       bu.setSpout();
       y = x+"start";
       cluster.submitTopology(x,config,bu.createTopology());
    }

//      Offer 2 related Topology

 if(x.equalsIgnoreCase("off2")){

       String y="", z="";
       bu = new TopologyBuilder();          
       bu.setSpout(x, new Off1Spout(x).ks(), 2);
       y = x+"start";
       bu.setBolt(y, new off2Bolt()).shuffleGrouping(x);
       cluster.submitTopology(x,config,bu.createTopology());
  }
}

有多少拓扑创建了那么多 if 条件..

于 2013-10-03T06:06:49.853 回答