2

在这里,该方法读取具有唯一 ID 且序列号不断增加的数据库,因为我是 java 初学者,我能知道如何实现这种重复轮询并每次检查新的传入消息。

public void run() {
int seqId = 0;
    while(true) {
        List<KpiMessage> list = null;
        try {
            list = fullPoll(seqId);
            if (!list.isEmpty()) {
                seqId = list.get(0).getSequence();
                incomingMessages.addAll(list);
                System.out.println("waiting 3 seconds");
                System.out.println("new incoming message");
                Thread.sleep(3000);

            }
        } catch (Exception e1) {
            e1.printStackTrace();
        }


    }
}

//Method which defines polling of the database and also count the number of Queries
public List<KpiMessage> fullPoll(int lastSeq) throws Exception {
    Statement st = dbConnection.createStatement();
    ResultSet rs = st.executeQuery("select * from msg_new_to_bde where ACTION =  804 and SEQ >" + lastSeq + "order by SEQ DESC");     
    List<KpiMessage> pojoCol = new ArrayList<KpiMessage>();
    while (rs.next()) {
        KpiMessage filedClass = convertRecordsetToPojo(rs);
        pojoCol.add(filedClass);
    }
    for (KpiMessage pojoClass : pojoCol) {
        System.out.print(" " + pojoClass.getSequence());
        System.out.print(" " + pojoClass.getTableName());
        System.out.print(" " + pojoClass.getAction());
        System.out.print(" " + pojoClass.getKeyInfo1());
        System.out.print(" " + pojoClass.getKeyInfo2());
        System.out.println(" " + pojoClass.getEntryTime());
    }
    //          return seqId;           
    return pojoCol;
}  

我的目标是从数据库中轮询表并检查是否有新的传入消息,我可以从表中的 Header 字段 SequenceID 中找到该消息,该字段是唯一的,并且会不断增加新条目。现在我的问题是

1.假设我第一次轮询后,它会读取所有条目并使线程休眠 6 秒,同时如何获取新的传入数据并再次轮询?

2.还有如何添加新数据,当它第二次轮询并将新数据传递给另一个类时。

4

2 回答 2

2

Poller 每 6 秒调用一次 fullPoll 并将 lastSeq 参数传递给它。最初 lastSeq = 0。当 Poller 获取结果列表时,它将 lastSeq 替换为最大 SEQ 值。fullPoll 只检索 SEQ > lastSeq 的记录。

void run() throws Exception {
    int seqId = 0;
    while(true) {
          List<KpiMessage> list = fullPoll(seqId);
          if (!list.isEmpty()) {
            seqId = list.get(0).getSequene();
          }
          Thread.sleep(6000); 
    }
}

public List<KAMessage> fullPoll(int lastSeq) throws Exception {
       ...
       ResultSet rs = st.executeQuery("select * from msg_new_to_bde where ACTION =  804 and SEQ > " + lastSeq + " order by SEQ 
      DESC");     
      .. 
}
于 2013-01-09T15:46:21.117 回答
1

这是一些您可以用来开始工作的代码。我尝试使用观察者模式使其非常灵活;这样您就可以将多个“消息处理器”连接到同一个轮询器:

public class MessageRetriever implements Runnable {
    private int lastID;
    private List<MessageListener> listeners;

   ...

    public void addMessageListener(MessageListener listener) {
        this.listeners.add(listener)
    }

    public void removeMessageListener(MessageListener listener) {
        this.listeners.remove(listener)
    }

    public void run() {
        //code to repeat the polling process given some time interval    
    }  

    private void pollMessages() {
        if (this.lastID == 0)
            this.fullPoll()
        else
            this.partialPoll()           
    }

    private void fullPoll() {
        //your full poll code

        //assuming they are ordered by ID and it haves the ID field - you should
        //replace this code according to your structure
        this.lastID = pojoCol.get(pojoCol.length() - 1).getID() 

        this.fireInitialMessagesSignal(pojoCol)
    }

    private void fireInitialMessagesSignal(List<KAMessage> messages) {
        for (MessageListener listener : this.listeners)
            listener.initialMessages(messages)
    }

    private void partialPoll() {
        //code to retrieve messages *past* the lastID. You could do this by 
        //adding an extra condition on your where sentence e.g 
        //select * from msg_new_to_bde where ACTION = 804 AND SEQ > lastID order by SEQ DESC
        //the same as before
        this.lastID = pojoCol.get(pojoCol.length() - 1).getID() 

        this.fireNewMessagesListener(pojoCol)
    }

    private void fireNewMessagesListener(List<KAMessage> messages) {
        for (MessageListener listener : this.listeners)
            listener.newMessages(messages)
    }

}

还有界面

public interface MessageListener {
    public void initialMessages(List<KAMessage> messages);
    public void newMessages(List<KAMessage> messages)
}

基本上,使用这种方法,检索器是一个可运行的(可以在它自己的线程上执行)并负责整个过程:进行初始轮询并在给定的时间间隔内继续进行“部分”轮询。

不同的事件触发不同的信号,将受影响的消息发送给注册的侦听器,然后这些侦听器根据需要处理消息。

于 2013-01-09T15:59:36.413 回答