0

我觉得这应该是可能的,但我查看了 wandb SDK 代码,但找不到简单/合乎逻辑的方法。稍后可能通过修改清单条目来破解它(但可能在工件被记录到 wandb 之前,然后清单和条目可能被锁定)?我在 SDK 代码中看到了这样的内容:

version = manifest_entry.extra.get("versionID")
etag = manifest_entry.extra.get("etag")

所以,我想我们可能可以编辑那些?

更新

所以,我试图用这样的东西来破解它,它可以工作,但感觉不对:

import os
import wandb
import boto3
from wandb.util import md5_file

ENTITY = os.environ.get("WANDB_ENTITY")
PROJECT = os.environ.get("WANDB_PROJECT")
API_KEY = os.environ.get("WANDB_API_KEY")

api = api = wandb.Api(overrides={"entity": ENTITY, "project": ENTITY})
run = wandb.init(entity=ENTITY, project=PROJECT, job_type="test upload")
file = "admin2Codes.txt"  # "admin1CodesASCII.txt" # (both already on s3 with a couple versions)
artifact = wandb.Artifact("test_data", type="dataset")

# modify one of the local files so it has a new md5hash etc.
with open(file, "a") as f:
    f.write("new_line_1\n")

# upload local file to s3
local_file_path = file
s3_url = f"s3://bucket/prefix/{file}"
s3_url_arr = s3_url.replace("s3://", "").split("/")
s3_bucket = s3_url_arr[0]
key = "/".join(s3_url_arr[1:])

s3_client = boto3.client("s3")
file_digest = md5_file(local_file_path)
s3_client.upload_file(
    local_file_path,
    s3_bucket,
    key,
    # save the md5_digest in metadata,
    # can be used later to only upload new files to s3,
    # as AWS doesn't digest the file consistently in the E-tag
    ExtraArgs={"Metadata": {"md5_digest": file_digest}},
)
head_response = s3_client.head_object(Bucket=s3_bucket, Key=key)
version_id: str = head_response["VersionId"]
print(version_id)

# upload a link/ref to this s3 object in wandb:
artifact.add_reference(s3_dir)
# at this point we might be able to modify the artifact._manifest.entries and each entry.extra.get("etag") etc.?
print([(name, entry.extra) for name, entry in artifact._manifest.entries.items()])
# set these to an older version on s3 that we know we want (rather than latest) - do this via wandb public API:
dataset_v2 = api.artifact(f"{ENTITY}/{PROJECT}/test_data:v2", type="dataset")
# artifact._manifest.add_entry(dataset_v2.manifest.entries["admin1CodesASCII.txt"])
artifact._manifest.entries["admin1CodesASCII.txt"] = dataset_v2.manifest.entries[
    "admin1CodesASCII.txt"
]
# verify that it did change:
print([(name, entry.extra) for name, entry in artifact._manifest.entries.items()])

run.log_artifact(artifact)  # at this point the manifest is locked I believe?
artifact.wait()  # wait for upload to finish (blocking - but should be very quick given it is just an s3 link)
print(artifact.name)
run_id = run.id
run.finish()
curr_run = api.run(f"{ENTITY}/{PROJECT}/{run_id}")
used_artifacts = curr_run.used_artifacts()
logged_artifacts = curr_run.logged_artifacts()

我在正确的轨道上吗?我想另一种解决方法是在 s3 上制作一个副本(这样旧版本又是最新的)但我想避免这种情况,因为我想使用旧版本的 1 文件是一个大型 NLP 模型和唯一的文件我要更改的是小的 config.json 文件等(因此再次上传所有文件似乎非常浪费)。

我还想知道,当我将旧版本的对象复制回存储桶中的同一个键时,是否会创建一个真正的副本,或者就像指向同一个底层对象的指针一样。boto3 和 AWS 文档都没有说明这一点——尽管它看起来像是一个正确的副本。

4

1 回答 1

0

我想我现在找到了正确的方法:

import os
import wandb
import boto3
from wandb.util import md5_file

ENTITY = os.environ.get("WANDB_ENTITY")
PROJECT = os.environ.get("WANDB_PROJECT")


def wandb_update_only_some_files_in_artifact(
    existing_artifact_name: str,
    new_s3_file_urls: list[str],
    entity: str = ENTITY,
    project: str = PROJECT,
) -> Artifact:
    """If you want to just update a config.json file for example,
    but the rest of the artifact can remain the same, then you can
    use this functions like so:
    wandb_update_only_some_files_in_artifact(
        "old_artifact:v3",
        ["s3://bucket/prefix/config.json"],
    )
    and then all the other files like model.bin will be the same as in v3,
    even if there was a v4 or v5 in between (as the v3 VersionIds are used)

    Args:
        existing_artifact_name (str): name with version like "old_artifact:v3"
        new_s3_file_urls (list[str]): files that should be updated
        entity (str, optional): wandb entity. Defaults to ENTITY.
        project (str, optional): wandb project. Defaults to PROJECT.

    Returns:
        Artifact: the new artifact object
    """
    api = wandb.Api(overrides={"entity": entity, "project": project})
    old_artifact = api.artifact(existing_artifact_name)
    old_artifact_name = re.sub(r":v\d+$", "", old_artifact.name)
    with wandb.init(entity=entity, project=project) as run:
        new_artifact = wandb.Artifact(old_artifact_name, type=old_artifact.type)

        s3_file_names = [s3_url.split("/")[-1] for s3_url in new_s3_file_urls]
        # add the new ones:
        for s3_url, filename in zip(new_s3_file_urls, s3_file_names):
            new_artifact.add_reference(s3_url, filename)
        # add the old ones:
        for filename, entry in old_artifact.manifest.entries.items():
            if filename in s3_file_names:
                continue
            new_artifact.add_reference(entry, filename)
            # this also works but feels hackier:
            # new_artifact._manifest.entries[filename] = entry

        run.log_artifact(new_artifact)
        new_artifact.wait()  # wait for upload to finish (blocking - but should be very quick given it is just an s3 link)
        print(new_artifact.name)
        print(run.id)
    return new_artifact


# usage:
local_file_path = "config.json" # modified file
s3_url = "s3://bucket/prefix/config.json"
s3_url_arr = s3_url.replace("s3://", "").split("/")
s3_bucket = s3_url_arr[0]
key = "/".join(s3_url_arr[1:])

s3_client = boto3.client("s3")
file_digest = md5_file(local_file_path)
s3_client.upload_file(
    local_file_path,
    s3_bucket,
    key,
    # save the md5_digest in metadata,
    # can be used later to only upload new files to s3,
    # as AWS doesn't digest the file consistently in the E-tag
    ExtraArgs={"Metadata": {"md5_digest": file_digest}},
)

wandb_update_only_some_files_in_artifact(
    "old_artifact:v3",
    ["s3://bucket/prefix/config.json"],
)
于 2022-01-25T13:57:32.817 回答