6

我正在处理的应用程序是一个基于 Java 的 ETL 过程,它将数据加载到多个表中。DBMS 是 Infobright(一个面向数据仓库的基于 MYSQL 的 DBMS)。

数据加载应该以原子方式完成;但是,出于性能原因,我想同时将数据加载到多个表中(使用LOAD DATA INFILE命令)。这意味着我需要打开多个连接。

是否有任何解决方案可以让我以原子方式并行执行负载?(我猜答案可能取决于我加载到的表的引擎;其中大多数是 Brighthouse,它允许事务,但没有 XA 和保存点)。

为了进一步澄清,我想避免一种情况,让我们说:

  • 我将数据加载到 5 个表中
  • 我为前 4 个表提交负载
  • 第 5 个表的提交失败

在这种情况下,我无法回滚前 4 次加载,因为它们已经提交。

4

2 回答 2

5

介绍

正如我所承诺的那样,我已经编写了一个完整的示例。我使用 MySQL 并创建了三个表,如下所示:

CREATE TABLE `test{1,2,3}` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `data` varchar(255) NOT NULL UNIQUE,
  PRIMARY KEY (`id`)
);

test2最初包含一行。

INSERT INTO `test2` (`data`) VALUES ('a');

我已将完整代码发布到 http://pastebin.com。)

下面的例子做了几件事。

  1. 设置threads3which 确定要并行运行的作业数量。
  2. 创建threads连接数。
  3. 为每个表输出一些示例数据(默认情况下,数据是a针对每个表的)。
  4. 创建threads要运行的作业数量并用数据加载它们。
  5. 以线程threads数运行作业并等待它们完成(成功与否)。
  6. 如果没有发生异常,则提交每个连接;否则它会回滚它们中的每一个。
  7. 关闭连接(但是这些可以重复使用)。

(请注意,我在 中使用了 Java 7 的自动资源管理功能SQLTask.call()。)

逻辑

public static void main(String[] args) throws SQLException, InterruptedException {
  int threads = 3;
  List<Connection> connections = getConnections(threads);
  Map<String, String> tableData = getTableData(threads);
  List<SQLTask> tasks = getTasks(threads, connections);
  setData(tableData, tasks);
  try {
    runTasks(tasks);
    commitConnections(connections);
  } catch (ExecutionException ex) {
    rollbackConnections(connections);
  } finally {
    closeConnections(connections);
  }
}

数据

private static Map<String, String> getTableData(int threads) {
  Map<String, String> tableData = new HashMap<>();
  for (int i = 1; i <= threads; i++)
    tableData.put("test" + i, "a");
  return tableData;
}

任务

private static final class SQLTask implements Callable<Void> {

  private final Connection connection;

  private String data;
  private String table;

  public SQLTask(Connection connection) {
    this.connection = connection;
  }

  public void setTable(String table) {
    this.table = table;
  }

  public void setData(String data) {
    this.data = data;
  }

  @Override
  public Void call() throws SQLException {
    try (Statement statement = connection.createStatement()) {
      statement.executeUpdate(String.format(
        "INSERT INTO `%s` (data) VALUES  ('%s');", table, data));
    }
    return null;
  }
}

private static List<SQLTask> getTasks(int threads, List<Connection> connections) {
  List<SQLTask> tasks = new ArrayList<>();
  for (int i = 0; i < threads; i++)
    tasks.add(new SQLTask(connections.get(i)));
  return tasks;
}

private static void setData(Map<String, String> tableData, List<SQLTask> tasks) {
  Iterator<Entry<String, String>> i = tableData.entrySet().iterator();
  Iterator<SQLTask> j = tasks.iterator();
  while (i.hasNext()) {
    Entry<String, String> entry = i.next();
    SQLTask task = j.next();
    task.setTable(entry.getKey());
    task.setData(entry.getValue());
  }
}

private static void runTasks(List<SQLTask> tasks) 
    throws ExecutionException, InterruptedException {
  ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
  List<Future<Void>> futures = executorService.invokeAll(tasks);
  executorService.shutdown();
  for (Future<Void> future : futures)
    future.get();
}

结果

给定返回的默认数据getTableData(...)

test1 -> `a`
test2 -> `a`
test3 -> `a`

并且test2已经包含a(并且该data列是唯一的)第二个作业的事实将失败并引发异常,因此每个连接都将回滚。

如果a您返回bs 而不是 s,则连接将被安全提交。

这可以通过类似的方式完成LOAD DATA


在 OP 对我的回答做出回应后,我意识到她/他想要做的事情不可能以简单明了的方式完成。

基本上问题是,在成功提交后,提交的数据不能回滚,因为操作是原子的。鉴于在给定的情况下需要多次提交,除非跟踪所有数据(在所有事务中)并且如果发生某些事情会删除成功提交的所有内容,否则不可能回滚所有内容。

关于提交和回滚的问题有一个很好的答案。

于 2011-12-07T17:08:47.403 回答
0

实际上,在较新版本的 IEE(而不是 ICE)中,有一个称为 DLP(分布式负载处理)的附加功能。网站上有一个 PDF 文件,从这里链接:

http://www.infobright.com/Products/Features/

于 2012-02-16T20:39:05.273 回答