不完全相同,但我在这里的回答可能有用。
我建议在您的情况下,最简单的方法确实是定义两个目录条目并将 Kedro 保存到它们(并从本地加载以提高速度),这为您提供了最大的灵活性,尽管我承认不是最漂亮的。
在避免所有需要返回两个值的节点函数方面,我建议将装饰器应用于您使用特定标签标记的某些节点,例如tags=["s3_replica"]
从以下脚本中获取灵感(从我的同事那里窃取):
class S3DataReplicationHook:
"""
Hook to replicate the output of any node tagged with `s3_replica` to S3.
E.g. if a node is defined as:
node(
func=myfunction,
inputs=['ds1', 'ds2'],
outputs=['ds3', 'ds4'],
tags=['tag1', 's3_replica']
)
Then the hook will expect to see `ds3.s3` and `ds4.s3` in the catalog.
"""
@hook_impl
def before_node_run(
self,
node: Node,
catalog: DataCatalog,
inputs: Dict[str, Any],
is_async: bool,
run_id: str,
) -> None:
if "s3_replica" in node.tags:
node.func = _duplicate_outputs(node.func)
node.outputs = _add_local_s3_outputs(node.outputs)
def _duplicate_outputs(func: Callable) -> Callable:
def wrapped(*args, **kwargs):
outputs = func(*args, **kwargs)
return (outputs,) + (outputs,)
return wrapped
def _add_local_s3_outputs(outputs: List[str]) -> List[str]:
return outputs + [f'{o}.s3' for o in outputs]
以上是一个钩子,因此您可以将它放在hooks.py
项目中的文件(或任何您想要的地方)中,然后将其导入您的settings.py
文件并放入:
from .hooks import ProjectHooks, S3DataReplicationHook
hooks = (ProjectHooks(), S3DataReplicatonHook())
在你的settings.py
.
您可以稍微巧妙地使用输出命名约定,以便它只复制某些输出(例如,也许您同意所有以结尾的目录条目.local
也必须具有相应的.s3
条目,并且您相应地改变了outputs
您node
的比对每个输出都这样做。
如果您想更聪明一点,您可以使用挂钩将相应的 S3 条目注入到目录中,after_catalog_created
而不是再次按照您选择的命名约定在目录中手动编写数据集的 S3 版本。尽管我认为从长远来看,编写 S3 条目更具可读性。