2

我正在尝试实现多线程处理,现在需要使用 DBhandler 类来更新我的数据库。

程序执行从一个控制器类开始,它有一个 main 方法和一个线程池:

/**
 * Application entry point: prepares the database and starts the polling task.
 *
 * <p>This excerpt uses several members that it does not declare itself
 * ({@code initializeDb()}, {@code getOutgoingQueue()}, {@code threadExecutorRead},
 * {@code incomingQueue} and {@code dbConncetion}).
 */
public class RunnableController {

    /**
     * Creates a controller, initializes the database, starts the polling task and
     * then prints {@code "Polling"}. Any exception raised along the way is printed
     * to standard error and not rethrown.
     *
     * @param args command-line arguments (not read)
     * @throws InterruptedException declared by the signature only; the body catches
     *         every {@link Exception} itself
     */
    public static void main(String[] args) throws InterruptedException {
        try {
            final RunnableController app = new RunnableController();
            app.initializeDb();
            app.initialiseThreads();
            System.out.println("Polling");
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }

    /** Hands the current outgoing queue to a freshly created {@code DBhandler}. */
    private void initialUpdate() {
        new DBhandler().updateDb(getOutgoingQueue());
    }

    /**
     * Creates a fixed pool of 10 threads and submits a single
     * {@code PollingSynchronizer} to it. Failures are printed and not rethrown.
     */
    private void initialiseThreads() {
        try {
            threadExecutorRead = Executors.newFixedThreadPool(10);
            threadExecutorRead.submit(new PollingSynchronizer(incomingQueue, dbConncetion));
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }
}

我的 poller 类获取新数据并应该模拟更新:

 public class PollingSynchronizer implements Runnable {
     public PollingSynchronizer(Collection<KamMessage> incomingQueue,
     Connection dbConnection) {
     super();
    this.incomingQueue = incomingQueue;
    this.dbConnection = dbConnection;
 }

  private int seqId;

   public int getSeqId() {
    return seqId;
  }

   public void setSeqId(int seqId) {
   this.seqId = seqId;
 }

  // The method which runs Polling action and record the time at which it is done
    public void run() {
     int seqId = 0;

      while (true) {
        List<KamMessage> list = null;

        try {
           list = fullPoll(seqId);

           if (!list.isEmpty()) {
             seqId = list.get(0).getSequence();
             incomingQueue.addAll(list);
             this.outgoingQueue = incomingQueue;
             System.out.println("waiting 3 seconds");
             System.out.println("new incoming message");
             Thread.sleep(3000);//at this wait I should execute run()

             //when I debug my execution stops here and throws " Class not found Exception "
             // its does not enters the message processor class 
             MessageProcessor processor = new MessageProcessor() {
              //the run method which should fetch the message processor class.
              final public void run() {
               RunnableController.setOutgoingQueue(generate(outgoingQueue));
              }
           };
          new Thread(processor).start();
        }
     } catch (Exception e1) {
        e1.printStackTrace();
     }
  }
 }
}

我的消息处理器类:

 /**
  * Runnable that post-processes polled {@code KamMessage} objects: it rewrites the
  * key info of action-804 messages and prints a summary line per message.
  */
 public class MessageProcessor implements Runnable {
     private Collection<KpiMessage> fetchedMessages;
     private Connection dbConnection;
     Statement st = null;
     ResultSet rs = null;
     PreparedStatement pstmt = null;
     private Collection<KamMessage> outgoingQueue;

     /**
      * Stores the given queue on this instance and returns it.
      *
      * <p>Despite its name this is an ordinary method, not a constructor (it has a
      * return type), so {@code new MessageProcessor()} still uses the implicit
      * no-argument constructor.
      *
      * @param outgoingQueue queue to remember
      * @return the same {@code outgoingQueue} instance
      */
     public Collection<KamMessage> MessageProcessor(Collection<KamMessage> outgoingQueue) {
         this.outgoingQueue = outgoingQueue;
         return outgoingQueue;
     }

     /**
      * Applies {@link #createKamMsg804(KamMessage)} to every message and prints its
      * sequence, table name, action, key infos and entry time on one line.
      *
      * @param outgoingQueue messages to process; they are modified in place
      * @return the same {@code outgoingQueue} instance
      */
     public Collection<KamMessage> generate(Collection<KamMessage> outgoingQueue) {
         for (KamMessage pojoClass : outgoingQueue) {
             KamMessage updatedValue = createKamMsg804(pojoClass);
             System.out.print(" " + pojoClass.getSequence());
             System.out.print(" " + pojoClass.getTableName());
             System.out.print(" " + pojoClass.getAction());
             System.out.print(" " + updatedValue.getKeyInfo1());
             System.out.print(" " + updatedValue.getKeyInfo2());
             System.out.println(" " + pojoClass.getEntryTime());
         }
         return outgoingQueue;
     }

     /**
      * Sets key info 1 and 2 to {@code "ENTITYKEY9"} and {@code "STATUSKEY9"} when the
      * message's action is 804; other messages are left untouched.
      *
      * @param pojoClass message to inspect and possibly modify
      * @return the same {@code pojoClass} instance
      */
     public KamMessage createKamMsg804(KamMessage pojoClass) {
         if (pojoClass.getAction() == 804) {
             pojoClass.setKeyInfo1("ENTITYKEY9");
             pojoClass.setKeyInfo2("STATUSKEY9");
         }
         return pojoClass;
     }

     /**
      * Builds a {@code KamMessage} from the current row of the result set, reading the
      * columns SEQ, TABLENAME, ENTRYTIME, PROCESSINGTIME, KEYINFO1 and KEYINFO2.
      *
      * @param rs result set positioned on the row to convert
      * @return the populated message
      * @throws SQLException if a column cannot be read
      */
     private KamMessage convertRecordsetToPojo(ResultSet rs) throws SQLException {
         KamMessage msg = new KamMessage();
         msg.setSequence(rs.getInt("SEQ"));
         msg.setTableName(rs.getString("TABLENAME"));

         // A NULL timestamp column is returned as null; leave the property unset
         // instead of failing, the same way PROCESSINGTIME is treated.
         Timestamp entrytime = rs.getTimestamp("ENTRYTIME");
         if (entrytime != null) {
             msg.setEntryTime(new Date(entrytime.getTime()));
         }
         Timestamp processingtime = rs.getTimestamp("PROCESSINGTIME");
         if (processingtime != null) {
             msg.setProcessingTime(new Date(processingtime.getTime()));
         }

         msg.setKeyInfo1(rs.getString("KEYINFO1"));
         msg.setKeyInfo2(rs.getString("KEYINFO2"));
         return msg;
     }

     /**
      * Does nothing in this base class; the anonymous subclass created in
      * {@code PollingSynchronizer.run()} supplies the actual work.
      */
     @Override
     public void run() {
         // Intentionally empty.
     }

 }

这是我的 DBhandler 类,它应该在数据库中进行更新

         public class DBhandler {
  Connection conn = null;
Statement st = null;
ResultSet rs = null;
PreparedStatement pstmt = null;

public DBhandler(){
    super();
}

/**
 * Method to initialize the database connection
 * @return conn
 * @throws Exception 
 * 
 */
public Connection initializeDB() throws Exception {
    System.out.println("JDBC connection");
    DriverManager.registerDriver(new oracle.jdbc.driver.OracleDriver());
    conn = DriverManager.getConnection("jdbc:oracle:thin:@VM-SALES-
           MB:1521:SALESDB1","bdeuser", "edb"); // Connection for Database SALES-DB1   
    return conn;
}

 //The method for updating Database

  public void updateDb(Collection<KpiMessage> updatedQueue){
    for(KpiMessage pojoClass : updatedQueue){
       //**How the query should be used so that it gets last sequence vale and Updates into   
            Database**
           String query = "UPDATE msg_new_to_bde Set KEYINFO1= ?, KEYINFO2 = ? WHERE SEQ =  and 
           action = 804";   
      }
}
/**
 * Method for Closing the connection
 * @throws Exception 
 *
 */

     public void closeDB() throws Exception { 
    st.close();
    conn.close();
   }

    }

我只需要在控制器类中调用 updatedQueue,并使用此类(DBhandler)中的更新查询来更新数据库。

我的程序流程 - 我有三个类: 1.Controller 2.PollerSynchro 3.Msgprocessor

我有数据库记录,这些记录被转换为 POJO 形式并存储在 Collection 中。使用这些 POJO,我的类尝试一次性进行多处理和更新。

控制器 - 拥有线程池,使用 poll 方法启动 poller 类 - 完成

Poller - 应该轮询新的传入消息并将其存储在传入队列中 - 完成

MsgProcessor - 应该寻找新的传入消息并将它们从传出队列传递到传入队列 - 也完成

DbHandler- 应该在数据库中更新。

问题:

现在我的问题是

我必须在轮询线程休眠 3 秒时执行此更新 - 完成

在我的 Poller 类中第二个 void run() 方法的代码里,传出队列没有被传递给消息处理器类进行更新。我的执行流程只是循环回到第一个 run 方法,并且抛出 Class 异常 —— 已解决

如何在 DBhandler 类中将其更新到数据库?

请帮我解决这些问题。

4

1 回答 1

4

异常似乎来自这一行(这是 MessageProcessor.java 第 38 行吗?)

return (KpiMsg804) fetchedMessages;

fetchedMessages 在这一点上似乎是一个 ArrayList,因此无法强制转换为 KpiMsg804。

于 2013-01-22T09:56:27.593 回答