import os
from datetime import datetime
from typing import List, Optional, Sequence, Union
import json
import pickle
import typer
from clearml import Dataset, Task, TaskTypes
from clearml.automation import PipelineController
from clearml.utilities.config import config_dict_to_text
from src.main.python.pkautopilot.configuration import PipelineConfig
from src.main.python.pkautopilot.constants import *
from task1_dataset_versioning import dataset_versioning
from task2_hpo import hpo
from task3_training import train
from task4_reporting import report
# Typer application object; commands are registered on it below via @cli.command().
cli = typer.Typer()
@cli.command()
def pipeline(
config_path: str = typer.Option(
"config_path", help="Path to the pipeline configuration yaml file."),
queue: Optional[str] = typer.Option(
"k8s_scheduler", show_default=False, help="Name of the queue in which to schedule the task"),
local: bool = typer.Option(
False, help="Run the pipeline locally (True) or in the cloud (False)")
):
# Read and connect config
pipeline_config = PipelineConfig(config_path)
config = pipeline_config.config
parsed_config = pipeline_config.parsed_config
project_name = config.meta.project_name
pipe = PipelineController(
name="phenkit-learn pipeline",
project=project_name,
target_project=True,
auto_version_bump=True,
add_pipeline_tags=False,
docker=CLEARML_AGENT_DOCKER_IMAGE,
packages="./requirements.txt",
repo="./"
)
current_task = pipe._task
pipeline_task_id = current_task.task_id
# fetch the task id of the pipeline. We save the config under the pipeline task and access it on each subtask
pipe.connect_configuration(configuration=parsed_config)
# upload config file as artefact so that child tasks can read from it
current_task.setup_aws_upload(bucket=CLEARML_BUCKET, subdir=SUBDIR_CONFIG,
key=AWS_ACCESS_KEY, secret=AWS_SECRET_ACCESS_KEY,
region=AWS_DEFAULT_REGION)
print(f'pipeline.py: uploading config for task with id {pipeline_task_id}')
current_task.upload_artifact(
"pipeline_config", artifact_object=config_path, wait_on_upload=True)
pipe.add_function_step(
name=CLEARML_PIPELINE_VERSION_DATASET,
function=dataset_versioning,
function_kwargs=dict(pipe_task_id=pipeline_task_id),
function_return=['dataset_id'],
project_name=project_name,
task_name=CLEARML_PIPELINE_VERSION_DATASET,
task_type=TaskTypes.data_processing,
packages="./requirements.txt",
docker=CLEARML_AGENT_DOCKER_IMAGE,
repo="./",
# cache_executed_step=True, # does not execute this step if nothing has changed
)...