`s3_uri` 能在 `MODELS.keys()` 中找到,因为脚本没有在下面这个检查处执行 `sys.exit()`:
if s3_uri not in MODELS.keys(): ...
但是,调用 `estimator.fit(s3_data, wait, logs=True)` 时,该函数无法正常处理 `s3_data`(其中 `s3_data = s3_uri`)。
我已经把这两个配置文件的完整内容都附在下面:`train_pipeline.yaml` 和 `models.yaml`。
print('s3_data: ', s3_data)
print('wait: ', wait)
>>>
s3_data: s3://iotahoe-datascience/python_workers/dbpedia/data_focused_v9
wait: True
Python脚本:
import glob
import json
import os
import pathlib
import sys
import tarfile
from time import time
import click
import pandas as pd
import toml
import yaml
from dagster import (Any, Bool, DagsterInstance, Dict, InputDefinition, Int,
Output, OutputDefinition, RetryRequested, String,
execute_pipeline, pipeline, reconstructable, solid)
from ontology_tagger.modules.s3_util import S3Util
from ontology_tagger.modules.serve import model_fn, predict_fn
from ontology_tagger.modules.utils import TextSerDes, metric_definitions, get_numeric_ratio
from pwmf.pipeline.utils import get_config, local_mode, prod_mode
from sagemaker.local import LocalSession
from sagemaker.pytorch import PyTorch, PyTorchModel
# Directory containing this script; config files are resolved relative to it.
_path = pathlib.Path(__file__).parent.absolute()
# NOTE: this is a str when DATA_PATH is set and a pathlib.Path otherwise.
DATA_DIR = os.getenv('DATA_PATH', default=_path / 'data')

# Fallback values so these names are always bound, even when a config file
# cannot be read. Without them a load failure only printed a message and the
# first later use raised NameError instead of a meaningful error.
TORCH_VERSION = None
MODELS = {}
MODEL_NAMES = ''

# Torch version pinned in the project's pyproject.toml (one directory up).
try:
    with open(_path.parent / 'pyproject.toml') as fh:
        f = toml.load(fh)
    TORCH_VERSION = f['tool']['poetry']['dev-dependencies']['torch']
except Exception as e:
    print(f'Cannot load torch version from pyproject.toml {e}')

# Registry of known models/datasets, keyed by model name.
try:
    with open(_path / 'models.yaml') as fh:
        # safe_load returns None for an empty document; treat that as "no models".
        _loaded_models = yaml.safe_load(fh) or {}
    # Build the name list first so MODELS/MODEL_NAMES are only replaced together.
    _loaded_names = ' '.join(list(_loaded_models.keys()))
    MODELS = _loaded_models
    MODEL_NAMES = _loaded_names
except Exception as e:
    print(f'Cannot load models.yaml {e}')

# IAM role passed to the SageMaker estimator.
role = 'arn:aws:iam::069192305883:role/service-role/AmazonSageMaker-ExecutionRole-20210105T082937'
@solid(
    input_defs=[
        InputDefinition('data', Dict),
        InputDefinition('hparams', Dict),
        InputDefinition('s3_uri', String),
        InputDefinition('instance_type', String),
        InputDefinition('max_retries', Int),
        InputDefinition('use_spot', Bool),
    ],
    output_defs=[OutputDefinition(Any, 'estimator')],
)
def train(context, hparams, instance_type, s3_uri, max_retries, data, use_spot):
    """Configure a SageMaker PyTorch estimator and run ``fit`` on a dataset.

    Args:
        context: Dagster solid context; used for logging and for the active
            mode name (``context.mode_def.name``).
        hparams: Extra hyperparameters merged over the defaults built here.
            A ``model_uri`` entry, if present, is also passed to the estimator.
        instance_type: SageMaker instance type. Values starting with
            ``local`` disable spot usage and use a ``LocalSession``.
        s3_uri: Despite the name, a key into ``MODELS``; it is resolved to
            that entry's ``data_s3`` value for the active mode.
        max_retries: Forwarded to ``RetryRequested`` when ``fit`` fails.
        data: Mapping with ``train``, ``val`` and ``labels`` entries that
            are passed through as hyperparameters.
        use_spot: Whether to request spot instances.

    Raises:
        SystemExit: If ``s3_uri`` is not a key of ``MODELS`` or the entry
            lacks the active mode / ``data_s3`` key.
        RetryRequested: If ``estimator.fit`` raises; the original exception
            is attached as ``__cause__``.
    """
    sm_localcheckpoint_dir = '/opt/ml/checkpoints/'
    train_max_run_secs = 2 * 24 * 60 * 60
    spot_wait_sec = 5 * 60
    # max_wait is only passed as a number when spot instances are requested.
    max_wait_time_secs = train_max_run_secs + spot_wait_sec if use_spot else None

    if s3_uri not in MODELS:
        context.log.error(f'Data {s3_uri} is not available.')
        context.log.error(f'Available datasets registered with {MODEL_NAMES}')
        sys.exit(1)

    try:
        s3_data = MODELS[s3_uri][context.mode_def.name]['data_s3']
    except KeyError as e:
        context.log.error(f'Cannot find key {e} in models.yaml')
        sys.exit(1)
    context.log.info(f'Using dataset {s3_data}')

    # Output, code and checkpoints live next to the dataset prefix.
    s3_root, _ = os.path.split(s3_data)
    s3_output_path = f'{s3_root}/output'
    s3_code_path = f'{s3_root}/code'
    s3_checkpoint_path = f'{s3_root}/checkpoint/'

    wait = True
    # None lets the SDK pick its default session for non-local runs.
    sagemaker_session = None
    if instance_type.startswith('local'):
        use_spot = False
        max_wait_time_secs = 0
        sm_localcheckpoint_dir = None
        sagemaker_session = LocalSession()
        sagemaker_session.config = {'local': {'local_code': True}}

    hp = {
        'checkpointdir': sm_localcheckpoint_dir,
        'train_file': data['train'],
        'validate_file': data['val'],
        'labels': data['labels'],
    }
    if instance_type.startswith(('ml.p3.', 'ml.g4dn.')):
        hp['gpu'] = 1

    model_uri = None
    if hparams:
        hp.update(hparams)
        model_uri = hparams.get('model_uri')

    estimator = PyTorch(
        entry_point='main.py',
        source_dir='modules',
        role=role,
        framework_version=TORCH_VERSION,
        py_version='py3',
        instance_count=1,
        instance_type=instance_type,
        hyperparameters=hp,
        output_path=s3_output_path,
        metric_definitions=metric_definitions,
        volume_size=30,
        code_location=s3_code_path,
        debugger_hook_config=False,
        base_job_name='ontologytagger',
        use_spot_instances=use_spot,
        max_run=train_max_run_secs,
        max_wait=max_wait_time_secs,
        checkpoint_local_path=sm_localcheckpoint_dir,
        checkpoint_s3_uri=s3_checkpoint_path,
        model_uri=model_uri,
        subnets=['subnet-991feef2', 'subnet-deceffa4'],
        security_group_ids=['sg-db39d4b1'],
        # Previously the configured LocalSession was built but never used.
        sagemaker_session=sagemaker_session,
    )

    print('s3_data: ', s3_data)
    print('wait: ', wait)
    try:
        # `logs` is passed as the string 'All' rather than a bool.
        estimator.fit(s3_data, wait=wait, logs='All')
    except Exception as e:
        print('Exception: ', e)
        # Chain the cause so the real failure (e.g. an S3 upload error) is
        # not hidden behind the retry request. This raise is the failure
        # site the reporter marked as "line 136".
        raise RetryRequested(max_retries=max_retries) from e
...
回溯(Traceback):
me@LAPTOP-G1DAPU88:~/.ssh/workers-python/workers/ontology_tagger/ontology_tagger$ poetry run python ontology_tagger_worker.py --pipeline train_pipeline
2021-09-29 09:59:00,005::DEBUG::dev::<module>::initializing logger with "dev" as runtime mode
2021-09-29 09:59:00,016::DEBUG::dev::<module>::initializing logger with "dev" as runtime mode
2021-09-29 09:59:00,020::DEBUG::dev::<module>::initializing logger with "dev" as runtime mode
/home/me/.cache/pypoetry/virtualenvs/ontology-tagger-Zvh6gzFu-py3.8/lib/python3.8/site-packages/dagster/core/definitions/mode.py:66: UserWarning: system_storage_defs are deprecated and will be removed in 0.10.0 and should be replaced with intermediate_storage_defs for intermediates and resource_defs for files
warnings.warn(
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - ENGINE_EVENT - Starting initialization of resources [asset_store].
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - ENGINE_EVENT - Finished initialization of resources [asset_store].
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - PIPELINE_START - Started execution of pipeline "train_pipeline".
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - ENGINE_EVENT - Executing steps in process (pid: 12676)
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - STEP_START - Started execution of step "train.compute".
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - STEP_INPUT - Got input "data" of type "dict". (Type check passed).
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - STEP_INPUT - Got input "hparams" of type "dict". (Type check passed).
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - STEP_INPUT - Got input "s3_uri" of type "String". (Type check passed).
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - STEP_INPUT - Got input "instance_type" of type "String". (Type check passed).
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - STEP_INPUT - Got input "max_retries" of type "Int". (Type check passed).
2021-09-29 09:59:00 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - STEP_INPUT - Got input "use_spot" of type "Bool". (Type check passed).
2021-09-29 09:59:00 - dagster - INFO - system - 94408359-492b-4103-95b2-ed70249a2e15 - train.compute - Using dataset s3://iotahoe-datascience/python_workers/dbpedia/data_focused_v9
s3_data: s3://iotahoe-datascience/python_workers/dbpedia/data_focused_v9
wait: True
Exception: Failed to upload /tmp/tmpj4otjedb/source.tar.gz to iotahoe-datascience/python_workers/dbpedia/code/ontologytagger-2021-09-29-08-59-00-692/source/sourcedir.tar.gz: An error occurred (RequestTimeTooSkewed) when calling the PutObject operation: The difference between the request time and the current time is too large.
2021-09-29 09:59:01 - dagster - ERROR - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - train.compute - STEP_FAILURE - Execution of step "train.compute" failed.
Exceeded max_retries of 0
File "/home/me/.cache/pypoetry/virtualenvs/ontology-tagger-Zvh6gzFu-py3.8/lib/python3.8/site-packages/dagster/core/execution/plan/execute_plan.py", line 212, in _dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/home/me/.cache/pypoetry/virtualenvs/ontology-tagger-Zvh6gzFu-py3.8/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 286, in core_dagster_event_sequence_for_step
for user_event in check.generator(
File "/home/me/.cache/pypoetry/virtualenvs/ontology-tagger-Zvh6gzFu-py3.8/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 59, in _step_output_error_checked_user_event_sequence
for user_event in user_event_sequence:
File "/home/me/.cache/pypoetry/virtualenvs/ontology-tagger-Zvh6gzFu-py3.8/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 475, in _user_event_sequence_for_step_compute_fn
for event in iterate_with_context(raise_interrupts_immediately, gen):
File "/home/me/.cache/pypoetry/virtualenvs/ontology-tagger-Zvh6gzFu-py3.8/lib/python3.8/site-packages/dagster/utils/__init__.py", line 443, in iterate_with_context
next_output = next(iterator)
File "/home/me/.cache/pypoetry/virtualenvs/ontology-tagger-Zvh6gzFu-py3.8/lib/python3.8/site-packages/dagster/core/execution/plan/compute.py", line 105, in _execute_core_compute
for step_output in _yield_compute_results(compute_context, inputs, compute_fn):
File "/home/me/.cache/pypoetry/virtualenvs/ontology-tagger-Zvh6gzFu-py3.8/lib/python3.8/site-packages/dagster/core/execution/plan/compute.py", line 76, in _yield_compute_results
for event in user_event_sequence:
File "/home/me/.cache/pypoetry/virtualenvs/ontology-tagger-Zvh6gzFu-py3.8/lib/python3.8/site-packages/dagster/core/definitions/decorators/solid.py", line 227, in compute
result = fn(context, **kwargs)
File "/home/me/.ssh/workers-python/workers/ontology_tagger/ontology_tagger/ontology_tagger_worker.py", line 136, in train
raise RetryRequested(max_retries=max_retries)
2021-09-29 09:59:01 - dagster - DEBUG - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - ENGINE_EVENT - Finished steps in process (pid: 12676) in 930ms
2021-09-29 09:59:01 - dagster - ERROR - train_pipeline - 94408359-492b-4103-95b2-ed70249a2e15 - 12676 - PIPELINE_FAILURE - Execution of pipeline "train_pipeline" failed. Steps failed: ['train.compute'].
train_pipeline.yaml:
local:
intermediate_storage:
filesystem:
config:
base_dir: ./
resources:
fqdn_string:
config:
fqdn: null
mds_store:
config:
mds_uri: 'mongodb://root:CENSORED'
file_cache:
config:
target_folder: ./
file_manager:
config: {}
s3: {}
solids:
train:
inputs:
instance_type:
value: ml.p3.2xlarge
max_retries:
value: 0
s3_uri:
value: dbpedia_business_numeric
data:
train: train.csv
val: val.csv
labels: classes.txt
use_spot: False
hparams: {}
prod:
intermediate_storage:
filesystem:
config:
base_dir: ./
resources:
fqdn_string:
config:
fqdn: ''
mds_store:
config:
mds_uri: 'mongodb://root:CENSORED'
file_cache:
config:
target_folder: ./
file_manager:
config: {}
s3: {}
solids:
train:
inputs:
instance_type:
value: ml.p3.2xlarge
max_retries:
value: 1
s3_uri:
value: dbpedia_business_numeric
data:
train: train.csv
val: val.csv
labels: classes.txt
use_spot: True
hparams: {}
models.yaml:
dbpedia_comprehensive:
test:
data_local: data/test.csv
skip_unit_test: True
prod:
model_s3: s3://dbpedia_comprehensive.tar.gz
data_s3: s3://CENSORED/data6
local:
model_s3: s3://CENSORED/dbpedia_comprehensive.tar.gz
data_s3: s3://CENSORED/data6
dbpedia_business:
test:
data_local: data/test_bu.csv
skip_unit_test: True
prod:
model_s3: s3://CENSORED/dbpedia_business.tar.gz
data_s3: s3://CENSORED/data_focused_v6
local:
model_s3: s3://CENSORED/dbpedia_business.tar.gz
data_s3: s3://CENSORED/data_focused_v6
dbpedia_russian:
test:
data_local: data/test_ru.csv
skip_unit_test: True
prod:
model_s3: s3://CENSORED/dbpedia_business_ru.tar.gz
data_s3: s3://CENSORED/data_focused_ru
local:
model_s3: s3://CENSORED/dbpedia_business_ru.tar.gz
data_s3: s3://CENSORED/data_focused_ru
dbpedia_business_numeric:
test:
data_local: data/test_bu.csv
functional_test: data/real_data_test.csv
skip_unit_test: False
comments: real_data_test.csv covers medical, legal and generic business domains. Ontology labels were added manually, tags known to models were preferred compared to new tags. Raw data from DataProfiler available in s3://iotahoe-datascience/python_workers/dl_test/worker_data/.
prod:
model_s3: s3://CENSORED/dbpedia_business_numeric.tar.gz
data_s3: s3://data_focused_v9
local:
model_s3: s3://CENSORED/dbpedia_business_numeric.tar.gz
data_s3: s3://CENSORED/data_focused_v9
请让我知道是否还有其他我应该添加的内容。