Hi all, I have a question regarding `TaskScheduler`. When it is used for scheduling a function (via `schedule_function`), is there a way to pass arguments to that function? I see that the `TaskScheduler` class has a `func_args` parameter in the following method:
def _launch_job_function(self, job, func_args=None):
    # type: (BaseScheduleJob, Optional[Sequence]) -> Optional[Thread]
    """
    Run a function-based schedule job on a background thread.

    If the job is flagged ``single_instance`` and the thread recorded by its
    previous launch is still alive, the new launch is skipped (nothing is
    aborted) and the job is still notified through ``job.run(None)``.

    :param job: Schedule job to launch. This method reads ``base_function``,
        ``single_instance`` and ``name``, and calls
        ``get_last_executed_task_id()`` and ``run()``.
    :param func_args: Optional positional arguments forwarded to
        ``job.base_function``; ``None`` or an empty sequence means no arguments.
    :return: The started ``Thread``, or ``None`` when the job has no
        ``base_function`` or the launch was skipped.
    """
    # make sure this IS a function job
    if not job.base_function:
        return None

    # For single-instance jobs, read the previously recorded thread ident ONCE,
    # so the guard below and the thread lookup compare against the same value
    # (the getter is not called at all for non single-instance jobs).
    last_ident = job.get_last_executed_task_id() if job.single_instance else None
    if last_ident:
        # Best-effort lookup of the previous thread; any failure is treated
        # as "no previous thread" and the job is launched normally.
        # noinspection PyBroadException
        try:
            previous = next((t for t in enumerate_threads() if t.ident == last_ident), None)
        except Exception:
            previous = None

        if previous is not None and previous.is_alive():
            self._log(
                "Skipping Task '{}' scheduling, previous Thread instance '{}' still running".format(
                    job.name, previous.ident
                )
            )
            # NOTE(review): presumably records the attempt so the schedule
            # advances even though nothing was started - confirm against
            # BaseScheduleJob.run
            job.run(None)
            return None

    self._log("Scheduling Job '{}', Task '{}' on background thread".format(job.name, job.base_function))
    t = Thread(target=job.base_function, args=func_args or ())
    t.start()
    # mark as run, recording the thread ident for the next single-instance check
    job.run(t.ident)
    return t
Is there a way for me to pass arguments? Something like this:
# Desired usage (hypothetical): forward arguments to the scheduled function
# through a `func_args` keyword on add_task.
# NOTE(review): arg_1 / arg_2 below are bare names, so they must already be
# defined variables; quote them if literal parameter names are intended.
scheduler.add_task(name='yolo_vids_pipe',
schedule_function=clone_enqueue_yolo_vids_pipe,
func_args={arg_1:'something', arg_2:'something'},
day=1, # Launch every day
hour=6, # at 6:00 am
minute=00,
recurring=True,
execute_immediately=True,
)