0

我想使用 Bonobo 将数据从一个 Postgres 数据库移动到不同服务上的另一个数据库。我已配置连接,并希望在提取期间使用一个,在加载期间使用一个。

这是我的测试设置:

source_connection_config_env = 'DEV'
source_connection_config = get_config(source_connection_config_env)

target_connection_config_env = 'TRAINING'
target_connection_config = get_target_connection_config(target_connection_config_env)

...

def get_services(**options):
    if connection == 'source':
        return {
            'sqlalchemy.engine': create_postgresql_engine(**{
                    'host': source_connection_config.source_postres_connection['HOST'],
                    'name': source_connection_config.source_postres_connection['DATABASE'],
                    'user': source_connection_config.source_postres_connection['USER'],
                    'pass': source_connection_config.source_postres_connection['PASSWORD']
                })
        }

    if connetion == 'target':
        return {
            'sqlalchemy.engine': create_postgresql_engine(**{
                    'host': target_connection_config.target_postres_connection['HOST'],
                    'name': target_connection_config.target_postres_connection['DATABASE'],
                    'user': target_connection_config.target_postres_connection['USER'],
                    'pass': target_connection_config.target_postres_connection['PASSWORD']
                })
        }

我不确定更改连接的最佳位置在哪里,或者如何实际进行。

提前致谢!

4

1 回答 1

0

据我了解,您希望在同一个图中同时使用源连接和目标连接(我希望我做对了)。

所以你不能有这个条件,因为它只会返回一个。

相反,我会返回两者,名称不同:

def get_services(**options):
    return {
        'engine.source': create_postgresql_engine(**{...}),
        'engine.target': create_postgresql_engine(**{...}),
    }

然后在转换中使用不同的连接:

graph.add_chain(
    Select(..., engine='engine.source'),
    ...,
    InsertOrUpdate(..., engine='engine.target'),
)

请注意,服务名称只是字符串,没有强制执行约定或命名模式。'sqlalchemy.engine' 名称只是默认名称,但您不必同意它,只要您使用实际使用的名称配置转换即可。

于 2018-03-31T16:29:50.327 回答