0

新到达的数据必须由轮询方法读取,然后交给另一个类作进一步处理。

/**
 * Reads every row of {@code msg_new_to_bde} whose {@code ACTION} is 804,
 * ordered by {@code SEQ} descending, and converts each row to a
 * {@link KpiMessage}.
 *
 * Each call re-reads all matching rows; no position is remembered between
 * calls.
 *
 * @return the converted messages in query order; empty if no row matches
 * @throws Exception
 *             if the statement cannot be created or executed, or a row
 *             cannot be converted
 */
public List<KpiMessage> fullPoll() throws Exception {

    Statement st = dbConnection.createStatement();
    try {
        ResultSet rs = st.executeQuery("select * from msg_new_to_bde where ACTION = 804 order by SEQ DESC");
        try {
            List<KpiMessage> pojoCol = new ArrayList<KpiMessage>();
            while (rs.next()) {
                pojoCol.add(convertRecordsetToPojo(rs));
            }
            return pojoCol;
        } finally {
            // Release the cursor even if a row fails to convert.
            rs.close();
        }
    } finally {
        // The statement is created on every poll, so it must be closed every time.
        st.close();
    }
}


/**
 * Converts the current row of a provided record-set to a {@link KpiMessage}.
 * 
 * The following columns are copied from record-set to pojo:
 * 
 * <ul>
 * <li>SEQ</li>
 * <li>ACTION</li>
 * <li>TABLENAME</li>
 * <li>ENTRYTIME</li>
 * <li>PROCESSINGTIME</li>
 * <li>KEYINFO1</li>
 * <li>KEYINFO2</li>
 * </ul>
 * 
 * ENTRYTIME and PROCESSINGTIME are only set on the pojo when the column is
 * not SQL NULL.
 * 
 * @param rs
 *            the record-set to convert, positioned on the row to read
 * @return the converted pojo class object
 * @throws SQLException
 *             if an sql error occurs during processing of recordset
 */
private KpiMessage convertRecordsetToPojo(ResultSet rs) throws SQLException {

    KpiMessage msg = new KpiMessage();
    msg.setSequence(rs.getInt("SEQ"));
    msg.setAction(rs.getInt("ACTION"));
    msg.setTableName(rs.getString("TABLENAME"));

    // getTimestamp returns null for SQL NULL, so guard before dereferencing.
    Timestamp entrytime = rs.getTimestamp("ENTRYTIME");
    if (entrytime != null) {
        msg.setEntryTime(new Date(entrytime.getTime()));
    }
    Timestamp processingtime = rs.getTimestamp("PROCESSINGTIME");
    if (processingtime != null) {
        msg.setProcessingTime(new Date(processingtime.getTime()));
    }

    msg.setKeyInfo1(rs.getString("KEYINFO1"));
    msg.setKeyInfo2(rs.getString("KEYINFO2"));
    return msg;
}

此类具有从数据库读取消息的 Poll() 方法。在数据库中,我使用 SequenceID 作为唯一编号,每当有新数据插入时它都会递增。但是,如何只获取这些新消息并把它们交给轮询方法处理?

PS:如果您对我的帖子投反对票或将其关闭,请留言说明原因,因为我是新手,想了解具体情况。

下面是控制器类,它负责启动运行 Poll() 方法的线程。

/**
 * Opens the database connection and submits a {@code PollingSynchronizer}
 * task to a thread pool.
 */
public class RunnableController {

    /**
     * Collection of incoming messages; it is handed to the
     * {@code PollingSynchronizer} together with the database connection.
     *
     * NOTE(review): the element type was spelled both {@code KaMessage} and
     * {@code KAMessage}; the declared type is kept here. Confirm against the
     * PollingSynchronizer constructor.
     */
    private static final Collection<KaMessage> incomingQueue = new ArrayList<KaMessage>();

    private Connection dbConnection;
    private ExecutorService threadExecutor;

    /**
     * Opens the database connection via {@code DBhandler}. On failure the
     * stack trace is printed and the connection stays {@code null}.
     */
    private void initializeDb() {
        // catching exception must be adapted - generic type Exception prohibited
        DBhandler con = new DBhandler();
        try {
            dbConnection = con.initializeDB();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates a fixed pool of 10 threads and submits the polling task to it.
     *
     * NOTE(review): if {@link #initializeDb()} failed, the connection passed
     * to the polling task is {@code null} - confirm how PollingSynchronizer
     * handles that.
     */
    private void initialiseThreads() {
        try {
            threadExecutor = Executors.newFixedThreadPool(10);
            PollingSynchronizer read = new PollingSynchronizer(incomingQueue, dbConnection);
            threadExecutor.submit(read);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Shuts down the thread pool and closes the database connection. Each
     * step is skipped if its resource was never created, and the connection
     * is closed even when shutting down the pool fails.
     *
     * NOTE(review): nothing in this class calls this method yet.
     */
    private void shutDownThreads() {
        try {
            if (threadExecutor != null) {
                threadExecutor.shutdown();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (dbConnection != null) {
                try {
                    dbConnection.close();
                } catch (java.sql.SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // Placeholder for the collection of outgoing messages:
    // private Collection<KpiMessage> outgingQueue = new ArrayList<KpiMessage>();

    /**
     * Main
     * 
     * Prints the current size of the incoming queue, opens the database
     * connection and starts the polling task.
     * 
     * @param args
     *            command line arguments (unused)
     * @throws InterruptedException
     *             declared for callers; not thrown by the current body
     */
    public static void main(String[] args) throws InterruptedException {
        RunnableController controller = new RunnableController();

        System.out.println(incomingQueue.size());

        controller.initializeDb();
        controller.initialiseThreads();
        System.out.println("Repetetive polling for each 6 seconds");
        // NOTE(review): the processor is created but not used yet.
        KpiProcessor kp = new KpiProcessor();
    }
}
4

0 回答 0