4

我正在开发一个使用 Spring-Webflux + Spring-data-r2dbc 和 r2dbc 驱动程序连接到 Postgresql 数据库的多租户反应式应用程序。多租户部分是基于模式的:每个租户一个模式。因此,根据上下文(例如用户登录),请求将命中数据库的某个模式。

我正在为如何在 r2dbc 中实现这一点而苦苦挣扎。理想情况下,这将是 Hibernate 使用MultiTenantConnectionProvider的方式(参见示例 16.3)。

我发现了什么以及到目前为止我做了什么:

  • 可以使用这里提到的AbstractRoutingConnectionFactory。但我被迫按租户/模式创建一个 ConnectionFactory。在我看来,这远非高效/可扩展,我宁愿使用像r2dbc-pool这样的连接池
  • 我看着PostgresqlConnectionFactory。这里有趣的是 onprepareConnection有一个电话setSchema(connection)

    private Mono<Void> setSchema(PostgresqlConnection connection) {
        if (this.configuration.getSchema() == null) {
            return Mono.empty();
        }
    
        return connection.createStatement(String.format("SET SCHEMA '%s'", this.configuration.getSchema()))
            .execute()
            .then();
    }
    

可能我需要找到一种方法来覆盖它以便从上下文而不是配置中动态获取架构?

  • 否则我可以尝试将请求中的模式指定为表前缀:

        String s = "tenant-1";
        databaseClient.execute("SELECT * FROM \"" + s + "\".\"city\"")
                .as(City.class)
                .fetch()
                .all()
    

但是我不能再使用 SpringData 或者我需要覆盖每个请求以将租户作为参数传递。

任何提示/帮助表示赞赏:)

4

4 回答 4

1

我也遇到了这个。

这是我目前正在做的事情:

  • 将PostgresqlConnectionConfigurationBuilderPostgresqlConnectionFactory发布为 Bean:

    @Bean
    public PostgresqlConnectionConfiguration.Builder postgresqlConnectionConfiguration() {
        return PostgresqlConnectionConfiguration.builder()
                .host("localhost")
                .port(5432)
                .applicationName("team-toplist-service")
                .database("db")
                .username("user")
                .password("password");
    }
    
    @Bean
    @Override
    public PostgresqlConnectionFactory connectionFactory() {
        return new PostgresqlConnectionFactory(postgresqlConnectionConfiguration()
                .build());
    }
    

这样我以后可以(在我的业务方法中)使用注入的PostgresqlConnectionConfigurationBuilder实例创建一个新的PostgresqlConnectionFactory -现在也使用在构建器上调用的“模式”设置器(在从传入的org.springframework.web.reactive提取租​​户信息之后.function.server.ServerRequest我从路由 bean 传递下来的。

我的数据库模式遵循模式appname_tenantId,所以我们有一个静态配置为“app_name”的“appName”,所以我最终得到了像“app_name_foo_bar123”这样的模式名称

接下来,我们有一个租户标识符,在我的情况下,它来自一个请求标头,该标头保证由位于上游的 apache 服务器设置(传递传入请求的 X-Tenant-Id 标头,以便不依赖 URL 来执行租户特定路由)

所以我的“逻辑”目前看起来有点像这样:

public Flux<TopTeam> getTopTeams(ServerRequest request) {

    List<String> tenantHeader = request.headers().header("X-Tenant-Id");
    // resolve relevant schema name on the fly
    String schema = (appName+ "_" + tenantHeader.iterator().next()).replace("-", "_");
    System.out.println("Using schema: " + schema);
    // configure connfactory with schema set on the builder
    PostgresqlConnectionFactory cf = new PostgresqlConnectionFactory(postgresqlConnectionConfiguration.schema(schema).build());
    // init new DatabaseClient with tenant specific connection
    DatabaseClient cli = DatabaseClient.create(cf);


        return cli
                .execute("select * from top_teams ").fetch().all()
                .flatMap(map -> {

                    ...
                    });
                });
    }

这个逻辑当然可以抽象出来,但是不确定放在哪里,也许可以将它移到 MethodArgumentResolver 中,这样我们就可以注入一个已经配置的 DatabaseClient


ps:这只解决了使用DatabaseClient时的多租户问题。我不确定如何使用 R2dbcRepositories 进行这项工作

于 2020-06-12T21:44:50.297 回答
1

感谢您的回答。我终于得到了这个解决方案:

按租户/模式构建 ConnectionFactory:

public class CloudSpringUtilsConnectionFactoryBuilder implements ConnectionFactoryBuilder {

@Override
public ConnectionFactory buildConnectionFactory(String schema) {
    PostgresqlConnectionConfiguration configuration = getPostgresqlConnectionConfigurationBuilder(schema)
            .build();
    return new PostgresqlConnectionFactory(configuration);
}

@Override
public ConnectionFactory buildSimpleConnectionFactory() {
    PostgresqlConnectionConfiguration configuration = getPostgresqlConnectionConfigurationBuilder(null)
            .build();
    return new PostgresqlConnectionFactory(configuration);
}

protected PostgresqlConnectionConfiguration.Builder getPostgresqlConnectionConfigurationBuilder(String schema) {
    return PostgresqlConnectionConfiguration
            .builder()
            .username(dbUser)
            .password(dbPassword)
            .host(dbHost)
            .port(dbPort)
            .database(dbName)
            .schema(schema);
}

根据租户创建一个 TenantRoutingConnectionFactory 以获取正确的 ConnectionFactory。在我们的例子中,租户是从身份验证主体中提取的(将令牌转换为 UserProfile):

public class TenantRoutingConnectionFactory extends AbstractRoutingConnectionFactory {

private final DatabaseMigrationService databaseMigrationService;
private final ConnectionFactoryBuilder connectionFactoryBuilder;

private final Map<String, ConnectionFactory> targetConnectionFactories = new ConcurrentHashMap<>();

@PostConstruct
private void init() {
    setLenientFallback(false);
    setTargetConnectionFactories(new HashMap<>());
    setDefaultTargetConnectionFactory(connectionFactoryBuilder.buildConnectionFactory());
}

@Override
protected Mono<Object> determineCurrentLookupKey() {
    return ReactiveSecurityContextHolder.getContext()
            .map(this::getTenantFromContext)
            .flatMap(tenant -> databaseMigrationService.migrateTenantIfNeeded(tenant)
                    .thenReturn(tenant));
}

private String getTenantFromContext(SecurityContext securityContext) {
    String tenant = null;
    Object principal = securityContext.getAuthentication().getPrincipal();
    if (principal instanceof UserProfile) {
        UserProfile userProfile = (UserProfile) principal;
        tenant = userProfile.getTenant();
    }
    ...
    log.debug("Tenant resolved: " + tenant);
    return tenant;
}

@Override
protected Mono<ConnectionFactory> determineTargetConnectionFactory() {
    return determineCurrentLookupKey().map(k -> {
        String key = (String) k;
        if (!targetConnectionFactories.containsKey(key)) {
            targetConnectionFactories.put(key, connectionFactoryBuilder.buildConnectionFactory(key));
        }
        return targetConnectionFactories.get(key);
    });
}

请注意,我们在 DatabaseMigrationService 中使用 Flyway 为我们获得的每个租户创建和迁移架构。

于 2020-10-01T16:24:54.630 回答
0

谢谢/基于@charlie carver 的回答,我就是这样解决这个问题的:

控制器:

    @PostMapping(MAP + PATH_DDL_PROC_DB)  //PATH_DDL_PROC_DB = "/database/{db}/{schema}/{table}"
    public Flux<Object> createDbByDb(
            @PathVariable("db") String db,
            @PathVariable("schema") String schema,
            @PathVariable("table") String table) {
        return ddlProcService.createDbByDb(db,schema,table);

服务:

    public Flux<Object> createDbByDb(String db,String schema,String table) {
        return ddl.createDbByDb(db,schema,table);
    }

存储库:

    @Autowired
    PostgresqlConnectionConfiguration.Builder connConfig;

    public Flux<Object> createDbByDb(String db,String schema,String table) {
        return createDb(db).thenMany(
                Mono.from(connFactory(connConfig.database(db)).create())
                    .flatMapMany(
                            connection ->
                                    Flux.from(connection
                                                      .createBatch()
                                                      .add(sqlCreateSchema(db))
                                                      .add(sqlCreateTable(db,table))
                                                      .add(sqlPopulateTable(db,table))
                                                      .execute()
                                             )));
    }

    private Mono<Void> createDb(String db) {

        PostgresqlConnectionFactory
                connectionFactory = connFactory(connConfig);

        DatabaseClient ddl = DatabaseClient.create(connectionFactory);

        return ddl
                .execute(sqlCreateDb(db))
                .then();
    }

连接类:

@Slf4j
@Configuration
@EnableR2dbcRepositories
public class Connection extends AbstractR2dbcConfiguration {

    /*
     **********************************************
     * Spring Data jdbc:
     *      DDL: does support JPA.
     *
     * Spring Data R2DBC
     *      DDL:
     *          -does no support JPA
     *          -To achieve DDL, uses R2dbc.DataBaseClient
     *
     *      DML:
     *          -it uses R2dbcREpositories
     *          -R2dbcRepositories is different than
     *          R2dbc.DataBaseClient
     * ********************************************
     */
    @Bean
    public PostgresqlConnectionConfiguration.Builder connectionConfig() {
        return PostgresqlConnectionConfiguration
                .builder()
                .host("db-r2dbc")
                .port(5432)
                .username("root")
                .password("root");
    }

    @Bean
    public PostgresqlConnectionFactory connectionFactory() {
        return
                new PostgresqlConnectionFactory(
                        connectionConfig().build()
                );
    }
}

DDL 脚本:

@Getter
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class DDLScripts {

    public static final String SQL_GET_TASK = "select * from tasks";

    public static String sqlCreateDb(String db) {
        String sql = "create database %1$s;";
        String[] sql1OrderedParams = quotify(new String[]{db});
        String finalSql = format(sql,(Object[]) sql1OrderedParams);
        return finalSql;
    }

    public static String sqlCreateSchema(String schema) {
        String sql = "create schema if not exists %1$s;";
        String[] sql1OrderedParams = quotify(new String[]{schema});
        return format(sql,(Object[])  sql1OrderedParams);
    }

    public static String sqlCreateTable(String schema,String table) {

        String sql1 = "create table %1$s.%2$s " +
                "(id serial not null constraint tasks_pk primary key, " +
                "lastname varchar not null); ";
        String[] sql1OrderedParams = quotify(new String[]{schema,table});
        String sql1Final = format(sql1,(Object[])  sql1OrderedParams);

        String sql2 = "alter table %1$s.%2$s owner to root; ";
        String[] sql2OrderedParams = quotify(new String[]{schema,table});
        String sql2Final = format(sql2,(Object[])  sql2OrderedParams);

        return sql1Final + sql2Final;
    }

    public static String sqlPopulateTable(String schema,String table) {

        String sql = "insert into %1$s.%2$s values (1, 'schema-table-%3$s');";
        String[] sql1OrderedParams = quotify(new String[]{schema,table,schema});
        return format(sql,(Object[]) sql1OrderedParams);
    }

    private static String[] quotify(String[] stringArray) {

        String[] returnArray = new String[stringArray.length];

        for (int i = 0; i < stringArray.length; i++) {
            returnArray[i] = "\"" + stringArray[i] + "\"";
        }
        return returnArray;
    }
}
于 2020-09-29T16:40:37.193 回答
0

我为 r2dbc 创建了一个多租户示例,但使用了每个数据库的策略。

在此处查看完整的示例代码

在某些数据库中,模式和数据库概念是等价的。如果您坚持使用 per-schema 策略,请在获取连接时添加 SQL 以选择模式(请研究您正在使用的数据库,并确定设置模式的正确子句)。

于 2020-09-15T06:06:50.823 回答