Examples: query, "exact match", wildcard*, wild?ard, wild*rd
Fuzzy search: cake~ (finds cakes, bake)
Term boost: "red velvet"^4, chocolate^2
Field grouping: tags:(+work -"fun-stuff")
Escaping: Escape characters +-&|!(){}[]^"~*?:\ with \, e.g. \+
Range search: properties.timestamp:[1587729413488 TO *] (inclusive), properties.title:{A TO Z}(excluding A and Z)
Combinations: chocolate AND vanilla, chocolate OR vanilla, (chocolate OR vanilla) NOT "vanilla pudding"
Field search: properties.title:"The Title" AND text
Answered
Hi. We Are Building A Pipeline. For This, We Try To Get Access To Artefacts From Preceding Tasks In The Pipeline, Via Their Task Id, Like Hpo_Task = Task.Get_Task(Task_Id=Hpo_Task_Id) But These Seem Not To Work Reliably, Presumably Because It Is Not Guara

Hi. We are building a pipeline. For this, we try to access artefacts from preceding tasks in the pipeline via their task ID, e.g. hpo_task = Task.get_task(task_id=hpo_task_id). But this does not seem to work reliably, presumably because it is not guaranteed that "old" tasks can be revoked. Or should it be? Please give me some advice. In the end, we would like to have programmatic access to the tasks' artefacts.

  
  
Posted 3 months ago
Votes Newest

Answers 11


Hi @<1543766544847212544:profile|SorePelican79> , can you provide a sample of how this looks? The suggested method is the one in the examples:
None

  
  
Posted 3 months ago

I will add some code here soon.

  
  
Posted 3 months ago

def dataset_versioning(
    pipe_task_id: str
):
    """Pipeline step 1: resolve (or create) the dataset version to train on.

    The step fetches the pipeline controller task by ``pipe_task_id``, reads
    its ``pipeline_config`` artifact, and renames/re-tags the current task
    from that config. When the config carries no ``data.dataset_id``, a new
    ClearML dataset is created from the local ``data.dataset_source`` folder,
    with all finalized datasets of the project as parents. The resulting
    dataset id is uploaded as the ``dataset_id`` artifact and returned.

    Args:
        pipe_task_id: Id of the pipeline task that holds the
            ``pipeline_config`` artifact.

    Returns:
        The dataset id taken from the config, or the id of the dataset
        created by this step.

    Raises:
        SystemExit: With code -1 when the pipeline task cannot be fetched.
        DataFolderError: When no dataset id is configured and the train/test
            files are not both present in the configured source folder.
    """
    # Imports live inside the function because it is registered as a
    # pipeline function step (see pipe.add_function_step in the pipeline).
    import os
    from clearml import Dataset, Task, TaskTypes

    from src.main.python.pkautopilot.configuration import PipelineConfig
    from src.main.python.pkautopilot.constants import (
        TRAIN_FILE_NAME, TEST_FILE_NAME, CLEARML_BUCKET,
        SUBDIR_DATASETS, CLEARML_PIPELINE_VERSION_DATASET, CLEARML_DATASET_KEY,
        AWS_ACCESS_KEY, SUBDIR_CONFIG, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION)
    from src.main.python.pkautopilot.exceptions import DataFolderError
    from src.main.python.pkautopilot.helpers import get_tags

    def data_sets_are_available(source):
        """Return True when both the train and the test file exist in *source*."""
        return all(
            os.path.isfile(os.path.join(source, file_name))
            for file_name in (TRAIN_FILE_NAME, TEST_FILE_NAME)
        )

    # connect to current task to setup connection to our s3 bucket
    print('task1_dataset_versioning: task started')
    current_task = Task.current_task()
    current_task.setup_aws_upload(bucket=CLEARML_BUCKET, subdir=SUBDIR_DATASETS,
                                  key=AWS_ACCESS_KEY, secret=AWS_SECRET_ACCESS_KEY,
                                  region=AWS_DEFAULT_REGION)

    # connect to pipeline.config, linked by pipeline task who triggered the current task
    try:
        print(f'task1_versioning -accessing pipeline task: {pipe_task_id}')
        pipeline_task = Task.get_task(task_id=pipe_task_id)
    except Exception as err:
        print(f'failed accessing pipeline task with task id {pipe_task_id}')
        # Surface the underlying reason; the generic message alone makes
        # intermittent lookup failures impossible to diagnose.
        print(f'reason: {err!r}')
        # Same exit code as before, but without relying on the site-provided
        # exit() helper and with the original error chained as the cause.
        raise SystemExit(-1) from err

    # Download pipeline config
    config_path = pipeline_task.artifacts["pipeline_config"].get_local_copy()
    pipeline_config = PipelineConfig(config_path)
    config = pipeline_config.config
    project_name = config.meta.project_name
    dataset_project_name = config.meta.dataset_project_name
    dataset_id = config.data.dataset_id
    # Single name for the local data folder (was read twice under two names).
    dataset_source = config.data.dataset_source

    tags = get_tags(config.data.crops,
                    config.data.countries,
                    config.data.tags)

    current_task.set_name(CLEARML_PIPELINE_VERSION_DATASET)
    current_task.set_project(project_name=project_name)
    current_task.set_task_type(TaskTypes.data_processing)
    current_task.set_tags(tags)

    # if no dataset id is given in the config yaml, version the local data sets
    if dataset_id is None:
        if not data_sets_are_available(dataset_source):
            raise DataFolderError()
        print('found data sets locally')

        # Check for available data for current project; keep finalized ones only
        existing_datasets = [
            item
            for item in Dataset.list_datasets(partial_name=dataset_project_name,
                                              dataset_project=project_name,
                                              only_completed=True)
            if Dataset.get(item['id']).is_final()
        ]

        # append to parent data if there is some in clearml (oldest first);
        # the list is simply empty when nothing exists yet
        existing_datasets.sort(key=lambda x: x["created"])
        parent_ids = [data["id"] for data in existing_datasets]

        # Create dataset object
        dataset = Dataset.create(
            dataset_name=dataset_project_name,
            dataset_project=project_name,
            dataset_tags=tags,
            parent_datasets=parent_ids,
            output_uri="s3://" + CLEARML_DATASET_KEY
        )
        # Sync data to it and finalize; the change counters returned by
        # sync_folder are not needed here, so they are not unpacked.
        dataset.sync_folder(dataset_source)
        dataset.upload(output_url="s3://" + CLEARML_DATASET_KEY)

        dataset.finalize()
        dataset_id = dataset.id

    print('task1 - upload')
    current_task.upload_artifact("dataset_id", artifact_object=dataset_id, wait_on_upload=True)
    return dataset_id
  
  
Posted 3 months ago

I also want the subsequent tasks to access artefacts from the pipeline task (the config.yaml) as well as artefacts from prior tasks. Now and then it fails, but not always.

  
  
Posted 3 months ago

here comes step 1:

  
  
Posted 3 months ago

here is another failure which sometimes occurs along the pipeline: Launching step [model_training]
Launching step: model_training
Parameters:
{'kwargs/pipe_task_id': '6818423a08684b0d9972c92e5e38d6bc', 'kwargs_artifacts/dataset_id': '${version_dataset.id}.dataset_id', 'kwargs_artifacts/hpo_task_id': '${hyperparameter_optimization.id}.task_id'}
Configurations:
{}
Overrides:
{}
ClearML results page: None
Traceback (most recent call last):
File "/tmp/tmpkkna_wjn.py", line 83, in <module>
kwargs[k] = parent_task.get_parameters(cast=True)[return_section + '/' + artifact_name]
KeyError: 'return/task_id'
ClearML Monitor: GPU monitoring failed getting GPU reading, switching off GPU monitoring
Launching the next 0 steps
Setting pipeline controller Task as failed (due to failed steps) !

  
  
Posted 3 months ago

Hi John. Thanks for your quick response. I looked carefully through the documentation, and my use case seems to be standard for ClearML. However, I get this error message: clearml.Task - ERROR - Failed reloading task

  
  
Posted 3 months ago

this is the pipeline module until step1

  
  
Posted 3 months ago

very strange. Sorry for putting in so much information

  
  
Posted 3 months ago

I can also bring attachments here, if it makes sense

  
  
Posted 3 months ago

import os
from datetime import datetime
from typing import List, Optional, Sequence, Union
import json
import pickle

import typer
from clearml import Dataset, Task, TaskTypes
from clearml.automation import PipelineController
from clearml.utilities.config import config_dict_to_text

from src.main.python.pkautopilot.configuration import PipelineConfig
from src.main.python.pkautopilot.constants import *
from task1_dataset_versioning import dataset_versioning
from task2_hpo import hpo
from task3_training import train
from task4_reporting import report

cli = typer.Typer()

@cli.command()
def pipeline(
config_path: str = typer.Option(
"config_path", help="Path to the pipeline configuration yaml file."),
queue: Optional[str] = typer.Option(
"k8s_scheduler", show_default=False, help="Name of the queue in which to schedule the task"),
local: bool = typer.Option(
False, help="Run the pipeline locally (True) or in the cloud (False)")

):
# Read and connect config
pipeline_config = PipelineConfig(config_path)
config = pipeline_config.config
parsed_config = pipeline_config.parsed_config

project_name = config.meta.project_name

pipe = PipelineController(
    name="phenkit-learn pipeline",
    project=project_name,
    target_project=True,
    auto_version_bump=True,
    add_pipeline_tags=False,
    docker=CLEARML_AGENT_DOCKER_IMAGE,
    packages="./requirements.txt",
    repo="./"
)

current_task = pipe._task
pipeline_task_id = current_task.task_id
# fetch the task id of the pipeline. We save the config under the pipeline task and access it on each subtask

pipe.connect_configuration(configuration=parsed_config)

# upload config file as artefact so that child tasks can read from it

current_task.setup_aws_upload(bucket=CLEARML_BUCKET, subdir=SUBDIR_CONFIG,
                              key=AWS_ACCESS_KEY, secret=AWS_SECRET_ACCESS_KEY,
                              region=AWS_DEFAULT_REGION)
print(f'pipeline.py: uploading config for task with id {pipeline_task_id}')
current_task.upload_artifact(
    "pipeline_config", artifact_object=config_path, wait_on_upload=True)

pipe.add_function_step(
    name=CLEARML_PIPELINE_VERSION_DATASET,
    function=dataset_versioning,
    function_kwargs=dict(pipe_task_id=pipeline_task_id),
    function_return=['dataset_id'],
    project_name=project_name,
    task_name=CLEARML_PIPELINE_VERSION_DATASET,
    task_type=TaskTypes.data_processing,
    packages="./requirements.txt",
    docker=CLEARML_AGENT_DOCKER_IMAGE,
    repo="./",
    # cache_executed_step=True, # does not execute this step if nothing has changed
)...
  
  
Posted 3 months ago
281 Views
11 Answers
3 months ago
3 months ago
Tags