0

所以基本上我想做的是以下几点:

  1. 从数据库中加载一批数据
  2. 将该数据(查询结果)映射Object[]到以可读格式表示数据的类
  3. 写入文件
  4. 重复直到查询没有更多结果

我列出了我熟悉的似乎符合需求的结构,以及为什么它们不符合我的需求。

  • 迭代器 → 不调用就没有映射和过滤的选项next()
    • 我需要在子类中定义映射函数,但实际上没有数据(类似于流),这样我就可以将“流”方式传递给调用类,并且只有 call next,然后调用所有映射函数因此
  • 流 → 在映射和过滤可能之前,所有数据都需要可用
  • 可观察的→一旦数据可用就发送数据。我需要同步处理它

为了更多地了解我正在尝试做的事情,我做了一个小例子:

// Disclaimer: "Something" is the structure I am not sure of now. 
// Could be an Iterator or something else that fits (Thats the question)
public class Orchestrator {
    @Inject
    private DataGetter dataGetter;

    public void doWork() {
        FileWriter writer = new FileWriter("filename");

        // Write the formatted data to the file
        dataGetter.getData()
            .forEach(data -> writer.writeToFile(data));
    }
}

public class FileWriter {
    public void writeToFile(List<Thing> data) {
        // Write to file
    }
}

public class DataGetter {
    @Inject
    private ThingDao thingDao;

    public Something<List<Thing>> getData() {

        // Map data to the correct format and return that
        return thingDao.getThings()
            .map(partialResult -> /* map to object */);
    }
}

public class ThingDao {

    public Something<List<Object[]>> getThings() {
        Query q = ...;
        // Dont know what to return
    }
}

到目前为止我得到了什么:

我试图从 Iterator 的基础出发,因为它是唯一真正满足我的内存需求的。然后我添加了一些方法来映射和循环数据。虽然它并不是一个真正强大的设计,而且它会比我想象的更难,所以我想知道是否已经有任何东西可以满足我的需要。

public class QIterator<E> implements Iterator<List<E>> {
    public static String QUERY_OFFSET = "queryOffset";
    public static String QUERY_LIMIT = "queryLimit";

    private Query query;

    private long lastResultIndex = 0;
    private long batchSize;

    private Function<List<Object>, List<E>> mapper;

    public QIterator(Query query, long batchSize) {
        this.query = query;
        this.batchSize = batchSize;
    }

    public QIterator(Query query, long batchSize, Function<List<Object>, List<E>> mapper) {
        this(query, batchSize);
        this.mapper = mapper;
    }

    @Override
    public boolean hasNext() {
        return lastResultIndex % batchSize == 0;
    }

    @Override
    public List<E> next() {
        query.setParameter(QueryIterator.QUERY_OFFSET, lastResultIndex);
        query.setParameter(QueryIterator.QUERY_LIMIT, batchSize);

        List<Object> result = (List<Object>) query.getResultList(); // unchecked
        lastResultIndex += result.size();

        List<E> mappedResult;
        if (mapper != null) {
            mappedResult = mapper.apply(result);
        } else {
            mappedResult = (List<E>) result; // unchecked
        }

        return mappedResult;
    }

    public <R> QIterator<R> map(Function<List<E>, List<R>> appendingMapper) {
        return new QIterator<>(query, batchSize, (data) -> {
            if (this.mapper != null) {
                return appendingMapper.apply(this.mapper.apply(data));
            } else {
                return appendingMapper.apply((List<E>) data);
            }
        });
    }

    public void forEach(BiConsumer<List<E>, Integer> consumer) {
        for (int i = 0; this.hasNext(); i++) {
            consumer.accept(this.next(), i);
        }
    }
}

到目前为止,这是可行的,但是有一些unchecked我不太喜欢的任务,而且我希望能够将一个 QIterator “附加”到另一个 QIterator 本身并不难,但它也应该采用后面的地图附加。

4

2 回答 2

2

假设您有一个以分页方式提供数据的 DAO,例如通过将LIMITandOFFSET子句应用于底层 SQL。这样的 DAO 类将有一个将这些值作为参数的方法,即该方法将符合以下功能方法:

@FunctionalInterface
public interface PagedDao<T> {
    List<T> getData(int offset, int limit);
}

例如,调用getData(0, 20)将返回前 20 行(第 1 页),调用getData(60, 20)将返回第 4 页的 20 行。如果该方法返回的行数少于 20 行,则意味着我们得到了最后一页。在最后一行之后询问数据将返回一个空列表。

对于下面的演示,我们可以模拟这样一个 DAO 类:

public class MockDao {
    private final int rowCount;
    public MockDao(int rowCount) {
        this.rowCount = rowCount;
    }
    public List<SimpleRow> getSimpleRows(int offset, int limit) {
        System.out.println("DEBUG: getData(" + offset + ", " + limit + ")");
        if (offset < 0 || limit <= 0)
            throw new IllegalArgumentException();
        List<SimpleRow> data = new ArrayList<>();
        for (int i = 0, rowNo = offset + 1; i < limit && rowNo <= this.rowCount; i++, rowNo++)
            data.add(new SimpleRow("Row #" + rowNo));
        System.out.println("DEBUG:   data = " + data);
        return data;
    }
}

public class SimpleRow {
    private final String data;
    public SimpleRow(String data) {
        this.data = data;
    }
    @Override
    public String toString() {
        return "Row[data=" + this.data + "]";
    }
}

然后,如果您想Stream从该方法生成 a of 行,将所有行流式传输到一定大小的块中,我们需要一个Spliterator为此,因此我们可以使用它StreamSupport.stream(Spliterator<T> spliterator, boolean parallel)来创建一个流。

这是这样的一个实现Spliterator

public class PagedDaoSpliterator<T> implements Spliterator<T> {
    private final PagedDao<T> dao;
    private final int blockSize;
    private int nextOffset;
    private List<T> data;
    private int dataIdx;
    public PagedDaoSpliterator(PagedDao<T> dao, int blockSize) {
        if (blockSize <= 0)
            throw new IllegalArgumentException();
        this.dao = Objects.requireNonNull(dao);
        this.blockSize = blockSize;
    }
    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (this.data == null) {
            if (this.nextOffset == -1/*At end*/)
                return false; // Already at end
            this.data = this.dao.getData(this.nextOffset, this.blockSize);
            this.dataIdx = 0;
            if (this.data.size() < this.blockSize)
                this.nextOffset = -1/*At end, after this data*/;
            else
                this.nextOffset += data.size();
            if (this.data.isEmpty()) {
                this.data = null;
                return false; // At end
            }
        }
        action.accept(this.data.get(this.dataIdx++));
        if (this.dataIdx == this.data.size())
            this.data = null;
        return true;
    }
    @Override
    public Spliterator<T> trySplit() {
        return null; // Parallel processing not supported
    }
    @Override
    public long estimateSize() {
        return Long.MAX_VALUE; // Unknown
    }
    @Override
    public int characteristics() {
        return ORDERED | NONNULL;
    }
}

我们现在可以使用上面的模拟 DAO 进行测试:

MockDao dao = new MockDao(13);
Stream<SimpleRow> stream = StreamSupport.stream(
        new PagedDaoSpliterator<>(dao::getSimpleRows, 5), /*parallel*/false);
stream.forEach(System.out::println);

输出

DEBUG: getData(0, 5)
DEBUG:   data = [Row[data=Row #1], Row[data=Row #2], Row[data=Row #3], Row[data=Row #4], Row[data=Row #5]]
Row[data=Row #1]
Row[data=Row #2]
Row[data=Row #3]
Row[data=Row #4]
Row[data=Row #5]
DEBUG: getData(5, 5)
DEBUG:   data = [Row[data=Row #6], Row[data=Row #7], Row[data=Row #8], Row[data=Row #9], Row[data=Row #10]]
Row[data=Row #6]
Row[data=Row #7]
Row[data=Row #8]
Row[data=Row #9]
Row[data=Row #10]
DEBUG: getData(10, 5)
DEBUG:   data = [Row[data=Row #11], Row[data=Row #12], Row[data=Row #13]]
Row[data=Row #11]
Row[data=Row #12]
Row[data=Row #13]

可以看出,我们得到了 13 行数据,以 5 行为一组从数据库中检索出来。

直到需要数据时才会从数据库中检索数据,从而导致内存占用低,具体取决于块大小和流操作未缓存数据。

于 2020-08-25T11:38:14.477 回答
1

您可以在一行中执行以下操作:

stmt = con.createStatement();
ResultSet rs = stmt.executeQuery(queryThatReturnsAllRowsOrdered);
Stream.generate(rs.next() ? map(rs) : null)
  .takeWhile(Objects::nonNull)
  .filter(<some predicate>)
  .forEach(<some operation);

当从查询返回第一行时开始处理,并与数据库并行继续,直到读取所有行。

这种方法一次只有一行在内存中,并且通过只运行 1 个查询来最小化数据库的负载。

从 a 映射ResultSet比从映射更容易和自然,Object[]因为您可以按名称和正确键入的值访问列,例如:

MyDao map(ResultSet rs) {
    try {
        String someStr = rs.getString("COLUMN_X");
        int someInt = rs.getInt("COLUMN_Y"):
        return new MyDao(someStr, someInt);
    } catch (SQLException e ) {
        throw new RuntimeException(e);
    }
}
于 2020-08-25T17:50:40.997 回答