1

我想在我的 Java 程序中传递一条异步消息,所以第一步它应该持续监控数据库中某些表的变化。当有新的传入消息时,它应该显示它。只要应用程序正在运行,这应该是重复的过程。

我可以知道如何为以下代码进行此操作,其中包含轮询方法,该方法必须每 6 秒无限地调用自身,并且还应该在数据库中找到新的传入消息。

这是代码片段:

public class PollingSynchronizer implements Runnable {

private Collection<KPIMessage> incomingMessages;
private Connection dbConnection;


/**
 * Constructor. Requires to provide a reference to the KA message queue
 * 
 * @param incomingMessages reference to message queue
 * 
 */
   public PollingSynchronizer(Collection<KpiMessage> incomingMessages, Connection dbConnection) {
    super();
    this.incomingMessages = incomingMessages;
    this.dbConnection = dbConnection;
}

private int sequenceId;

public int getSequenceId() {
    return sequenceId;
}

public void setSequenceId(int sequenceId) {
    this.sequenceId = sequenceId;
}



@Override
/**
 * The method which runs Polling action and record the time at which it is done
 * 
 */
public void run() {
    try {


           incomingMessages.addAll(fullPoll());
            System.out.println("waiting 6 seconds");

            //perform this operation in a loop
            Thread.sleep(6000);

    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } 
    Date currentDate = new Date();
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
//  System.out.println(sdf.format(currentDate) + " " + msg);
}

/**
 * Method which defines polling of the database and also count the number of Queries
 * @return 
 * @throws Exception
 */
public List<KpiMessage> fullPoll() throws Exception {

//  int sequenceID = 0;
    Statement st = dbConnection.createStatement();

    ResultSet rs = st.executeQuery("select * from msg_new_to_bde where ACTION = 804 order by SEQ DESC");
        List<KpiMessage> pojoCol = new ArrayList<KpiMessage>();
        while (rs.next()) {
            KpiMessage filedClass = convertRecordsetToPojo(rs);
            pojoCol.add(filedClass);
        }

        return pojoCol;
        }

/**
 * Converts a provided record-set to a {@link KpiMessage}.
 * 
 * The following attributes are copied from record-set to pojo:
 * 
 * <ul>
 * <li>SEQ</li>
 * <li>TABLENAME</li>
 * <li>ENTRYTIME</li>
 * <li>STATUS</li>
 * </ul>
 * 
 * @param rs
 *            the recordset to convert
 * @return the converted pojo class object
 * @throws SQLException
 *             if an sql error occurrs during processing of recordset
 */
private KpiMessage convertRecordsetToPojo(ResultSet rs) throws SQLException {

    KpiMessage msg = new KpiMessage();
    int sequence = rs.getInt("SEQ");
    msg.setSequence(sequence);
    int action = rs.getInt("ACTION");
    msg.setAction(action);
    String tablename = rs.getString("TABLENAME");
    msg.setTableName(tablename);
    Timestamp entrytime = rs.getTimestamp("ENTRYTIME");
    Date entryTime = new Date(entrytime.getTime());
    msg.setEntryTime(entryTime);
    Timestamp processingtime = rs.getTimestamp("PROCESSINGTIME");
    if (processingtime != null) {
        Date processingTime = new Date(processingtime.getTime());
        msg.setProcessingTime(processingTime);
    }
    String keyInfo1 = rs.getString("KEYINFO1");
    msg.setKeyInfo1(keyInfo1);
    String keyInfo2 = rs.getString("KEYINFO2");
    msg.setKeyInfo2(keyInfo2);
    return msg;
}
}

这里的序列 id 是表中的唯一 id,它随着新的传入消息的到达而不断增加。

PS:“恳请:请给出给出负分的理由(大拇指向下)。这样我就可以清楚地解释我的问题”

4

2 回答 2

1

简单地把它放在一个 while(true) 循环中。

public void run() {
    while(true){
        try {


               incomingMessages.addAll(fullPoll());
                System.out.println("waiting 6 seconds");

                //perform this operation in a loop
                Thread.sleep(6000);

        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } 
        Date currentDate = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
    //  System.out.println(sdf.format(currentDate) + " " + msg);
   }
}

希望您已经将 Runnable 作为新线程启动。

对于新的和更新的消息,您需要一个字段,例如数据库中的“last_update”。每当您检查新消息时,您都需要更改 SQL 语句以获取新消息,例如:“ where last_update > $lastCheckedDate”,设置位置。lastCheckedDate

也许您还想阅读有关 Java 并发的内容:http: //docs.oracle.com/javase/tutorial/essential/concurrency/

于 2013-01-07T09:35:25.070 回答
0

放入while循环是一种方法,但我认为最好避免采用这种方法(有很多事情要搞砸,比如事务等)。

如果你真的需要做这种重复的事情,可以考虑使用调度器。Spring 3.x 确实内置了调度程序,或者您也可以使用 Quartz。

更好的方法是避免这样的轮询。是否可以在数据更新时将消息放入 JMS 队列中,以便一旦 JMS 队列中有此类消息时将调用您的逻辑(通过消息驱动 bean)?(只是一种可能的方法,有很多类似的方法可以做)

于 2013-01-07T09:44:15.473 回答