3

我想知道,如果我的查询返回数百万行,JdbcIO 将如何并行执行查询。我已经提到了https://issues.apache.org/jira/browse/BEAM-2803和相关的拉取请求。我完全无法理解。

ReadAll expand方法使用ParDo. 因此,它会创建到数据库的多个连接以并行读取数据吗?如果我限制可以创建到数据源中数据库的连接数,它会坚持连接限制吗?

谁能帮我理解这将如何处理JdbcIO?我在用2.2.0

更新 :

.apply(
          ParDo.of(
              new ReadFn<>(
                  getDataSourceConfiguration(),
                  getQuery(),
                  getParameterSetter(),
                  getRowMapper())))

上面的代码显示 ReadFn 与 ParDo 一起应用。我认为,ReadFn 将并行运行。如果我的假设是正确的,我将如何使用该readAll()方法从一次只能建立有限数量的连接的数据库中读取?

谢谢巴鲁

4

3 回答 3

1

ReadAll 方法处理您有许多多个查询的情况。您可以将查询存储为字符串的 PCollection,其中每个字符串都是查询。然后在读取时,每个项目在单个 ParDo 中作为单独的查询进行处理。

这不适用于少量查询,因为它限制了查询数量的并行性。但是如果你有很多,那么它会执行得更快。大多数 ReadAll 调用都是这种情况。

从代码看来,在 setup 函数中,每个工作人员都建立了一个连接。这可能包括几个查询,具体取决于工作人员的数量和查询的数量。

查询限制设置在哪里?无论有没有 ReadAll,它的行为都应该相似。

有关更多信息,请参阅 jira:https ://issues.apache.org/jira/browse/BEAM-2706

我对 jdbcIO 不是很熟悉,但似乎他们实现了 jira 中建议的版本。PCollection 可以是任何东西,然后是一个回调,以根据 PCollection 中的元素修改查询。这允许 PCollection 中的每个项目表示一个查询,但比将新查询作为每个元素更灵活一些。

于 2017-12-27T20:14:33.203 回答
1

我创建了一个数据源,如下所示。

    ComboPooledDataSource cpds = new ComboPooledDataSource();
    cpds.setDriverClass("com.mysql.jdbc.Driver"); // loads the jdbc driver
    cpds.setJdbcUrl("jdbc:mysql://<IP>:3306/employees");
    cpds.setUser("root");
    cpds.setPassword("root");
    cpds.setMaxPoolSize(5);

现在有更好的方法来设置这个驱动程序。我将数据库池大小设置为 5。在进行JdbcIO转换时,我使用此数据源来创建连接。在管道中,我设置

option.setMaxNumWorkers(5);
option.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);

我使用了一个返回大约 300 万条记录的查询。在观察数据库连接时,程序运行时连接数逐渐增加。它在某些实例上最多使用 5 个连接。我认为,这就是我们在运行JdbcIOtrnsformation 以从数据库加载大量数据时限制创建到数据库的连接数的方式。

ComboPoolDataSource 的 Maven 依赖项

    <dependency>
        <groupId>c3p0</groupId>
        <artifactId>c3p0</artifactId>
        <version>0.9.1.2</version>
    </dependency>

**如果我在这里遗漏了什么,请随时纠正答案。*

于 2018-01-04T10:20:01.553 回答
0

我有类似的任务我从数据库中获取记录数并将其拆分为 1000 条记录的范围然后我将 readAll 应用于范围的 PCollection 这里是解决方案的描述。并感谢 Balu reg。数据源配置。

于 2018-07-31T14:00:09.207 回答