11

我需要一个 Java 函数,它返回 SQLSELECT查询的结果作为InputStream另一个系统的参数,该系统通过网络发送结果。

但是,InputStream必须String使用自定义分隔符(即通常但不总是 CSV)。

虽然我可以轻松地创建一个函数来检索结果,创建一个 delimitedString并最终将其转换String为一个InputStream,但 SQL 结果通常太大而无法在内存中处理。此外,在返回结果之前处理整个结果集会导致不必要的等待时间。

如何返回一个InputStream以迭代 SQL 结果并在从数据库返回时发送已处理(分隔)的数据?

4

3 回答 3

9

发布(未测试)代码片段,它应该给你基本的想法:

/**
 * Implementors of this interface should only convert current row to byte array and return it.
 * 
 * @author yura
 */
public interface RowToByteArrayConverter {
    byte[] rowToByteArray(ResultSet resultSet);
}

public class ResultSetAsInputStream extends InputStream {

    private final RowToByteArrayConverter converter;
    private final PreparedStatement statement;
    private final ResultSet resultSet;

    private byte[] buffer;
    private int position;

    public ResultSetAsInputStream(final RowToByteArrayConverter converter, final Connection connection, final String sql, final Object... parameters) throws SQLException {
        this.converter = converter;
        statement = createStatement(connection, sql, parameters);
        resultSet = statement.executeQuery();
    }

    private static PreparedStatement createStatement(final Connection connection, final String sql, final Object[] parameters) {
        // PreparedStatement should be created here from passed connection, sql and parameters
        return null;
    }

    @Override
    public int read() throws IOException {
        try {
            if(buffer == null) {
                // first call of read method
                if(!resultSet.next()) {
                    return -1; // no rows - empty input stream
                } else {
                    buffer = converter.rowToByteArray(resultSet);
                    position = 0;
                    return buffer[position++] & (0xff);
                }
            } else {
                // not first call of read method
                if(position < buffer.length) {
                    // buffer already has some data in, which hasn't been read yet - returning it
                    return buffer[position++] & (0xff);
                } else {
                    // all data from buffer was read - checking whether there is next row and re-filling buffer
                    if(!resultSet.next()) {
                        return -1; // the buffer was read to the end and there is no rows - end of input stream
                    } else {
                        // there is next row - converting it to byte array and re-filling buffer
                        buffer = converter.rowToByteArray(resultSet);
                        position = 0;
                        return buffer[position++] & (0xff);
                    }
                }
            }
        } catch(final SQLException ex) {
            throw new IOException(ex);
        }
    }



    @Override
    public void close() throws IOException {
        try {
            statement.close();
        } catch(final SQLException ex) {
            throw new IOException(ex);
        }
    }
}

这是非常直接的实现,可以通过以下方式进行改进:

  • 可以删除 read 方法中 if 和 else 之间的代码重复 - 它只是为了澄清而发布
  • 而不是为每一行重新创建字节数组缓冲区(new byte[]操作成本很高),可以实现更复杂的逻辑来使用字节数组缓冲区,它只初始化一次然后重新填充。然后应该更改RowToByteArrayConverter.rowToByteArray方法的签名,int fillByteArrayFromRow(ResultSet rs, byte[] array)该签名应该返回填充的字节数并填充传递的字节数组。

因为字节数组包含它可以包含的有符号字节-1(实际上255是无符号字节),因此表明流的不正确结束,所以& (0xff)用于将有符号字节转换为无符号字节作为整数值。具体参考Java如何将int转换为byte?.

另请注意,如果网络传输速度较慢,这可能会长时间保持打开的结果集,从而给数据库带来问题。

希望这可以帮助 ...

于 2012-06-28T22:51:28.597 回答
2

我将通过引入以下内容来改进@Yura 建议的答案:
使用使用 ByteArrayOutputStream 初始化的 DataOutputStream,以便在 RowToByteArrayConverter 的实现中方便地将数据写入字节数组。
事实上,我建议有一个转换器的层次结构,它们都扩展同一个抽象类(这是我的想法的代码片段 - 可能不会从第一次编译)

public abstract class RowToByteArrayConverter {
  public byte[] rowToByteArray(ResultSet resultSet) {
      parseResultSet(dataOutputStream, resultSet);
      return byteArrayOutputSteam.toByteArray();
  }

  public RowToByteArrayConverter() {
    dataOutputStream = new DataOutputStream(byteArrayOutputStream);
  }

  protected DataOutputStream dataOutputStream;
  protected ByteArrayOutputStream byteArrayOutputStream;

  protected abstract void parseResultSet(DataOutputStream dataOutputStresm, ResultSet rs); 
}

现在,您可以通过简单地覆盖 parseResultSet 方法来覆盖这个类,
例如 - 编写代码,从记录中的“名称”列获取名称作为字符串。并在 DataOputputStream 上执行 writeUTF8。

于 2012-07-02T16:51:50.753 回答
2

上述答案为超出限制大小的字符串生成器的问题提供了有用的解决方案。它们还具有内存效率。但是,我的测试表明它们比仅仅将数据写入字符串构建器并调用

新的 ByteArrayInputStream(data.getBytes("UTF-8"))

获取输入流。

我发现性能更高的是通过使用分区函数对传入数据进行切片,然后对每个线程使用多个线程:

  1. 在源数据库中查询数据的子集
  2. 将数据写入目标

这也避免了总数据可能超过字符串缓冲区的最大大小的问题。

例如,我在 SQL Server 表中有 6m 条记录,其中有一列名为“RecordDate”。Recorddate 中的值在 2013 年和 2016 年之间变化。所以我将每个线程配置为分别请求 2013、14、15、16 的数据。然后,每个线程将转码后的数据写入 StringBuilder 并通过使用上述 getBytes() 转换为 Inputstream 将每个批量加载到目标。

这导致了 2 倍的加速。

为什么?因为源数据库和目标数据库可以处理多个并发请求,所以整体工作负载分布在所有三个进程中的多个线程上:源数据库、代码转换器、目标数据库。

于 2016-08-19T11:08:45.370 回答