1

我在这里有一个相当简单的 Java 类,它创建了 2 个线程池......

  • 连接到正在运行的 URL 流并逐行读取条目,将每个条目提交到后端 MySQL 数据库。

  • 产生几个线程,每个线程将执行相同的过程(下)

1.从上面获取最旧的数据库条目

2.解析并相应处理

3.将几个部分保存到另一个数据库表

4.从运行表中删除这个DB条目,表示分析完成

5.结束线程

我需要 2 个池的原因是因为读取过程比分析快得多,并且如果我读取和分析每个条目,因为它通过条目备份太快并且传入流中断。通过进行这种分离,读取可以尽可能快地发生,并且分析可以尽可能快地进行,因为它知道要赶上的记录是安全的并且可以赶上。

我遇到的问题是每个并发线程都获得相同的最旧记录。我需要知道确保单独线程全部同时运行但每个线程访问唯一的最旧数据库条目的最佳方法是什么。

提前致谢。

编辑==================================

谢谢大家到目前为止的回复...

为了进一步扩展我在这里尝试的当前设置,也许这​​个代码段会有所帮助......

try
    {
        String strQuery1 = "SELECT lineID,line FROM lineProcessing ORDER BY lineID ASC LIMIT 1;";
        String strQuery2 = "DELETE from lineProcessing WHERE lineID = ?";

        DBConnector dbc = new DBConnector(driver,url,userName,passwd); 
        Connection con = dbc.getConnection();
        con.setAutoCommit(false);
        PreparedStatement pstmt = con.prepareStatement(strQuery1);
        rs = pstmt.executeQuery();
        
        //Now extract the line & Id from the returned result set
        while (rs.next()) {
            lineID = Integer.parseInt(rs.getString(1));
            line = rs.getString(2);
        } //end while 
        
        //Now delete that entry so that it cannot be analysed again...
        pstmt = con.prepareStatement(strQuery2);
        pstmt.setString(1, lineID.toString());
        int res=pstmt.executeUpdate();
        
        con.commit();
        con.setAutoCommit(true);
        con.close();
    }
    catch (SQLException e) {
        System.out.println(">>>EXCEPTION FOUND IN QUERY = " + strQuery1 + " __or__ " + strQuery2);
        e.printStackTrace();
    }

...所以你可以看到基本上打开一个数据库连接,将其设置为“Autocommit = false”,执行 QUERY1,执行 QUERY2,提交两个事务最终关闭连接。这应该是每个单独的线程都需要完成的。问题是我在分析线程池中运行的每个 X 线程都被生成并且都同时执行这批代码(这是我所期望的),但不尊重我认为我已经设置的对 DB 的单一连接访问以上。然后它们都返回同一行进行分析。当线程下一次循环进行迭代 #2 时,它们都将返回这个新的最后一行以在前一次删除之后进行分析。

请有任何进一步的建议 - 包括通过 java 强制事务 SQL 的一个很好的例子?

再次感谢各位。

4

2 回答 2

1

首先,添加一个可以为空的日期时间列,表示该行已在某个时间“拾取”。

然后在您的处理线程中:

  1. 开始交易
  2. 查找“拾取”时间为空的最旧行
  3. 将拾取时间更新为当前系统时间
  4. 提交事务。

确保您的隔离级别设置为至少READ UNCOMMITTED,并且没有两个线程应该获得同一行。此外,如果处理线程死亡并放弃它的行,您可以通过定期查询“拾取”时间早于某个值的行来找出这一点,并通过将拾取时间设置为空来重新处理这些行。

或者只是切换到事务性消息队列,它会自动为您完成大部分工作。

于 2012-06-25T15:42:11.187 回答
0

另一种解决方案是让工作线程都等待包含行键的单例。写入行,将键放在对象中,然后通知。“下一个”工作线程将拾取密钥并对其进行操作。您将需要确保有工人在等待,什么不是。

于 2012-06-25T16:53:20.197 回答