Unanswered
Hi. We are building a pipeline. For this, we try to get access to artefacts from preceding tasks in the pipeline, via their task id, like `hpo_task = Task.get_task(task_id=hpo_task_id)`, but these seem not to work reliably, presumably because it is not guara[…truncated]
def dataset_versioning(
    pipe_task_id: str
):
    """Create a new (versioned) ClearML dataset from the configured source folder.

    Connects the running task to the project's S3 bucket, pulls the pipeline
    configuration from the triggering pipeline task, optionally validates that
    the train/test files exist locally, then creates a new ClearML ``Dataset``
    whose parents are all previously finalized datasets of the project, syncs
    and uploads the source folder, finalizes it, and publishes the resulting
    dataset id as the task artifact ``"dataset_id"``.

    Args:
        pipe_task_id: Task id of the pipeline controller task that holds the
            ``pipeline_config`` artifact.

    Returns:
        The id of the newly created ClearML dataset.

    Raises:
        SystemExit: If the pipeline task cannot be accessed.
        DataFolderError: If no dataset id is configured and the local
            train/test files are missing.
    """
    import os
    import sys

    from clearml import Dataset, Task, TaskTypes

    from src.main.python.pkautopilot.configuration import PipelineConfig
    from src.main.python.pkautopilot.constants import (TRAIN_FILE_NAME, TEST_FILE_NAME, CLEARML_BUCKET,
                                                       SUBDIR_DATASETS, CLEARML_PIPELINE_VERSION_DATASET, CLEARML_DATASET_KEY,
                                                       AWS_ACCESS_KEY, SUBDIR_CONFIG, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION)
    from src.main.python.pkautopilot.exceptions import DataFolderError
    from src.main.python.pkautopilot.helpers import get_tags

    def data_sets_are_available(source):
        # Both the train file and the test file must be present locally.
        train_set_is_available = os.path.isfile(
            os.path.join(source, TRAIN_FILE_NAME))
        test_set_is_available = os.path.isfile(
            os.path.join(source, TEST_FILE_NAME))
        return train_set_is_available and test_set_is_available

    # Connect the current task to our S3 bucket for artifact uploads.
    print('task1_dataset_versioning: task started')
    current_task = Task.current_task()
    current_task.setup_aws_upload(bucket=CLEARML_BUCKET, subdir=SUBDIR_DATASETS,
                                  key=AWS_ACCESS_KEY, secret=AWS_SECRET_ACCESS_KEY,
                                  region=AWS_DEFAULT_REGION)

    # Fetch the pipeline task that triggered this task; it carries the
    # pipeline.config artifact we need.
    try:
        print(f'task1_versioning -accessing pipeline task: {pipe_task_id}')
        pipeline_task = Task.get_task(task_id=pipe_task_id)
    except Exception as error:
        # Surface the underlying error instead of swallowing it, then abort
        # with a non-zero status (sys.exit, not the interactive-only exit()).
        print(f'failed accessing pipeline task with task id {pipe_task_id}: {error}')
        sys.exit(-1)

    # Download and parse the pipeline config.
    config_path = pipeline_task.artifacts["pipeline_config"].get_local_copy()
    pipeline_config = PipelineConfig(config_path)
    config = pipeline_config.config

    project_name = config.meta.project_name
    dataset_project_name = config.meta.dataset_project_name
    # One config value serves both as the local-check path and the sync source
    # (the original read it twice into two names).
    dataset_source = config.data.dataset_source
    dataset_id = config.data.dataset_id
    tags = get_tags(config.data.crops,
                    config.data.countries,
                    config.data.tags)

    current_task.set_name(CLEARML_PIPELINE_VERSION_DATASET)
    current_task.set_project(project_name=project_name)
    current_task.set_task_type(TaskTypes.data_processing)
    current_task.set_tags(tags)

    # If no dataset id is given in the config yaml, local data sets must exist.
    if dataset_id is None:
        if not data_sets_are_available(dataset_source):
            raise DataFolderError()
        print('found data sets locally')

    # Collect previously finalized datasets of this project, oldest first;
    # they become the parents of the new dataset version.
    existing_datasets = Dataset.list_datasets(partial_name=dataset_project_name,
                                              dataset_project=project_name, only_completed=True)
    existing_datasets = [item for item in existing_datasets
                         if Dataset.get(item['id']).is_final()]
    existing_datasets.sort(key=lambda x: x["created"])
    parent_ids = [data["id"] for data in existing_datasets]  # [] when none exist

    # Create the new dataset version, sync the source folder into it, upload
    # to S3 and finalize.
    dataset = Dataset.create(
        dataset_name=dataset_project_name,
        dataset_project=project_name,
        dataset_tags=tags,
        parent_datasets=parent_ids,
        output_uri="s3://" + CLEARML_DATASET_KEY
    )
    n_removed, n_added, n_edited = dataset.sync_folder(dataset_source)
    dataset.upload(output_url="s3://" + CLEARML_DATASET_KEY)
    dataset.finalize()
    dataset_id = dataset.id

    # Publish the id so downstream tasks can consume it by artifact name
    # instead of relying on task-id lookups.
    print('task1 - upload')
    current_task.upload_artifact("dataset_id", artifact_object=dataset_id, wait_on_upload=True)
    return dataset_id
118 Views
0
Answers
11 months ago
11 months ago