3

有没有办法取消通过copyIn()在单独的线程中调用方法启动的复制过程?

比如说,我有一个我需要从中复制的 csv 文件列表,以获得最大的数据库服务器功能。因此,我为 n 个文件创建了 n 个线程连接,但是如果例如选择了错误的文件,我找不到中止单个操作的方法。

杀死线程不起作用 - COPY 只是继续运行。

该类FutureTask<>用于创建线程,因此有一个它们的列表 - 每个 csv 一个。

task.cancel(true)就服务器上的复制过程而言,调用没有任何作用。只能System.exit()用火杀死它。

有任何想法吗?

我的一些代码:

Uploader.java implements Callable

public static long uploadFile(final File file, final String tableName) {

    long status = 0;
    try {
        CopyManager copyManager = 
           new CopyManager((BaseConnection) new DataSource().connect());
        FileReader reader = new FileReader(file);
        status = copyManager.copyIn(sql, reader);
    } catch (SQLException | IOException e) {
       ...
    }
    return status;
}

@Override
public Long call() throws Exception {
    return uploadFile(file, tableName);
}

Upload files method body

for (File file : files) {
        FutureTask<Long> ftask =
                new FutureTask<>(
                        new Uploader(defaultTableName, file)
                );
        tasks.add(ftask);
        execService.execute(ftask);
    }

解决了:

找到了解决方案,但是它需要对代码进行一些更改。

Upload files method body现在看起来像这样

for (File file : files) {
    Uploader uploader = new Uploader(defaultTableName, file);
    uploaders.add(uploader);
    Future<Long> f = execService.submit(uploader);

    //save the Future to get the copy result when finished

}

有了这个,我们可以很容易地调用 someUploader的方法,可以关闭数据库连接并正确处理异常。它将停止在服务器上复制。

我承认该解决方案可能不是最优雅的解决方案,但是它可以工作,运行速度很快,并且不需要太多代码。

4

2 回答 2

2

PostgreSQL 实际上不支持带内查询取消。

当您从 JDBC 驱动程序请求查询取消时,它会建立一个新连接以发送取消消息。(这意味着如果您max_connections取消将失败,这有点不正常)。

这样做的结果是你可以自己做同样的事情:

  • 用于pg_backend_pid()在开始复制操作之前获取worker的进程ID;

  • 当您想取消复制时,请打开一个新连接并pg_cancel_backend(?)使用之前记录的 pid 发出问题。如果它没有停止,您可以稍等片刻,然后执行pg_terminate_backend(?).

这些是普通的 SQL 级函数。

唯一真正的问题是取消和终止请求是会话级别而不是语句级别。所以他们可以与语句完成和新语句的开始竞争,例如:

  • client1:复制开始
  • client2:连接发送取消消息
  • client1:复制完成
  • client1:新的单独副本开始
  • client2 发送 pg_cancel_backend(...)

此时,第二个副本将被终止,这可能不是您想要的。因此,您必须确保使用适当的排除客户端来防止这种情况发生,确保在开始新语句之前完成任何未完成的取消请求。

IIRC JDBC 驱动程序在内部也有同样的问题。这是团队真正想要一种方法来取消特定唯一的每个会话语句序列号的原因之一,例如pg_cancel_backend(pid, statementnumber)如果语句已经终止,则(当前不存在)会因错误而中止,而不是无论如何发送取消。

于 2013-09-04T09:13:49.817 回答
1

免责声明:我没有尝试过这个,我只是通过查看源代码得到了这个想法

有一种CopyManager.copyIn(String sql)方法可以返回CopyIn接口的一个实例,该实例又是CopyOperation. 该接口有一个cancelCopy()方法。

在此处查看 JavaDocs:http: //jdbc.postgresql.org/documentation/publicapi/org/postgresql/copy/CopyOperation.html#cancelCopy%28%29

但是采用流复制数据的方法只返回一个长值,因此无法使用那里使用的 CopyOperation 实例。

但是,在查看 copyIn() 方法的源代码时,您自己似乎很容易做到这一点。

copyIn(String, Reader)因此,基本上在您的代码中使用该方法中的代码,而不是调用您:

// your code 
CopyManager copyManager = 
       new CopyManager((BaseConnection) new DataSource().connect());
FileReader from = ...  // different name!
int bufferSize = 65536;

// here starts the copy of the driver's implementation of the copyIn() method.

char[] cbuf = new char[bufferSize];
int len;

// if you store the instance of the CopyIn interface in an instance variable you 
// should be able to call cancelCopy() on it
CopyIn cp = copyManager.copyIn(sql);  

try {
    while ( (len = from.read(cbuf)) > 0) {
        byte[] buf = encoding.encode(new String(cbuf, 0, len));
        cp.writeToCopy(buf, 0, buf.length);
    }
    return cp.endCopy();
} finally { // see to it that we do not leave the connection locked
    if(cp.isActive())
        cp.cancelCopy();
}
于 2013-09-04T08:02:59.853 回答