所以假设我有两个固体。第一个进行一些计算并将文件写入磁盘。第二个实体获取该文件并用它做其他事情,但它需要它的文件系统路径才能打开它。我可以使用两个yield
s (一个用于 the AssetMaterialization
,另一个用于 the str
Output
)并明确地将 theOutput
放入第二个可靠调用中:
from dagster import (AssetKey, AssetMaterialization, EventMetadataEntry,
Output, execute_pipeline, pipeline, solid)
@solid
def yield_asset(context):
yield AssetMaterialization(
asset_key=AssetKey('my_dataset'),
description='Persisted result to storage',
metadata_entries=[
EventMetadataEntry.text('Text-based metadata for this event',
label='text_metadata'),
EventMetadataEntry.fspath('/path/to/data/on/filesystem'),
EventMetadataEntry.url('http://mycoolsite.com/url_for_my_data',
label='dashboard_url'),
],
)
yield Output('/path/to/data/on/filesystem')
@solid
def print_asset_path(context, asset_path: str):
# do stuff with `asset_path`
context.log.info(asset_path)
@pipeline
def some_pipeline():
asset_path = yield_asset()
print_asset_path(asset_path)
if __name__ == "__main__":
result = execute_pipeline(some_pipeline)
这很好用,您应该在日志 ( 2021-03-16 13:23:29 - dagster - INFO - system - 366248ec-6a83-462f-b62f-9fb2514f6f80 - print_asset_path - /path/to/data/on/filesystem
) 和AssetMaterialization
in中获得信息消息dagit
。
但是,这有点不方便,因为我需要使用我需要Output
的文件系统路径显式生成一个。是否有可能以及如何AssetMaterialization
在第二个实体中引用 并直接使用其属性?
类似的东西(不起作用):
@solid
def print_asset_path(context):
asset_path = context.assets.get_asset_by_key(`my_key`).fspath
# do stuff with `asset_path`
context.log.info(asset_path)