I think that clearml should be able to do parameter sweeps using pipelines in a manner that makes use of parallelisation.
Use the HPO, it is basically doing the same thing with some more sophisticated algorithm (HBOB):
https://github.com/allegroai/clearml/blob/master/examples/optimization/hyper-parameter-optimization/hyper_parameter_optimizer.py
For example - how would this task-based example be done with pipelines?
Sure, you could do something like:
` from clearml import PipelineDecorator
@PipelineDecorator.component(cache=True)
def create_dataset(source_url: str, project: str, dataset_name: str) -> str:
print("starting create_dataset")
from clearml import StorageManager, Dataset
import pandas as pd
local_file = StorageManager.get_local_copy(source_url)
df = pd.read_csv(local_file, header=None)
df.to_csv(path_or_buf="./dataset.csv", index=False)
dataset = Dataset.create(dataset_project=project, dataset_name=dataset_name)
dataset.add_files("./dataset.csv")
dataset.get_logger().report_table(title="sample", series="head", table_plot=df.head())
dataset.finalize(auto_upload=True)
print("done create_dataset")
return dataset.id
@PipelineDecorator.component(cache=True)
def preprocess_dataset(dataset_id: str):
print("starting preprocess_dataset")
from clearml import Dataset
from pathlib import Path
import pandas as pd
dataset = Dataset.get(dataset_id=dataset_id)
local_folder = dataset.get_local_copy()
df = pd.read_csv(Path(local_folder) / "dataset.csv", header=None)
# "preprocessing" - adding columns
df.columns = [
'age', 'workclass', 'fnlwgt', 'degree', 'education-yrs', 'marital-status',
'occupation', 'relationship', 'ethnicity', 'gender', 'capital-gain',
'capital-loss', 'hours-per-week', 'native-country', 'income-cls',
]
df.to_csv(path_or_buf="./dataset.csv", index=False)
# store in a new dataset
new_dataset = Dataset.create(
dataset_project=dataset.project, dataset_name="{} v2".format(dataset.name),
parent_datasets=[dataset]
)
new_dataset.add_files("./dataset.csv")
new_dataset.get_logger().report_table(title="sample", series="head", table_plot=df.head())
new_dataset.finalize(auto_upload=True)
print("done preprocess_dataset")
return new_dataset.id
@PipelineDecorator.component(cache=True)
def verify_dataset_integrity(dataset_id: str, expected_num_columns: int):
print("starting verify_dataset_integrity")
from clearml import Dataset, Logger
from pathlib import Path
import numpy as np
import pandas as pd
dataset = Dataset.get(dataset_id=dataset_id)
local_folder = dataset.get_local_copy()
df = pd.read_csv(Path(local_folder) / "dataset.csv")
print("Verifying dataset")
assert len(df.columns) == expected_num_columns
print("PASSED")
# log some stats on the age column
Logger.current_logger().report_histogram(
title="histogram", series="age", values=np.histogram(df["age"])
)
print("done verify_dataset_integrity")
return True
@PipelineDecorator.component()
def train_model(dataset_id: str, training_args: dict):
print("starting train_model")
from clearml import Dataset, OutputModel, Task
from pathlib import Path
import pandas as pd
import numpy as np
import xgboost as xgb
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
dataset = Dataset.get(dataset_id=dataset_id)
local_folder = dataset.get_local_copy()
df = pd.read_csv(Path(local_folder) / "dataset.csv")
# prepare data (i.e. select specific columns)
columns = ["age", "fnlwgt", "education-yrs", "capital-gain", "capital-loss", "hours-per-week"]
X = df[columns].drop("age", axis=1)
y = df["age"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# create matrix
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)
# train with XGBoost
params = {"objective": "reg:squarederror", "eval_metric": "rmse"}
bst = xgb.train(
params,
dtrain,
num_boost_round=training_args.get("num_boost_round", 100),
evals=[(dtrain, "train"), (dtest, "test")],
verbose_eval=0,
)
# evaluate
y_pred = bst.predict(dtest)
plt.plot(y_test, 'r')
plt.plot(y_pred, 'b')
# let's store the eval score
error = np.linalg.norm(y_test-y_pred)
bst.save_model("a_model.xgb")
Task.current_task().reload()
model_id = Task.current_task().models['output'][-1].id
print("done train_model")
return dict(error=error, model_id=model_id)
@PipelineDecorator.component(monitor_models=["best"])
def select_best_model(models_score: list):
print("starting select_best_model")
from clearml import OutputModel, Task
best_model = None
for m in models_score:
if not best_model or m["error"] < best_model["error"]:
best_model = m
print("The best model is {}".format(best_model))
# lets store it on the pipeline
best_model = OutputModel(base_model_id=best_model["model_id"])
# let's make sure we have it
best_model.connect(task=Task.current_task(), name="best")
print("done select_best_model")
return best_model.id
@PipelineDecorator.pipeline(
name='xgboost_pipeline',
project='xgboost_pipe_demo',
version='0.1'
)
def pipeline(data_url: str, project: str):
dataset_id = create_dataset(source_url=data_url, project=project, dataset_name="mock")
preprocessed_dataset_id = preprocess_dataset(dataset_id=dataset_id)
if not bool(verify_dataset_integrity(
dataset_id=preprocessed_dataset_id,
expected_num_columns=15)
):
print("Verification Failed!")
return False
print("start training models")
models_score = []
for i in [100, 150]:
model_score = train_model(
dataset_id=preprocessed_dataset_id, training_args=dict(num_boost_round=i)
)
models_score.append(model_score)
model_id = select_best_model(models_score=models_score)
print("selected model_id = {}".format(model_id))
if name == 'main':
url = " "
PipelineDecorator.run_locally()
pipeline(data_url=url, project="xgboost_pipe_demo") `