2

现在基本上我已经创建了三个类。

public void run() {  
int seqId = 0;  
while(true) {  
    List<KamMessage> list = null;  
    try {  
        list = fullPoll(seqId);  
    } catch (Exception e1) {  
        e1.printStackTrace();  
    }  
    if (!list.isEmpty()) {  
        seqId = list.get(0).getSequence();  
        incomingMessages.addAll(list);  
        System.out.println("waiting 3 seconds");  
        System.out.println("new incoming message");  
    }  
    try {  
        Thread.sleep(3000);  
        System.out.println("new incoming message");  
    } catch (InterruptedException e) {  
        e.printStackTrace();  
    }   
   }  
 }  
 public List<KamMessage> fullPoll(int lastSeq) throws Exception {  
 Statement st = dbConnection.createStatement();  
 ResultSet rs = st.executeQuery("select * from msg_new_to_bde where ACTION =  804 and SEQ >" +  
 lastSeq + "order by SEQ DESC");        
 List<KamMessage> pojoCol = new ArrayList<KamMessage>();  
  while (rs.next()) {  
    KamMessage filedClass = convertRecordsetToPojo(rs);  
    pojoCol.add(filedClass);  
  }  
for (KamMessage pojoClass : pojoCol) {  
    System.out.print(" " + pojoClass.getSequence());  
    System.out.print(" " + pojoClass.getTableName());  
    System.out.print(" " + pojoClass.getAction());  
    System.out.print(" " + pojoClass.getKeyInfo1());  
    System.out.print(" " + pojoClass.getKeyInfo2());  
    System.out.println(" " + pojoClass.getEntryTime());  
   }             
return pojoCol;  
  }   

以下是这些类: 1.Poller- 执行轮询并将新数据从 db 传递到控制器

2.Controller——这个类有一个线程Pool,它同时调用Poller并有新的数据要从处理器请求

3.Processor——这个类必须寻找新数据,处理它并将其返回给控制器。

所以现在我的问题是如何实施第三阶段......

这是我的控制器类:

public class RunnableController {  

/** Here This Queue initializes the DB and have the collection of incoming message
 *                    
 */  
  private static Collection<KpiMessage> incomingQueue = new ArrayList<KpiMessage>();  
  private Connection dbConncetion;  
  public ExecutorService threadExecutor;  
  private void initializeDb()  
  {  
    //catching exception must be adapted - generic type Exception prohibited  
    DBhandler conn = new DBhandler();  
    try {  
        dbConncetion = conn.initializeDB();  
    } catch (Exception e) {  
        // TODO Auto-generated catch block  
        e.printStackTrace();  
     }  
  }  


private void initialiseThreads()  
{         
    try {  

        threadExecutor = Executors.newFixedThreadPool(10);  
            PollingSynchronizer read = new PollingSynchronizer(incomingQueue, dbConncetion);  
        threadExecutor.submit(read);  

    }catch (Exception e){  
    e.printStackTrace();  
    }  

}  

@SuppressWarnings("unused")  
private void shutDownThreads()  
{         
    try {  
        threadExecutor.shutdown();  
        //DB handling should be moved to separate DB class  
        dbConncetion.close();  

    }catch (Exception e){  
    e.printStackTrace();  
    }  

}  

/** Here This Queue passes the messages and have the collection of outgoing message 
 *  
 */  

//private Collection<KpiMessage> outgingQueue = new ArrayList<KpiMessage>();  
//have to implement something here for future  

     public static void main(String[] args) throws InterruptedException {  
     RunnableController controller = new RunnableController();  

    System.out.println(incomingQueue.size());  

    controller.initializeDb();  
    controller.initialiseThreads();  

    Thread.sleep(3000);  
    System.out.println("Polling");  

  }  

} 
4

1 回答 1

4

我建议使用 BlockingQueue,而不是简单的 ArrayList。只需更改incomingQueue变量的类型。然后你可以让另一个线程(或线程池)做类似的事情

//pseudocode
while (true) {
   // it polls data from the incomingQueue that shares with the producers
    KpiMessage message = this.incomingQueue.take()

   //Then process the message and produces an output... you can put that output in a different queue as well for other part of the code to pick it up
}

BlockingQueues 的一个很好的例子可以在这里找到http://www.javamex.com/tutorials/blockingqueue_example.shtml

于 2013-01-10T15:48:42.347 回答