def dataset_versioning(
    pipe_task_id: str
):
    """Create a new ClearML dataset version if needed and return the dataset id.

    The pipeline configuration is read from the ``pipeline_config`` artifact
    of the task identified by ``pipe_task_id``. If the configuration already
    names a ``dataset_id`` it is used unchanged. Otherwise the folder in
    ``config.data.dataset_source`` is synced into a new dataset whose parents
    are all finalized datasets found for the project, and the new dataset is
    uploaded and finalized.

    In both cases the id is uploaded as the ``dataset_id`` artifact of the
    current task and returned.

    Args:
        pipe_task_id: Id of the pipeline task holding the ``pipeline_config``
            artifact.

    Returns:
        The id of the dataset to use in the following steps.

    Raises:
        DataFolderError: If no ``dataset_id`` is configured and the train and
            test files are not both present in the dataset source folder.
        Exception: Whatever ``Task.get_task`` raised when the pipeline task
            could not be accessed (re-raised after logging).
    """
    # All imports and helpers are kept inside the function body; presumably
    # because the function is executed as a stand-alone pipeline step and
    # therefore has to be self-contained -- TODO confirm.
    import os
    from clearml import Dataset, Task, TaskTypes
    from src.main.python.pkautopilot.configuration import PipelineConfig
    from src.main.python.pkautopilot.constants import (TRAIN_FILE_NAME, TEST_FILE_NAME, CLEARML_BUCKET,
                                                       SUBDIR_DATASETS, CLEARML_PIPELINE_VERSION_DATASET,
                                                       CLEARML_DATASET_KEY, AWS_ACCESS_KEY, SUBDIR_CONFIG,
                                                       AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION)
    from src.main.python.pkautopilot.exceptions import DataFolderError
    from src.main.python.pkautopilot.helpers import get_tags

    def data_sets_are_available(source):
        """Return True if both the train and the test file exist in *source*."""
        return all(
            os.path.isfile(os.path.join(source, file_name))
            for file_name in (TRAIN_FILE_NAME, TEST_FILE_NAME)
        )

    # Connect to the current task to set up the connection to our S3 bucket.
    print('task1_dataset_versioning: task started')
    current_task = Task.current_task()
    current_task.setup_aws_upload(bucket=CLEARML_BUCKET, subdir=SUBDIR_DATASETS,
                                  key=AWS_ACCESS_KEY, secret=AWS_SECRET_ACCESS_KEY,
                                  region=AWS_DEFAULT_REGION)

    # Connect to the pipeline task that triggered the current task; it holds
    # the pipeline configuration.
    print(f'task1_versioning -accessing pipeline task: {pipe_task_id}')
    try:
        pipeline_task = Task.get_task(task_id=pipe_task_id)
    except Exception:
        print(f'failed accessing pipeline task with task id {pipe_task_id}')
        # Re-raise instead of calling exit(-1): the step still fails, but the
        # original traceback stays visible in the task log.
        raise

    # Download and parse the pipeline config.
    config_path = pipeline_task.artifacts["pipeline_config"].get_local_copy()
    config = PipelineConfig(config_path).config

    project_name = config.meta.project_name
    dataset_project_name = config.meta.dataset_project_name
    dataset_id = config.data.dataset_id
    dataset_source = config.data.dataset_source
    tags = get_tags(config.data.crops,
                    config.data.countries,
                    config.data.tags)

    current_task.set_name(CLEARML_PIPELINE_VERSION_DATASET)
    current_task.set_project(project_name=project_name)
    current_task.set_task_type(TaskTypes.data_processing)
    current_task.set_tags(tags)

    # No dataset id configured: version the local data sets from the config.
    if dataset_id is None:
        if not data_sets_are_available(dataset_source):
            raise DataFolderError()
        print('found data sets locally')

        # Collect the finalized datasets already registered for this project,
        # ordered by creation time; they become the parents of the new version.
        existing_datasets = [
            item
            for item in Dataset.list_datasets(partial_name=dataset_project_name,
                                              dataset_project=project_name,
                                              only_completed=True)
            if Dataset.get(dataset_id=item['id']).is_final()
        ]
        existing_datasets.sort(key=lambda item: item["created"])
        parent_ids = [item["id"] for item in existing_datasets]

        # Create the dataset object.
        dataset = Dataset.create(
            dataset_name=dataset_project_name,
            dataset_project=project_name,
            dataset_tags=tags,
            parent_datasets=parent_ids,
            output_uri="s3://" + CLEARML_DATASET_KEY
        )

        # Sync the local folder into it, upload and finalize. The counters
        # returned by sync_folder are not needed here.
        dataset.sync_folder(dataset_source)
        dataset.upload(output_url="s3://" + CLEARML_DATASET_KEY)
        dataset.finalize()
        dataset_id = dataset.id

    print('task1 - upload')
    # NOTE(review): if this step is registered with a function return named
    # 'dataset_id', the pipeline may already store the return value under the
    # same artifact name -- confirm that this explicit upload is still needed.
    current_task.upload_artifact("dataset_id", artifact_object=dataset_id, wait_on_upload=True)
    return dataset_id
I can also add attachments here, if that makes sense.
Hi John. Thanks for your quick response. I looked carefully through the documentation, and my use case seems to be a standard one for ClearML. However, I get this error message: clearml.Task - ERROR - Failed reloading task
import os
from datetime import datetime
from typing import List, Optional, Sequence, Union
import json
import pickle
import typer
from clearml import Dataset, Task, TaskTypes
from clearml.automation import PipelineController
from clearml.utilities.config import config_dict_to_text
from src.main.python.pkautopilot.configuration import PipelineConfig
from src.main.python.pkautopilot.constants import *
from task1_dataset_versioning import dataset_versioning
from task2_hpo import hpo
from task3_training import train
from task4_reporting import report
cli = typer.Typer()


@cli.command()
def pipeline(
    config_path: str = typer.Option(
        "config_path", help="Path to the pipeline configuration yaml file."),
    queue: Optional[str] = typer.Option(
        "k8s_scheduler", show_default=False, help="Name of the queue in which to schedule the task"),
    local: bool = typer.Option(
        False, help="Run the pipeline locally (True) or in the cloud (False)")
):
    """Build the phenkit-learn ClearML pipeline from a configuration file.

    Reads the pipeline configuration, creates the pipeline controller,
    uploads the configuration file as the ``pipeline_config`` artifact of the
    pipeline task and registers the dataset versioning step.
    """
    # Read and connect config.
    pipeline_config = PipelineConfig(config_path)
    config = pipeline_config.config
    parsed_config = pipeline_config.parsed_config
    project_name = config.meta.project_name

    pipe = PipelineController(
        name="phenkit-learn pipeline",
        project=project_name,
        target_project=True,
        auto_version_bump=True,
        add_pipeline_tags=False,
        docker=CLEARML_AGENT_DOCKER_IMAGE,
        packages="./requirements.txt",
        repo="./"
    )

    # Fetch the task id of the pipeline. The config is saved under the
    # pipeline task and accessed from each subtask via this id.
    # NOTE(review): `_task` is a private attribute of the controller; check
    # whether a public accessor can be used instead.
    current_task = pipe._task
    pipeline_task_id = current_task.task_id

    pipe.connect_configuration(configuration=parsed_config)

    # Upload the config file as an artifact so that child tasks can read it.
    current_task.setup_aws_upload(bucket=CLEARML_BUCKET, subdir=SUBDIR_CONFIG,
                                  key=AWS_ACCESS_KEY, secret=AWS_SECRET_ACCESS_KEY,
                                  region=AWS_DEFAULT_REGION)
    print(f'pipeline.py: uploading config for task with id {pipeline_task_id}')
    current_task.upload_artifact(
        "pipeline_config", artifact_object=config_path, wait_on_upload=True)

    pipe.add_function_step(
        name=CLEARML_PIPELINE_VERSION_DATASET,
        function=dataset_versioning,
        function_kwargs=dict(pipe_task_id=pipeline_task_id),
        function_return=['dataset_id'],
        project_name=project_name,
        task_name=CLEARML_PIPELINE_VERSION_DATASET,
        task_type=TaskTypes.data_processing,
        packages="./requirements.txt",
        docker=CLEARML_AGENT_DOCKER_IMAGE,
        repo="./",
        # cache_executed_step=True,  # does not execute this step if nothing has changed
    )
    # ... (the original snippet is truncated here; the remaining pipeline
    # steps and the pipeline start are not shown)
I also want the subsequent tasks to access artefacts from the pipeline task (the config.yaml) as well as artefacts from prior tasks. Every now and then this fails, but not always.
Here is another failure that sometimes occurs during the pipeline run: Launching step [model_training]
Launching step: model_training
Parameters:
{'kwargs/pipe_task_id': '6818423a08684b0d9972c92e5e38d6bc', 'kwargs_artifacts/dataset_id': '${version_dataset.id}.dataset_id', 'kwargs_artifacts/hpo_task_id': '${hyperparameter_optimization.id}.task_id'}
Configurations:
{}
Overrides:
{}
ClearML results page: None
Traceback (most recent call last):
File "/tmp/tmpkkna_wjn.py", line 83, in <module>
kwargs[k] = parent_task.get_parameters(cast=True)[return_section + '/' + artifact_name]
KeyError: 'return/task_id'
ClearML Monitor: GPU monitoring failed getting GPU reading, switching off GPU monitoring
Launching the next 0 steps
Setting pipeline controller Task as failed (due to failed steps) !
Very strange. Sorry for posting so much information.
Hi @<1543766544847212544:profile|SorePelican79> , can you provide a sample of how this looks? The suggested method is the one in the examples:
None