
The s3_uri passes the membership check against MODELS.keys() — the script does not sys.exit(), so the key exists:

if s3_uri not in MODELS.keys(): ...

However, the estimator.fit(s3_data, wait, logs=True) call fails with this s3_data, where s3_data = s3_uri (after the models.yaml lookup).

I have included both config files in full below: train_pipeline.yaml and models.yaml.


print('s3_data: ', s3_data)
print('wait: ', wait)
>>>

s3_data:  s3://iotahoe-datascience/python_workers/dbpedia/data_focused_v9
wait:  True
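
As a sanity check that the printed prefix actually contains objects, this is a minimal sketch I would run separately (it assumes the same AWS credentials the pipeline uses; boto3 is not part of the worker itself, and the bucket/prefix are taken verbatim from the printed s3_data):

import boto3

# List a handful of keys under the exact prefix the estimator receives.
s3 = boto3.client('s3')
resp = s3.list_objects_v2(
    Bucket='iotahoe-datascience',
    Prefix='python_workers/dbpedia/data_focused_v9',
    MaxKeys=5,
)
print('objects under prefix:', resp.get('KeyCount', 0))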

Python script:

import glob
import json
import os
import pathlib
import sys
import tarfile
from time import time

import click
import pandas as pd
import toml
import yaml
from dagster import (Any, Bool, DagsterInstance, Dict, InputDefinition, Int,
                     Output, OutputDefinition, RetryRequested, String,
                     execute_pipeline, pipeline, reconstructable, solid)
from ontology_tagger.modules.s3_util import S3Util
from ontology_tagger.modules.serve import model_fn, predict_fn
from ontology_tagger.modules.utils import TextSerDes, metric_definitions, get_numeric_ratio
from pwmf.pipeline.utils import get_config, local_mode, prod_mode
from sagemaker.local import LocalSession
from sagemaker.pytorch import PyTorch, PyTorchModel


_path = pathlib.Path(__file__).parent.absolute()
DATA_DIR = os.getenv('DATA_PATH', default=_path / 'data')
try:
    with open(_path.parent / 'pyproject.toml') as fh:
        f = toml.load(fh)
    TORCH_VERSION = f['tool']['poetry']['dev-dependencies']['torch']
except Exception as e:
    print(f'Cannot load torch version from pyproject.toml {e}')

try:
    with open(_path / 'models.yaml') as fh:
        MODELS = yaml.safe_load(fh)
        MODEL_NAMES = ' '.join(list(MODELS.keys()))
except Exception as e:
    print(f'Cannot load models.yaml {e}')

role = 'arn:aws:iam::069192305883:role/service-role/AmazonSageMaker-ExecutionRole-20210105T082937'


@solid(
    input_defs=[
        InputDefinition('data', Dict),
        InputDefinition('hparams', Dict),
        InputDefinition('s3_uri', String),
        InputDefinition('instance_type', String),
        InputDefinition('max_retries', Int),
        InputDefinition('use_spot', Bool),
    ],
    output_defs=[OutputDefinition(Any, 'estimator')],
)
def train(context, hparams, instance_type, s3_uri, max_retries, data, use_spot):
    sm_localcheckpoint_dir = '/opt/ml/checkpoints/'

    train_max_run_secs = 2 * 24 * 60 * 60
    spot_wait_sec = 5 * 60
    max_wait_time_secs = train_max_run_secs + spot_wait_sec
    model_uri = None

    if not use_spot:
        max_wait_time_secs = None

    if s3_uri not in MODELS.keys():
        context.log.error(f'Data {s3_uri} is not available.')
        context.log.error(f'Available datasets registered with {MODEL_NAMES}')
        sys.exit(1)
    else:
        try:
            s3_uri = MODELS[s3_uri][context.mode_def.name]['data_s3']
            context.log.info(f'Using dataset {s3_uri}')
        except KeyError as e:
            context.log.error(f'Cannot find key {e} in models.yaml')
            sys.exit(1)

    s3_data = s3_uri
    s3_uri, _ = os.path.split(s3_uri)
    s3_output_path = f'{s3_uri}/output'
    s3_code_path = f'{s3_uri}/code'
    s3_checkpoint_path = f'{s3_uri}/checkpoint/'

    wait = True

    if instance_type.startswith('local'):
        use_spot = False
        max_wait_time_secs = 0
        wait = True
        sm_localcheckpoint_dir = None
        sagemaker_session = LocalSession()
        sagemaker_session.config = {'local': {'local_code': True}}

    hp = {
        'checkpointdir': sm_localcheckpoint_dir,
        'train_file': data['train'],
        'validate_file': data['val'],
        'labels': data['labels'],
    }

    if instance_type.startswith('ml.p3.') or instance_type.startswith('ml.g4dn.'):
        hp['gpu'] = 1

    if hparams:
        hp.update(hparams)
        if 'model_uri' in hparams:
            model_uri = hparams['model_uri']

    estimator = PyTorch(
        entry_point='main.py',
        source_dir='modules',
        role=role,
        framework_version=TORCH_VERSION,
        py_version='py3',
        instance_count=1,
        instance_type=instance_type,
        hyperparameters=hp,
        output_path=s3_output_path,
        metric_definitions=metric_definitions,
        volume_size=30,
        code_location=s3_code_path,
        debugger_hook_config=False,
        base_job_name='ontologytagger',
        use_spot_instances=use_spot,
        max_run=train_max_run_secs,
        max_wait=max_wait_time_secs,
        checkpoint_local_path=sm_localcheckpoint_dir,
        checkpoint_s3_uri=s3_checkpoint_path,
        model_uri=model_uri,
        subnets=['subnet-991feef2', 'subnet-deceffa4'],
        security_group_ids=['sg-db39d4b1'],
    )
    try:
        print('s3_data: ', s3_data)
        print('wait: ', wait)
        estimator.fit(s3_data, wait, logs=True)
    except Exception as e:
        print('Exception: ', e)
        raise RetryRequested(max_retries=max_retries)  # ERROR - line 136

...
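
For reference, the positional arguments in the failing call map to fit(inputs=s3_data, wait=True, logs=True). An equivalent, more explicit form wraps the prefix in a TrainingInput channel — just a sketch, not something I have confirmed behaves differently ('training' is SageMaker's default channel name, not anything from my config):

from sagemaker.inputs import TrainingInput

# Same S3 prefix as printed above, passed as an explicit training channel.
channel = TrainingInput(s3_data)
estimator.fit({'training': channel}, wait=wait, logs=True)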

Traceback:

me@LAPTOP-G1DAPU88:~/.ssh/workers-python/workers/ontology_tagger/ontology_tagger$ poetry run python ontology_tagger_worker.py --pipeline train_pipeline
2021-09-29 09:59:00,005::DEBUG::dev::<module>::initializing logger with "dev" as runtime mode
2021-09-29 09:59:00,016::DEBUG::dev::<module>::initializing logger with "dev" as runtime mode
2021-09-29 09:59:00,020::DEBUG::dev::<module>::initializing logger with "dev" as runtime mode
/home/me/.cache/pypoetry/virtualenvs/ontology-tagger-Zvh6gzFu-py3.8/lib/python3.8/site-packages/dagster/core/definitions/mode.py:66: UserWarning: system_storage_defs are deprecated and will be removed in 0.10.0 and should be replaced with intermediate_storage_defs for intermediates and resource_defs for files
  warnings.warn(
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - ENGINE_EVENT - Starting initialization of resources [asset_store].
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - ENGINE_EVENT - Finished initialization of resources [asset_store].
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - PIPELINE_START - Started execution of pipeline "train_pipeline".
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - ENGINE_EVENT - Executing steps in process (pid: 12676)
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - STEP_START - Started execution of step "train.compute".
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - STEP_INPUT - Got input "data" of type "dict". (Type check passed).
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - STEP_INPUT - Got input "hparams" of type "dict". (Type check passed).
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - STEP_INPUT - Got input "s3_uri" of type "String". (Type check passed).
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - STEP_INPUT - Got input "instance_type" of type "String". (Type check passed).
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - STEP_INPUT - Got input "max_retries" of type "Int". (Type check passed).
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - STEP_INPUT - Got input "use_spot" of type "Bool". (Type check passed).
2021-09-29 09:59:00 - dagster - INFO - system - 94408359-492b-4103-95b2-ed70249a2e15 - train.compute - Using dataset s3://iotahoe-datascience/python_workers/dbpedia/data_focused_v9

s3_data:  s3://iotahoe-datascience/python_workers/dbpedia/data_focused_v9
wait:  True

Exception:  Failed to upload /tmp/tmpj4otjedb/source.tar.gz to iotahoe-datascience/python_workers/dbpedia/code/ontologytagger-2021-09-29-08-59-00-692/source/sourcedir.tar.gz: An error occurred (RequestTimeTooSkewed) when calling the PutObject operation: The difference between the request time and the current time is too large.
2021-09-29 09:59:01 - dagster - ERROR - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - STEP_FAILURE - Execution of step "train.compute" failed.

Exceeded max_retries of 0
  File "/home/me/.cache/pypoetry/virtualenvs/ontology-tagger-Zvh6gzFu-py3.8/lib/python3.8/site-packages/dagster/core/execution/plan/execute_plan.py", line 212, in _dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/home/me/.cache/pypoetry/virtualenvs/ontology-tagger-Zvh6gzFu-py3.8/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 286, in core_dagster_event_sequence_for_step
    for user_event in check.generator(
  File "/home/me/.cache/pypoetry/virtualenvs/ontology-tagger-Zvh6gzFu-py3.8/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 59, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/home/me/.cache/pypoetry/virtualenvs/ontology-tagger-Zvh6gzFu-py3.8/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 475, in _user_event_sequence_for_step_compute_fn
    for event in iterate_with_context(raise_interrupts_immediately, gen):
  File "/home/me/.cache/pypoetry/virtualenvs/ontology-tagger-Zvh6gzFu-py3.8/lib/python3.8/site-packages/dagster/utils/__init__.py", line 443, in iterate_with_context
    next_output = next(iterator)
  File "/home/me/.cache/pypoetry/virtualenvs/ontology-tagger-Zvh6gzFu-py3.8/lib/python3.8/site-packages/dagster/core/execution/plan/compute.py", line 105, in _execute_core_compute
    for step_output in _yield_compute_results(compute_context, inputs, compute_fn):
  File "/home/me/.cache/pypoetry/virtualenvs/ontology-tagger-Zvh6gzFu-py3.8/lib/python3.8/site-packages/dagster/core/execution/plan/compute.py", line 76, in _yield_compute_results
    for event in user_event_sequence:
  File "/home/me/.cache/pypoetry/virtualenvs/ontology-tagger-Zvh6gzFu-py3.8/lib/python3.8/site-packages/dagster/core/definitions/decorators/solid.py", line 227, in compute
    result = fn(context, **kwargs)
  File "/home/me/.ssh/workers-python/workers/ontology_tagger/ontology_tagger/ontology_tagger_worker.py", line 136, in train
    raise RetryRequested(max_retries=max_retries)

2021-09-29 09:59:01 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - ENGINE_EVENT - Finished steps in process (pid: 12676) in 930ms
2021-09-29 09:59:01 - dagster - ERROR - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - PIPELINE_FAILURE - Execution of pipeline "train_pipeline" failed. Steps failed: ['train.compute'].
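
The root exception is RequestTimeTooSkewed on the S3 PutObject, which indicates the local clock differs from AWS time by more than S3's documented 15-minute tolerance. A standard-library sketch to measure the drift (purely a diagnostic, nothing pipeline-specific):

from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from urllib.request import urlopen

# Compare the local clock with the Date header S3 sends back;
# RequestTimeTooSkewed fires when the difference exceeds ~15 minutes.
with urlopen('https://s3.amazonaws.com') as resp:
    server_time = parsedate_to_datetime(resp.headers['Date'])
skew = (datetime.now(timezone.utc) - server_time).total_seconds()
print(f'clock skew vs S3: {skew:.1f}s')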

train_pipeline.yaml

local:
  intermediate_storage:
    filesystem:
      config:
        base_dir: ./
  resources:
    fqdn_string:
      config:
        fqdn: null
    mds_store:
      config:
        mds_uri: 'mongodb://root:CENSORED'
    file_cache:
      config:
        target_folder: ./
    file_manager:
      config: {}
    s3: {}
  solids:
    train:
      inputs:
        instance_type:
          value: ml.p3.2xlarge
        max_retries:
          value: 0
        s3_uri:
          value: dbpedia_business_numeric
        data:
          train: train.csv
          val: val.csv
          labels: classes.txt
        use_spot: False
        hparams: {}
prod:
  intermediate_storage:
    filesystem:
      config:
        base_dir: ./
  resources:
    fqdn_string:
      config:
        fqdn: ''
    mds_store:
      config:
        mds_uri: 'mongodb://root:CENSORED'
    file_cache:
      config:
        target_folder: ./
    file_manager:
      config: {}
    s3: {}
  solids:
    train:
      inputs:
        instance_type:
          value: ml.p3.2xlarge
        max_retries:
          value: 1
        s3_uri:
          value: dbpedia_business_numeric
        data:
          train: train.csv
          val: val.csv
          labels: classes.txt
        use_spot: True
        hparams: {}

models.yaml

dbpedia_comprehensive:
  test:
    data_local: data/test.csv
    skip_unit_test: True
  prod:
    model_s3: s3://dbpedia_comprehensive.tar.gz
    data_s3: s3://CENSORED/data6
  local:
    model_s3: s3://CENSORED/dbpedia_comprehensive.tar.gz
    data_s3: s3://CENSORED/data6
dbpedia_business:
  test:
    data_local: data/test_bu.csv
    skip_unit_test: True
  prod:
    model_s3: s3://CENSORED/dbpedia_business.tar.gz
    data_s3: s3://CENSORED/data_focused_v6
  local:
    model_s3: s3://CENSORED/dbpedia_business.tar.gz
    data_s3: s3://CENSORED/data_focused_v6
dbpedia_russian:
  test:
    data_local: data/test_ru.csv
    skip_unit_test: True
  prod:
    model_s3: s3://CENSORED/dbpedia_business_ru.tar.gz
    data_s3: s3://CENSORED/data_focused_ru
  local:
    model_s3: s3://CENSORED/dbpedia_business_ru.tar.gz
    data_s3: s3://CENSORED/data_focused_ru
dbpedia_business_numeric:
  test:
    data_local: data/test_bu.csv
    functional_test: data/real_data_test.csv
    skip_unit_test: False
    comments: real_data_test.csv covers medical, legal and generic business domains. Ontology labels were added manually, tags known to models were preferred compared to new tags. Raw data from DataProfiler available in s3://iotahoe-datascience/python_workers/dl_test/worker_data/.
  prod:
    model_s3: s3://CENSORED/dbpedia_business_numeric.tar.gz
    data_s3: s3://data_focused_v9
  local:
    model_s3: s3://CENSORED/dbpedia_business_numeric.tar.gz
    data_s3: s3://CENSORED/data_focused_v9
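
To make the link between the two files explicit: the train solid resolves the s3_uri input from train_pipeline.yaml through models.yaml, keyed by the Dagster mode name. A minimal reproduction of that lookup (the mode name is hard-coded here on the assumption this run used local mode, since the printed prefix matches the local data_s3 entry):

import yaml

# 'dbpedia_business_numeric' is the s3_uri input from train_pipeline.yaml;
# at runtime the mode key comes from context.mode_def.name.
with open('models.yaml') as fh:
    models = yaml.safe_load(fh)
print(models['dbpedia_business_numeric']['local']['data_s3'])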

Please let me know if there is anything else I should add.
