I see, something like:from mystandalone import my_func_that_also_calls_task_init def task_factory(): task = Task.create(project="my_project", name="my_experiment", script="main_script.py", add_task_init_call=False) return task
if the pipeline and the my_func_that_also_calls_task_init
are in the same repo, this should actually work.
You can quickly test this pipeline withpipe = Pipelinecontroller() pipe.add_step(preprocess, ...) pipe.add_step(base_task_factory=task_factory, ...) pipe.add_step(postprocess, ...) pipe.start_locally(run_pipeline_steps_locally=True)
Make sense ?
Specific step in the pipeline. The main step (the experiment) is currently just a file with a Task.init
` and then the experiment code. I am wondering how to modify this code such that it can be run in the pipeline or as standalone.
ReassuredTiger98 are you saying you want to be able to run the pipeline as a standalone and as "remote pipeline",
Or is this for a specific step in the pipeline that you want to be able to run standalone/pipelined ?
Maybe something like this is how it is intended to be used?
` # run_with_clearml.py
def get_main_task():
task = Task.create(project="my_project", name="my_experiment", script="main_script.py")
return task
def run_standalone(task_factory):
Task.enqueue(task_factory())
def run_in_pipeline(task_factory):
pipe = Pipelinecontroller()
pipe.add_step(preprocess, ...)
pipe.add_step(base_task_factory=task_factory, ...)
pipe.add_step(postprocess, ...)
pipe.start()
if args.in_pipeline:
run_in_pipeline(get_main_task)
else:
run_standalone(get_main_task) `
Maybe let s put it in a different way:
Pipeline
Preprocess Task Main Task Postprocess Task
My main task is my experiment, so my training code. When I ran the main task standalone, I just used Task.init
and set up the project name, task name, etc.
Now what I could do is push this task to the server, then just reference the task by its task-ID and run the pipeline. However, I do not want to push the main task to the server before running. Instead I want to push the whole pipeline, but still be able to run my main task standalone. So in the end, whether I run the task as part of a pipeline or as standalone should not show different on the clearml-server.
you mean The Task already exists or you want to create a Task from the code ?
By preexisting task I meant I have existing code that already uses Task.init
. I would like to use this code as my main task in my pipeline, i.e. after carla started.
Is there a way to capture uncommited changes with
Task.create
just like
Task.init
does? Actually, I would like to populate the repo, branch and packages automatically...
You can pass a local repo path to Task create I "think" it will also store the uncommitted changes.
I start my main task like this:
python my_script.py --myarg "myargs"
. How are the arguments captured?
At runtime when argparse is called.
You can use clearml-task
CLI to take an existing code and create a Task from it:
https://clear.ml/docs/latest/docs/apps/clearml_task
https://github.com/allegroai/clearml/blob/master/docs/clearml-task.md
I am also wondering how I integrate my (preexisting) main task in the pipeline. I start my main task like this: python my_script.py --myarg "myargs"
. How are the arguments captured? I am very confused, how one integrates this correctly...
Is there a way to capture uncommited changes with Task.create
just like Task.init
does? Actually, I would like to populate the repo, branch and packages automatically...
LOL I keep typing clara without noticing (maybe it's the nvidia thing I keep thinking about)
Carla makes much more sense 😄
Thank you, good to know!
(btw: the simulator is called carla, not clara :))
ReassuredTiger98 I ❤ the DAG in ASCII!!!
port = task_carla_server.get_parameter("General/port")
This looks great! and will acheive exactly what you are after.
BTW: when you are done you can do :task_carla_server.mark_aborted(force=True)
And it will shutdown the Clara Task 🙂
Here is how my start_carla .py task looks like currently:
` import os
import subprocess
from time import sleep
from clearml import Task
from clearml.config import running_remotely
def create_task(node):
task = Task.create(
project_name="examples",
task_name="start-carla",
repo="myrepo",
branch="carla-clearml-integration",
script="src/start_carla_task.py",
working_directory="src",
packages=["clearml"],
add_task_init_call=False,
)
return task
if name == "main":
task = Task.init(project_name="examples", task_name="start-carla", task_type="application")
task_carla_server_blueprint = Task.get_task(project_name="examples", task_name="carla-server")
task_carla_server = Task.clone(source_task=task_carla_server_blueprint, name="carla-server")
Task.enqueue(task_carla_server, queue_name="carla")
port = None
while not port:
task_carla_server.reload()
port = task_carla_server.get_parameter("General/port")
sleep(1)
task.set_parameter("General/server_task_id", task_carla_server.id)
task.set_parameter("General/port", port) `
So with pipeline decorators can I implement this logic?
Actually, my current approach looks like this:
carla-server-task : Launch carla server instance on a random port, set the port as param and then block the task/process, so I can kill carla when this task is aborted. This task keeps running the whole time.
start-carla-task : Launch a carla-server-task and wait for the port parameter to be set. Set the launched carla-server-task task-id and the port as param. Set task completed.
main-task : Run experiment when all start-carla-task are finished with carla-servers running on the ports specified in the start-carla-task params.
stop-carla-task : Get the task-id from the start-carla-task and programmtically abort the carla-server-task
Since i want to be able to start multiple carla-servers my pipeline looks like this:
pipeline
/ \
start-carla-task -1 start-carla-task -2 -----> These manually start carla-server-task-1 and carla-server-task-2 and block until they got a port back.
|\ /|
| main-task |
|/ |
stop-carla-task -1 stop-carla-task -2
\ /
finished
(can you see my diagram?)
Thank you. Yes we need to wait for carla to spin up.
Hi ReassuredTiger98
I think you should have something like:
` @PipelineDecorator.component(task_type=TaskTypes.application, docker='clara_docker_container_if_we_need')
def step_one(param):
print('step_one')
import os
os.system('run me clara')
# I'm assuming we should wait?
return
@PipelineDecorator.component(task_type=TaskTypes.training)
def step_two(param):
print('step_two')
import something
somthing.to_do()
return
@PipelineDecorator.pipeline(name='custom pipeline logic', project='examples', version='0.0.5')
def executing_pipeline():
print('pipeline logic strating')
print('launch step one')
step_one('clara args?!')
print('launch step two')
step_two('do something)
if name == 'main':
PipelineDecorator.set_default_execution_queue('default')
# Start the pipeline execution logic.
executing_pipeline()
print('process completed') `Do we need to wait for clara to spin for step two ?
Notice the entire code (including two functions is stored on the pipeline Task itself, the step itself (i.e. the function) is then package as its own standalone Task, including reference to the docker image if needed, and required python packages based on what's imported inside the function)
Another example on what I would expect:
` ### start_carla.py
def get_task():
task = Task.init(project_name="examples", task_name="start-carla", task_type="application")
# experiment is not run here. The experiment is only run when this is executed as standalone or on a clearml-agent.
return task
def run_experiment(task):
...
This task can also be run as standalone or run by a clearml-agent
if name == "main":
task = get_task()
run_experiment(task)
run_pipeline.py
from start_carla import get_task as get_start_carla_task
if name == "main":
pipe = PipelineController(
name="carla-autostart",
project="rlad/carla-servers",
version="0.0.1",
add_pipeline_tags=False,
)
pipe.add_step(name="start-carla-1", base_task_factory=get_start_carla_task)
pipe.add_step(name="start-carla-2", base_task_factory=get_start_carla_task)
pipe.start() `
Okay, I see. Unfortunetly, I don't get how clearml tasks are intended to be used. Could you help me with that? (see code)
` def start_carla_factory():
task = # How do I create this task?
long_blocking_call_to_start_carla()
return task
pipe = PipelineController(
name="carla-autostart",
project="rlad/carla-servers",
version="0.0.1",
add_pipeline_tags=False,
)
pipe.add_step(name="start-carla", base_task_factory=start_carla_factory)
pipe.start() `
Runtime, every time the add_step needs to create a New Task to be enqueued
When is the base_task_factory called? At runtime or definition time?
my experiment logic
you mean the actual code doing the training ?
so that it gets lazily executed and not at task definition time
Task definition time -> when creating the Pipeline Task? remember the base_task_factory a the end creates a Task object (it does not run the code itslef).
BTW: if you have simple training logic you can use pipeline decorators , it might be a better fit?
https://clear.ml/docs/latest/docs/fundamentals/pipelines#pipeline-from-function-decorator
I am wondering where to put my experiment logic, so that it gets lazily executed and not at task definition time (i.e. in get_task_experiment()
how to get my experiment logic in there without running it)
Wow, thank you very much. And how would I bind my code to task?
you mean the code that creates pipeline Tasks ?
(remember the pipeline itself is a Task in the system, basically if your pipeline code is a single script it will pack the entire thing )
Wow, thank you very much. And how would I bind my code to task? Should I still use Task.init
and it will just use the file it is called in as entrypoint or should I create a task using Task.create
and specify the script?
Hi ReassuredTiger98
but I would rather just define a function that returns the task directly
🙂
Check it out:
https://github.com/allegroai/clearml/blob/36ee3d61209e413a917d8a718fb25f389143cfa1/clearml/automation/controller.py#L205:param base_task_factory: Optional, instead of providing a pre-existing Task, provide a Callable function to create the Task (returns Task object)
Also here is how I run my experiments right now, so I can execute them locally and remotely:
` # Initialize ClearML Task
task = (
Task.init(
project_name="examples",
task_name=args.name,
output_uri=True,
)
if track_remote or enqueue
else None
)
# Execute remotly via CLearML
if enqueue is not None and not running_remotely():
if enqueue == "None":
queue_name = None
task.reset()
exit()
else:
queue_name = enqueue
task.execute_remotely(queue_name, clone=False, exit_process=True) `
Here is some context on what I am currently trying to do (pseudocode):
`
def run_experiment(args):
...
def get_task_experiment():
task = Task.init(...)
task.bind_run(run_experiment)
return task
def run_with_pipeline(task):
pipe = PipelineController(...)
pipe.add_step(prepare_something...)
pipe.add_step(task)
pipe.add_step(postprocess_something...)
return pipe
if name == "main":
task = get_task_experiment()
# Run without Pipeline
if not args.pipeline:
task.execute_remotely("my_queue", clone=False, exit_process=True)
else:
pipe = run_with_pipeline(task)
pipe.start() `
Can you tell me how I create tasks correctly? The PipelineController.add_step
takes the task-id/task-name, but I would rather just define a function that returns the task directly, since the base-task may not be already on the clearml-server.