Actually, re-running pipeline_from_decorator.py a second time (and a third time) from the command line seems to have executed without that ValueError, so maybe that issue was a fluke.
Nevertheless, those runs exit prior to the line print('process completed')
and I would definitely prefer the command executing_pipeline
to not kill the process that called it.
For example, having started the pipeline, I'd like my code to also report that to some other tool (e.g. send me a Slack message about it).
I think that clearml should be able to do parameter sweeps using pipelines in a manner that makes use of parallelisation.
Use the HPO; it is basically doing the same thing with a more sophisticated algorithm (BOHB):
https://github.com/allegroai/clearml/blob/master/examples/optimization/hyper-parameter-optimization/hyper_parameter_optimizer.py
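Something along these lines, as a rough sketch (the base task id, metric title/series and parameter range below are placeholders, and OptimizerOptuna stands in for whichever search strategy you pick; the linked example shows the full set of options):
```
from clearml import Task
from clearml.automation import HyperParameterOptimizer, UniformIntegerParameterRange
from clearml.automation.optuna import OptimizerOptuna

# controller task that owns the sweep
task = Task.init(project_name='xgboost_pipe_demo', task_name='hpo controller',
                 task_type=Task.TaskTypes.optimizer)

optimizer = HyperParameterOptimizer(
    base_task_id='<training task id to clone per trial>',   # placeholder
    hyper_parameters=[
        UniformIntegerParameterRange('General/num_boost_round',
                                     min_value=50, max_value=300, step_size=50),
    ],
    objective_metric_title='validation',   # placeholder: metric reported by the base task
    objective_metric_series='rmse',
    objective_metric_sign='min',
    optimizer_class=OptimizerOptuna,
    execution_queue='default',             # trials run in parallel on agents listening here
    max_number_of_concurrent_tasks=2,
    total_max_jobs=10,
)
optimizer.start()   # enqueues and monitors trials in the background
optimizer.wait()    # block until the sweep is done
optimizer.stop()
```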
For example - how would this task-based example be done with pipelines?
Sure, you could do something like:
```
from clearml import PipelineDecorator


@PipelineDecorator.component(cache=True)
def create_dataset(source_url: str, project: str, dataset_name: str) -> str:
    print("starting create_dataset")
    from clearml import StorageManager, Dataset
    import pandas as pd
    local_file = StorageManager.get_local_copy(source_url)
    df = pd.read_csv(local_file, header=None)
    df.to_csv(path_or_buf="./dataset.csv", index=False)
    dataset = Dataset.create(dataset_project=project, dataset_name=dataset_name)
    dataset.add_files("./dataset.csv")
    dataset.get_logger().report_table(title="sample", series="head", table_plot=df.head())
    dataset.finalize(auto_upload=True)
    print("done create_dataset")
    return dataset.id


@PipelineDecorator.component(cache=True)
def preprocess_dataset(dataset_id: str):
    print("starting preprocess_dataset")
    from clearml import Dataset
    from pathlib import Path
    import pandas as pd
    dataset = Dataset.get(dataset_id=dataset_id)
    local_folder = dataset.get_local_copy()
    df = pd.read_csv(Path(local_folder) / "dataset.csv", header=None)
    # "preprocessing" - adding columns
    df.columns = [
        'age', 'workclass', 'fnlwgt', 'degree', 'education-yrs', 'marital-status',
        'occupation', 'relationship', 'ethnicity', 'gender', 'capital-gain',
        'capital-loss', 'hours-per-week', 'native-country', 'income-cls',
    ]
    df.to_csv(path_or_buf="./dataset.csv", index=False)
    # store in a new dataset
    new_dataset = Dataset.create(
        dataset_project=dataset.project, dataset_name="{} v2".format(dataset.name),
        parent_datasets=[dataset]
    )
    new_dataset.add_files("./dataset.csv")
    new_dataset.get_logger().report_table(title="sample", series="head", table_plot=df.head())
    new_dataset.finalize(auto_upload=True)
    print("done preprocess_dataset")
    return new_dataset.id


@PipelineDecorator.component(cache=True)
def verify_dataset_integrity(dataset_id: str, expected_num_columns: int):
    print("starting verify_dataset_integrity")
    from clearml import Dataset, Logger
    from pathlib import Path
    import numpy as np
    import pandas as pd
    dataset = Dataset.get(dataset_id=dataset_id)
    local_folder = dataset.get_local_copy()
    df = pd.read_csv(Path(local_folder) / "dataset.csv")
    print("Verifying dataset")
    assert len(df.columns) == expected_num_columns
    print("PASSED")
    # log some stats on the age column
    Logger.current_logger().report_histogram(
        title="histogram", series="age", values=np.histogram(df["age"])
    )
    print("done verify_dataset_integrity")
    return True


@PipelineDecorator.component()
def train_model(dataset_id: str, training_args: dict):
    print("starting train_model")
    from clearml import Dataset, OutputModel, Task
    from pathlib import Path
    import pandas as pd
    import numpy as np
    import xgboost as xgb
    from sklearn.model_selection import train_test_split
    import matplotlib.pyplot as plt
    dataset = Dataset.get(dataset_id=dataset_id)
    local_folder = dataset.get_local_copy()
    df = pd.read_csv(Path(local_folder) / "dataset.csv")
    # prepare data (i.e. select specific columns)
    columns = ["age", "fnlwgt", "education-yrs", "capital-gain", "capital-loss", "hours-per-week"]
    X = df[columns].drop("age", axis=1)
    y = df["age"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    # create matrix
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dtest = xgb.DMatrix(X_test, label=y_test)
    # train with XGBoost
    params = {"objective": "reg:squarederror", "eval_metric": "rmse"}
    bst = xgb.train(
        params,
        dtrain,
        num_boost_round=training_args.get("num_boost_round", 100),
        evals=[(dtrain, "train"), (dtest, "test")],
        verbose_eval=0,
    )
    # evaluate
    y_pred = bst.predict(dtest)
    plt.plot(y_test, 'r')
    plt.plot(y_pred, 'b')
    # let's store the eval score
    error = np.linalg.norm(y_test - y_pred)
    bst.save_model("a_model.xgb")
    Task.current_task().reload()
    model_id = Task.current_task().models['output'][-1].id
    print("done train_model")
    return dict(error=error, model_id=model_id)


@PipelineDecorator.component(monitor_models=["best"])
def select_best_model(models_score: list):
    print("starting select_best_model")
    from clearml import OutputModel, Task
    best_model = None
    for m in models_score:
        if not best_model or m["error"] < best_model["error"]:
            best_model = m
    print("The best model is {}".format(best_model))
    # let's store it on the pipeline
    best_model = OutputModel(base_model_id=best_model["model_id"])
    # let's make sure we have it
    best_model.connect(task=Task.current_task(), name="best")
    print("done select_best_model")
    return best_model.id


@PipelineDecorator.pipeline(
    name='xgboost_pipeline',
    project='xgboost_pipe_demo',
    version='0.1'
)
def pipeline(data_url: str, project: str):
    dataset_id = create_dataset(source_url=data_url, project=project, dataset_name="mock")
    preprocessed_dataset_id = preprocess_dataset(dataset_id=dataset_id)
    if not bool(verify_dataset_integrity(
            dataset_id=preprocessed_dataset_id,
            expected_num_columns=15)
    ):
        print("Verification Failed!")
        return False
    print("start training models")
    models_score = []
    for i in [100, 150]:
        model_score = train_model(
            dataset_id=preprocessed_dataset_id, training_args=dict(num_boost_round=i)
        )
        models_score.append(model_score)
    model_id = select_best_model(models_score=models_score)
    print("selected model_id = {}".format(model_id))


if __name__ == '__main__':
    url = " "
    PipelineDecorator.run_locally()
    pipeline(data_url=url, project="xgboost_pipe_demo")
```
first, thanks for having these discussions. I appreciate this kind of support is an effort 🙏
Yes. i perfectly understand that once a pipeline job (or a task) is sent off in this manner, it executes separately (and, most likely in a different machine) from the process that instantiated it.
I still feel strongly that such a command should not be thought of as a fire and exit operation. I can think of several scenarios where continued execution of the instantiating process is desired:
- I want pipeline / task dispatch to be reported and monitored outside of ClearML. For example, I might want to log the dispatch event in some non-ClearML system and then monitor the health of the pipeline, alerting if it is pending for too long.
- I might want to dispatch other jobs from within the same process.
- Pipeline dispatch might be part of a CI/CD task that is triggered on code changes related to the pipeline. It dispatches the pipeline and verifies that expected artifacts were produced within some time limits.
I realise that there may be ways around this exit call, or existing ClearML mechanisms that support some of the example use cases I mentioned here, but IMO leave the decision up to the developer and don't kill their process.
What I think would be preferable is that the pipeline be deployed and that the python process that deployed it were allowed to continue on to whatever I had planned for it to do next (i.e. not exit)
Hi PanickyMoth78
PipelineDecorator.set_default_execution_queue('default')
would close the current process and launch the pipeline logic on the "services" queue, which means the local process is being terminated (specifically in your case, the notebook kernel). Does that make sense?
If you want the pipeline logic to stay on the local machine you can say:
@PipelineDecorator.pipeline(..., pipeline_execution_queue=None)
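For example, roughly (the component/pipeline names here are made up; the point is that with pipeline_execution_queue=None the controller logic runs in the calling process, so the code after the pipeline call is reached):
```
from clearml import PipelineDecorator

@PipelineDecorator.component(execution_queue='default')
def double(x: int) -> int:
    # example step; still executed by an agent on the "default" queue
    return x * 2

@PipelineDecorator.pipeline(
    name='local_logic_demo', project='demo', version='0.1',
    pipeline_execution_queue=None,   # keep the pipeline logic in this process
)
def run(x: int):
    print('result =', double(x=x))

if __name__ == '__main__':
    run(x=3)
    print('process completed')   # reached, since the controller was not shipped to a remote queue
```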
If I run from terminal, I see:
ValueError: Task object can only be updated if created or in_progress [status=stopped fields=['configuration']]
Hmm interesting, so like a callback?!
Like the pipe-step level callbacks at https://github.com/allegroai/clearml/blob/bca9a6de3095f411ae5b766d00967535a13e8401/examples/pipeline/pipeline_from_tasks.py#L54-L55 ? I guess that mechanism could serve. Where do these callbacks run? In the instantiating process? If so, that would work (since the callback function can be any code I wish, right?)
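For reference, a rough sketch of those step-level callbacks from the task-based pipeline example (project/task names are placeholders); as I understand it, they run as part of the pipeline controller logic, wherever that logic executes:
```
from clearml import PipelineController

def pre_execute_callback(pipeline, node, param_override):
    # called just before the step task is launched; return False to skip the step
    print('launching step: {}'.format(node.name))
    return True

def post_execute_callback(pipeline, node):
    # called right after the step finishes; node.executed holds the executed task id
    print('step {} done, task id={}'.format(node.name, node.executed))

pipe = PipelineController(name='pipeline demo', project='examples', version='0.0.1')
pipe.add_step(
    name='stage_data',
    base_task_project='examples',                       # placeholder
    base_task_name='pipeline step 1 dataset artifact',  # placeholder
    pre_execute_callback=pre_execute_callback,
    post_execute_callback=post_execute_callback,
)
pipe.start_locally()  # run the controller logic (and therefore the callbacks) in this process
```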
I might want to dispatch other jobs from within the same process.
This is actually something that you should not do with decorators as pipelines.
Is that also the case with https://github.com/allegroai/clearml/blob/bca9a6de3095f411ae5b766d00967535a13e8401/examples/pipeline/pipeline_from_functions.py ? i.e. that main should not go on to create another pipeline?
pipeline dispatch might be part of a ci/cd task
Then it should be clone-pipeline->enqueue into "services" queue -> wait for result?
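i.e. something along these lines (project/task names are placeholders):
```
from clearml import Task

# clone the existing pipeline controller task and enqueue the clone
template = Task.get_task(project_name='xgboost_pipe_demo', task_name='xgboost_pipeline')
pipeline_run = Task.clone(source_task=template, name='xgboost_pipeline (ci run)')
Task.enqueue(pipeline_run, queue_name='services')

# block until the run completes (raises if the run fails), then inspect it
pipeline_run.wait_for_status(status=[Task.TaskStatusEnum.completed], check_interval_sec=30)
pipeline_run.reload()
print('pipeline finished with status:', pipeline_run.status)
```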
I suppose that if pipeline clone + enqueue results in a run with the latest code / container images then that would serve the purpose I had in mind.
I'm still unclear on why the script needs to end execution after dispatch. It doesn't seem like the natural choice there but it sounds like there are ways of accomplishing the use cases I've thought up.
I want pipeline / task dispatch to be reported and monitored outside of clearml. For example, I might want to log the dispatch event in some non-clearml system and then monitor the health of the pipeline and alert if it is pending for too long.
Hmm interesting, so like a callback?!
I'm thinking of a callback that is executed after the pipeline is sent, but once the callback is done, the pipeline process exits?
Does that make sense ?
I might want to dispatch other jobs from within the same process.
This is actually something that you should not do with decorator-based pipelines, since the logic of the entire pipeline, including execution, is the entire script, not just the pipeline-decorated function. We might be able to change that, but the current implementation assumes the entire script is the entrypoint for the remote pipeline execution (not just the pipeline-decorated function). The idea is that you might need to have some preconfiguration beforehand. But I'm not actually sure we still need to support that.
pipeline dispatch might be part of a ci/cd task that is triggered on code changes related to the pipeline. It dispatches the pipeline and verifies that expected artifacts were produced within some time limits.
Then it should be clone-pipeline -> enqueue into "services" queue -> wait for result?
Hi Martin. See that ValueError
https://clearml.slack.com/archives/CTK20V944/p1657583310364049?thread_ts=1657582739.354619&cid=CTK20V944 Perhaps something else is going on?
Thanks ! 🎉
I'll give it a try.
I think that clearml should be able to do parameter sweeps using pipelines in a manner that makes use of parallelisation.
If that's not happening with the new RC, I wonder how I would do a parameter sweep within the pipelines framework.
For example - how would this task-based example be done with pipelines?
https://github.com/allegroai/clearml/blob/master/examples/automation/manual_random_param_search_example.py
I'm thinking of a case where you want to try some options for data preparation steps and for each of those - some options for model hyper parameters.
To me that's a DAG that should use pipelines
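To illustrate (a sketch only; prepare_data, train_xgb and pick_best are hypothetical components with stub bodies, but the nested-loop structure is the DAG I have in mind, and independent branches could run in parallel on the agents):
```
from clearml import PipelineDecorator

@PipelineDecorator.component(cache=True)
def prepare_data(source_url: str, method: str) -> str:
    # hypothetical stub: build a dataset with one preparation option, return its id
    return "{}-{}".format(source_url, method)

@PipelineDecorator.component()
def train_xgb(dataset_id: str, num_boost_round: int) -> dict:
    # hypothetical stub: train one model on the dataset, return its score and id
    return dict(error=1.0 / num_boost_round, model_id="{}-{}".format(dataset_id, num_boost_round))

@PipelineDecorator.component()
def pick_best(models_score: list) -> dict:
    # hypothetical stub: pick the entry with the lowest error
    return min(models_score, key=lambda m: m['error'])

@PipelineDecorator.pipeline(name='sweep_demo', project='demo', version='0.1')
def sweep(source_url: str):
    results = []
    for method in ['standardize', 'minmax']:        # data-preparation options
        dataset_id = prepare_data(source_url=source_url, method=method)
        for rounds in [50, 100, 200]:               # hyperparameter options per dataset
            results.append(train_xgb(dataset_id=dataset_id, num_boost_round=rounds))
    print('best:', pick_best(models_score=results))

if __name__ == '__main__':
    PipelineDecorator.run_locally()   # drop this to dispatch steps to agent queues instead
    sweep(source_url='https://example.com/data.csv')   # placeholder URL
```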
I'm on clearml 1.6.2
The Jupyter notebook service and two clearml-agents (version 1.3.0, one in queue "default" and one in queue "services" with the --cpu-only flag) are all running inside a docker container
PanickyMoth78
and I would definitely prefer the command executing_pipeline to not kill the process that called it.
I understand why it would be odd from a notebook perspective; the issue is that the actual code is being "sent" to the backend to be executed on a remote machine. It is important to understand that this is the end of the current process. Does that make sense?
(not saying we could not add an argument for that, just trying to make sure I convey the design decision before discussing an improvement)
Hi PanickyMoth78 , an RC is out with a fix.
pip install clearml==1.6.3rc0
Thank you for noticing the graph issue.
Btw, do notice that since the data is being changed inside the controller loop, the parents are still kind of odd, because the source of the data is not clear to the logic, so it assumes the step depends on the current state (i.e. all the leaves)
Thanks for the fix and the mock HPO example code !
Pipeline behaviour with the fix is looking good.
I see the point about changes to data inside the controller possibly causing dependencies for step 3 (or, at least, making it harder for the interpreter to know).