0

在这里,我有两个应该相互同步的运行方法。

轮询类:

     */
public void run() {
    int seqId = 0;
    while(true) {
    List<KpiMessage> list = null;

        try{
            if(!accumulator.isUsed){                
                try {
                    list = fullPoll(seqId);

                    if (!list.isEmpty()) {
                        seqId = list.get(0).getSequence();
                        accumulator.manageIngoing(list);
                    }
                    System.out.println("Updated");                      
                    wait(); 
                } catch (Exception e1) {
                    e1.printStackTrace();

                }
            }

        } catch (Exception e){
            // TODO:
            System.err.println(e.getMessage());
            e.printStackTrace();                
        }
    }

}


/**
 * Method which defines polling of the database and also count the number of Queries
 * @param lastSeq 
 * @return pojo col
 * @throws Exception
 */
public List<KpiMessage> fullPoll(int lastSeq) throws Exception {
    Statement st = dbConnection.createStatement();
    System.out.println("Polling");
    ResultSet rs = st.executeQuery("Select * from msg_new_to_bde where ACTION = 814 and 
    STATUS = 200 order by SEQ DESC");
    List<KpiMessage> pojoCol = new ArrayList<KpiMessage>();
    try {


        while (rs.next()) {
            KpiMessage filedClass = convertRecordsetToPojo(rs);
            pojoCol.add(filedClass);
        }

        for (KpiMessage pojoClass : pojoCol) {
            System.out.print(" " + pojoClass.getSequence());
            System.out.print(" " + pojoClass.getTableName());
            System.out.print(" " + pojoClass.getAction());
            System.out.print(" " + pojoClass.getKeyInfo1());
            System.out.print(" " + pojoClass.getKeyInfo2());
            System.out.print(" "+ pojoClass.getStatus());
            System.out.println(" " + pojoClass.getEntryTime());

        }


    } finally  {
        try {
            st.close();
            rs.close();
        } catch (Exception e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
        }
    }       

处理和更新类:

        public void run() {
    while(true){
        try {
            while(!accumulator.isUsed)
            {
                try {
                System.out.println("Waiting for new outgoingmessages"); 
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
     Collection<KpiMessage> outgoingQueue = generate(accumulator.outgoingQueue); 
            accumulator.manageOutgoing(outgoingQueue, dbConnection);
        } catch (Exception e) {
            System.err.println(e.getMessage());
            e.printStackTrace();
        }
    }
}
}   

我有一个逻辑错误:

轮询器不仅轮询新消息,而且从第一个开始一次又一次地读取数据库。

也是一次又一次的更新。

如何解决这个同步问题

4

2 回答 2

1

您应该为您正在调用或在其上的对象同步或更确切地说持有锁或监视器wait()notify()

以下是对您有帮助的:wait() throwing IllegalArgumentException

synchronized(lockObject){

     lockObject.wait(); //you should hold the lock to be able to call wait()
}
于 2013-02-19T09:01:07.067 回答
1

或者,您可以使用 aBlockingQueue在线程之间传输数据。

有关详细信息,请参阅阻塞队列

// The end of the list.
private static final Integer End = -1;

static class Producer implements Runnable {
  final Queue<Integer> queue;
  private int i = 0;

  public Producer(Queue<Integer> queue) {
    this.queue = queue;
  }

  @Override
  public void run() {
    try {
      for (int i = 0; i < 1000; i++) {
        queue.add(i++);
        Thread.sleep(1);
      }
      // Finish the queue.
      queue.add(End);
    } catch (InterruptedException ex) {
      // Just exit.
    }
  }
}

static class Consumer implements Runnable {
  final Queue<Integer> queue;
  private int i = 0;

  public Consumer(Queue<Integer> queue) {
    this.queue = queue;
  }

  @Override
  public void run() {
    boolean ended = false;
    while (!ended) {
      Integer i = queue.poll();
      if ( i != null ) {
        ended = i == End;
        System.out.println(i);
      }
    }
  }
}

public void test() throws InterruptedException {
  Queue queue = new LinkedBlockingQueue();
  Producer p = new Producer(queue);
  Consumer c = new Consumer(queue);
  Thread pt = new Thread(p);
  Thread ct = new Thread(c);
  // Start it all going.
  pt.start();
  ct.start();
  // Close it down
  pt.join();
  ct.join();
}
于 2013-02-19T09:11:38.180 回答