auto_process_ngs.simple_scheduler
Python module to provide scheduler capability for running external programs
- class auto_process_ngs.simple_scheduler.SchedulerCallback(name, function, wait_for=[])
Class providing an interface to scheduled callbacks
SchedulerCallback instances should normally be returned by a call to the ‘callback’ method of a SimpleScheduler object.
- invoke(jobs, sched)
Invoke the callback
- exception auto_process_ngs.simple_scheduler.SchedulerException
Base class for errors with simple scheduler code
- class auto_process_ngs.simple_scheduler.SchedulerGroup(name, group_id, parent_scheduler, log_dir=None, wait_for=None)
Class providing an interface to schedule a group of jobs
SchedulerGroup instances should normally be returned by a call to the ‘group’ method of a SimpleScheduler object. The group should be populated by calling its ‘add’ method (note that jobs are passed directly to the scheduler).
Once all jobs have been added the ‘close’ method should be invoked to indicate to the scheduler that the group is complete. At this point no more jobs can be added to the group, and the scheduler will check for when the group has finished running.
- add(args, runner=None, name=None, wd=None, log_dir=None, wait_for=None, callbacks=[])
Add a request to run a job
- Parameters:
args – the command to run expressed as a list or tuple of arguments
runner – (optional) a JobRunner instance that will be used to dispatch and control the job.
name – (optional) a name for the job. Must be unique within the scheduler instance.
wd – (optional) the working directory to execute the job in; defaults to the current working directory
log_dir – (optional) explicitly specify directory for log files
wait_for – (optional) a list or tuple of job and/or group names which must finish before this job can start
callbacks – (optional) a list or tuple of functions that will be executed when the job completes.
- Returns:
SchedulerJob instance for the added job.
- close()
Indicate that all jobs have been added to the group
- property closed
Test whether group has been closed
- property completed
Test whether group has completed
Returns True if all the jobs in the group have completed, False otherwise.
- property exit_code
Return exit code for the group
If all jobs have completed with status zero then returns zero, otherwise returns a count of jobs which have non-zero status.
If the group hasn’t completed then returns None.
- property is_running
Test whether group is running
Returns True if group was closed and if it has jobs that are still running, False if not.
- property jobs
Return list of jobs
- wait(poll_interval=5)
Wait for the group to complete
- Parameters:
poll_interval – optional, number of seconds to wait in between checking if the group has completed (default: 5 seconds)
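The rules for ‘add’, ‘close’, ‘completed’ and ‘exit_code’ can be modelled in a few lines. This is a toy model, not the library's code. ‘ToyJob’ and ‘ToyGroup’ are hypothetical names. The RuntimeError raised when adding to a closed group is also an assumption, because the documentation doesn't say which exception (if any) the real class raises.

```python
class ToyJob:
    """Hypothetical stand-in for a job: tracks only its exit status."""

    def __init__(self, exit_status=None):
        # None means the job hasn't finished yet
        self.exit_status = exit_status

    @property
    def completed(self):
        return self.exit_status is not None


class ToyGroup:
    """Hypothetical model of the SchedulerGroup rules described above."""

    def __init__(self, name):
        self.name = name
        self.jobs = []
        self.closed = False

    def add(self, job):
        # Assumption: a closed group refuses new jobs by raising
        if self.closed:
            raise RuntimeError("group '%s' is closed" % self.name)
        self.jobs.append(job)
        return job

    def close(self):
        self.closed = True

    @property
    def completed(self):
        # True once every job in the group has completed
        return all(job.completed for job in self.jobs)

    @property
    def exit_code(self):
        if not self.completed:
            return None
        # Zero if every job returned zero, otherwise the count of failed jobs
        return sum(1 for job in self.jobs if job.exit_status != 0)


group = ToyGroup("qc")
a = group.add(ToyJob())
b = group.add(ToyJob())
group.close()
print(group.exit_code)  # None: neither job has finished
a.exit_status, b.exit_status = 0, 2
print(group.exit_code)  # 1: one job had non-zero status
```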
- class auto_process_ngs.simple_scheduler.SchedulerJob(runner, args, job_number=None, name=None, working_dir=None, log_dir=None, wait_for=None)
Class providing an interface to scheduled jobs
SchedulerJob instances should normally be returned by a call to the ‘submit’ method of a SimpleScheduler object.
- property completed
Test if a job has completed
Returns True if the job has finished running, False otherwise.
- property exit_code
Return exit code from job
Wrapper for the ‘exit_status’ property provided by the ‘Job’ superclass.
- property in_error_state
Test if a job is in an error state
Returns True if the job appears to be in an error state, and False if not.
- property is_running
Test if a job is running
Returns True if the job has started and is still running, False otherwise.
- restart(max_tries=3)
Restart running the job
Attempts to restart a job by terminating the current instance and reinvoking the ‘start’ method.
The number of times that a restart should be attempted is limited by the ‘max_tries’ parameter.
If the restart is successful then the new job id is returned; otherwise None is returned to indicate failure.
- Parameters:
max_tries (int) – maximum number of restarts that should be attempted on this job before giving up (default: 3)
- Returns:
Id for job
- start()
Start the job running
Overrides the ‘start’ method in the base ‘Job’ class.
- Returns:
Id for job
- wait(poll_interval=5, timeout=None)
Wait for the job to complete
This method blocks while waiting for the job to finish running.
NB if the job was not created by submission to a scheduler or group, then it’s up to the calling program to ensure that the job is started before the ‘wait’ method is invoked.
- Parameters:
poll_interval – optional, number of seconds to wait in between checking if the job has completed (default: 5 seconds)
timeout – optional; if set, the maximum time in seconds that the job will be allowed to run before it’s terminated and a SchedulerTimeout exception is raised
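The following minimal sketch shows how the ‘max_tries’ limit on ‘restart’ might behave. It uses a hypothetical ‘ToyRestartableJob’ class whose ‘start’ succeeds or fails according to a scripted list of outcomes. It assumes the limit counts restarts over the lifetime of the job, not retries within a single call, because the description above doesn't settle this.

```python
class ToyRestartableJob:
    """Hypothetical job illustrating 'restart(max_tries)'."""

    def __init__(self, start_outcomes):
        # Each boolean says whether the corresponding start attempt succeeds
        self._outcomes = iter(start_outcomes)
        self._last_id = 0
        self.restarts = 0
        self.job_id = None

    def start(self):
        # Stand-in for dispatching the job: new id on success, None on failure
        if next(self._outcomes, False):
            self._last_id += 1
            self.job_id = self._last_id
        else:
            self.job_id = None
        return self.job_id

    def terminate(self):
        self.job_id = None

    def restart(self, max_tries=3):
        # Assumption: give up once 'max_tries' restarts have been attempted
        if self.restarts >= max_tries:
            return None
        self.restarts += 1
        self.terminate()
        return self.start()


job = ToyRestartableJob([True, True, False])
job.start()
print(job.restart(max_tries=2))  # 2: new job id
print(job.restart(max_tries=2))  # None: the restart attempt failed
print(job.restart(max_tries=2))  # None: no restarts left
```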
- class auto_process_ngs.simple_scheduler.SchedulerReporter(fp=sys.stdout, **args)
Class to report on scheduler operations
SimpleScheduler calls methods of the SchedulerReporter when jobs are scheduled, started and finished, when groups are added and finished, and to report the status of the scheduler.
The methods look for associated template format strings which are then used to create and output strings.
Templates are added and modified via the ‘set_template’ method. Templates should be Python format strings which should process a dictionary of values.
Template name      Used when            Available dictionary keys
job_scheduled      Job is submitted     job_name, job_number, job_id, waiting_for, time_stamp, command
job_start          Job starts running   as ‘job_scheduled’
job_end            Job finishes         as ‘job_scheduled’
group_added        Group is created     group_name, group_id, time_stamp
group_end          Group completes      as ‘group_added’
scheduler_status   Status is needed     n_running, n_waiting, n_finished
An example template string for a job could be:
‘Job %(job_name)s: %(job_id)d’
The purpose of the SchedulerReporter class is to provide a way to customise reporting of the standard scheduler operations when it is used in an application.
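The templates are %-style format strings applied to a dictionary, so the formatting step is plain Python. This short sketch uses invented values for a few of the ‘job_scheduled’ keys:

```python
# Invented values for some of the 'job_scheduled' keys (illustration only)
values = {
    "job_name": "fastqc.sample1",
    "job_number": 4,
    "job_id": 12345,
    "command": "fastqc sample1.fastq",
}

template = "Job %(job_name)s: %(job_id)d"
# %-formatting with a mapping uses only the keys the template names
line = template % values
print(line)  # Job fastqc.sample1: 12345

# A template naming a key that isn't in the dictionary raises KeyError
try:
    "Group %(group_name)s" % values
except KeyError as err:
    print("missing key:", err)
```

Keys that a template doesn't mention are ignored, so one dictionary can serve several templates. A template that names a key missing from the dictionary raises KeyError.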
- group_added(group)
Write report string when a group is added
- Parameters:
group – SchedulerGroup instance
- group_end(group)
Write report string when a group ends
- Parameters:
group – SchedulerGroup instance
- job_end(job)
Write report string when a job ends
- Parameters:
job – SchedulerJob instance
- job_scheduled(job)
Write report string when a job is scheduled
- Parameters:
job – SchedulerJob instance
- job_start(job)
Write report string when a job starts
- Parameters:
job – SchedulerJob instance
- scheduler_status(sched)
Write report string for scheduler status
- Parameters:
sched – SimpleScheduler instance
- set_template(name, template)
Associate a template string with an operation
- Parameters:
name – operation name e.g. ‘job_start’
template – associated template string, or None to suppress reporting of the operation
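Here is a sketch of the ‘set_template’ behaviour, using a hypothetical ‘ToyReporter’. It includes passing None to suppress reporting of an operation. Accepting templates as keyword arguments to the constructor is an assumption based on the ‘**args’ in the SchedulerReporter signature. The ‘report’ method is invented for the example.

```python
import io
import sys


class ToyReporter:
    """Hypothetical reporter modelled on the SchedulerReporter description."""

    def __init__(self, fp=sys.stdout, **templates):
        self._fp = fp
        # Assumption: keyword arguments map operation names to templates
        self._templates = dict(templates)

    def set_template(self, name, template):
        # None means "don't report this operation"
        self._templates[name] = template

    def report(self, name, values):
        template = self._templates.get(name)
        if template is None:
            return
        self._fp.write(template % values + "\n")


buf = io.StringIO()
reporter = ToyReporter(fp=buf, job_start="Started %(job_name)s")
reporter.report("job_start", {"job_name": "fastqc.sample1"})
reporter.set_template("job_start", None)  # suppress further job_start reports
reporter.report("job_start", {"job_name": "fastqc.sample2"})
print(buf.getvalue(), end="")  # Started fastqc.sample1
```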
- exception auto_process_ngs.simple_scheduler.SchedulerTimeout
Timeout limit exceeded
- class auto_process_ngs.simple_scheduler.SimpleScheduler(runner=None, reporter=None, max_concurrent=None, max_slots=None, poll_interval=5, job_interval=0.1, max_restarts=1, submission_mode=1)
Lightweight scheduler class
Class providing simple job control functionality. Requests to run external programs are submitted to the scheduler via the ‘submit’ method; the scheduler handles actually executing them.
Submitted jobs can have dependencies on earlier jobs, in which case they will not be executed until those dependencies have completed.
The scheduler runs in its own thread.
Usage:
>>> s = SimpleScheduler()
>>> s.start()
>>> s.submit(['fastq_screen',...],name='fastq_screen.model')
>>> s.submit(['fastq_screen',...],name='fastq_screen.other')
>>> s.submit(['fastq_screen',...],name='fastq_screen.rRNA')
>>> s.submit(['fastqc',...],wait_for=('fastq_screen.model',...))
The number of concurrent jobs that the scheduler will run can be set directly via the ‘max_concurrent’ option; alternatively a limit can be placed on the maximum total ‘slots’ (aka cores, threads or processors) that are available to running jobs. In this case the scheduler will only start jobs when there are enough available slots to accommodate them.
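The combined effect of ‘wait_for’ dependencies and a ‘max_slots’ limit can be shown with a simplified, tick-based simulation. This is not how SimpleScheduler is implemented, since it runs in its own thread and polls jobs. ‘ToySpec’ and ‘schedule’ are hypothetical. The sketch also assumes, purely for illustration, that every started job finishes within one tick.

```python
from dataclasses import dataclass


@dataclass
class ToySpec:
    """Hypothetical job description: name, slots needed, dependencies."""
    name: str
    slots: int = 1
    wait_for: tuple = ()


def schedule(specs, max_slots):
    """Return the names started in each tick, honouring deps and slots."""
    waiting = list(specs)
    finished = set()
    ticks = []
    while waiting:
        free = max_slots
        started = []
        # Start ready jobs in submission order while slots remain
        for spec in list(waiting):
            ready = all(dep in finished for dep in spec.wait_for)
            if ready and spec.slots <= free:
                free -= spec.slots
                started.append(spec)
                waiting.remove(spec)
        if not started:
            raise RuntimeError("no job can start: unmet dependency or too few slots")
        ticks.append([s.name for s in started])
        # Simplification: everything started this tick finishes at its end
        finished.update(s.name for s in started)
    return ticks


specs = [
    ToySpec("fastq_screen.model", slots=2),
    ToySpec("fastq_screen.other", slots=2),
    ToySpec("fastq_screen.rRNA", slots=2),
    ToySpec("fastqc", slots=1, wait_for=("fastq_screen.model",)),
]
print(schedule(specs, max_slots=4))
# [['fastq_screen.model', 'fastq_screen.other'], ['fastq_screen.rRNA', 'fastqc']]
```

The ‘fastqc’ job is held back in the first tick by its dependency, not by slots. In the second tick it fits into the slots left over after ‘fastq_screen.rRNA’.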
- callback(name, callback, wait_for)
Add a callback function
Add a function ‘callback’ which will be invoked when all the jobs listed in ‘wait_for’ have completed.
The function is invoked as:
callback(name,(job1,…),sched)
where name is the name of the callback, (job1,…) are the SchedulerJob instances that have completed, and sched is the scheduler instance.
- Parameters:
name – a name for the callback. Must be unique within the scheduler instance; if None then a unique name will be generated.
callback – the function that will be executed once all the jobs and/or groups listed in ‘wait_for’ have completed.
wait_for – a list or tuple of job and/or group names which will trigger the callback when they finish
- Returns:
SchedulerCallback instance.
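This minimal sketch shows the triggering rule described for ‘callback’. The function fires once every name in ‘wait_for’ has finished, and is invoked as callback(name,(job1,…),sched). ‘ToyCallbackRegistry’ and its ‘job_finished’ hook are hypothetical. The real scheduler detects completion itself.

```python
class ToyCallbackRegistry:
    """Hypothetical model of 'callback(name, callback, wait_for)'."""

    def __init__(self):
        self._pending = []  # (name, function, wait_for) tuples
        self.finished = {}  # job name -> job object

    def callback(self, name, function, wait_for):
        self._pending.append((name, function, tuple(wait_for)))

    def job_finished(self, name, job):
        # Record the job, then fire any callback whose names have all finished
        self.finished[name] = job
        still_pending = []
        for cb_name, function, wait_for in self._pending:
            if all(n in self.finished for n in wait_for):
                jobs = tuple(self.finished[n] for n in wait_for)
                function(cb_name, jobs, self)
            else:
                still_pending.append((cb_name, function, wait_for))
        self._pending = still_pending


calls = []


def on_done(name, jobs, sched):
    calls.append((name, jobs))


registry = ToyCallbackRegistry()
registry.callback("report", on_done, ["a", "b"])
registry.job_finished("a", "job-a")  # 'b' still outstanding: nothing fires
registry.job_finished("b", "job-b")  # now fires once
print(calls)  # [('report', ('job-a', 'job-b'))]
```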
- property committed_slots
Return number of slots currently committed
- property default_runner
Return the default runner
- find(pattern)
Look up groups and jobs by regex pattern matching
Returns a list of jobs with names that match the supplied pattern (or an empty list if no matches were found).
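The following sketch shows regex-based lookup over a name-to-object mapping. The description doesn't say whether the real method uses re.match, re.search or a full match. This version assumes re.match, which is anchored at the start of the name. The standalone ‘find’ function is a hypothetical stand-in for the method.

```python
import re


def find(names_to_objects, pattern):
    """Return objects whose names match 'pattern' (assumes re.match)."""
    regex = re.compile(pattern)
    return [obj for name, obj in names_to_objects.items() if regex.match(name)]


registry = {"fastq_screen.model": "job1", "fastq_screen.rRNA": "job2", "fastqc": "job3"}
print(find(registry, r"fastq_screen\."))  # ['job1', 'job2']
print(find(registry, r"nothing"))         # []
```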
- group(name, log_dir=None, wait_for=None, callbacks=[])
Create a group of jobs
- Parameters:
name – a name for the group. Must be unique within the scheduler instance.
wait_for – (optional) a list or tuple of job and/or group names which must finish before the jobs in this group can start
log_dir – (optional) explicitly specify directory for log files
callbacks – (optional) a list or tuple of functions that will be executed when the group completes.
- Returns:
Empty SchedulerGroup instance.
- has_name(name)
Test if name has already been used
Returns True if the name already exists, False otherwise.
- is_empty()
Test if the scheduler has any jobs remaining
Returns False if there are jobs running and/or waiting, True otherwise.
- property job_number
Increment and return the job count
- Type:
Internal
- lookup(name)
Look up and return SchedulerJob or SchedulerGroup instance
Returns None if no matching instance was found.
- property n_finished
Return number of jobs that have completed
- property n_running
Return number of jobs currently running
- property n_waiting
Return number of jobs waiting to run
- run()
Internal: run method overriding that from base Thread class
This method implements the scheduler loop.
Don’t call this directly - it is invoked by the Thread’s ‘start’ method.
- stop()
Stop the scheduler and terminate running jobs
- submit(args, runner=None, name=None, wd=None, log_dir=None, wait_for=None, callbacks=[])
Submit a request to run a job
- Parameters:
args – the command to run expressed as a list or tuple of arguments
runner – (optional) a JobRunner instance that will be used to dispatch and control the job.
name – (optional) a name for the job. Must be unique within the scheduler instance.
wd – (optional) the working directory to execute the job in; defaults to the current working directory
log_dir – (optional) explicitly specify directory for log files
wait_for – (optional) a list or tuple of job and/or group names which must finish before this job can start
callbacks – (optional) a list or tuple of functions that will be executed when the job completes.
- Returns:
SchedulerJob instance for the submitted job.
- wait()
Wait until the scheduler is empty
- wait_for(names, timeout=None)
Wait until the named jobs/groups have completed
- Parameters:
names – a list or tuple of job and/or group names which the scheduler will wait to complete
timeout – optional; if set, the maximum time in seconds that the job will be allowed to run before it’s terminated and a SchedulerTimeout exception is raised
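The blocking behaviour of ‘wait_for’ amounts to a polling loop with an optional deadline. In this hypothetical sketch, ‘is_complete’ is a caller-supplied predicate that stands in for the scheduler's bookkeeping, and ‘ToyTimeout’ stands in for SchedulerTimeout. Unlike the documented method, the sketch only raises when the timeout expires and terminates nothing.

```python
import time


class ToyTimeout(Exception):
    """Hypothetical stand-in for SchedulerTimeout."""


def wait_for_names(is_complete, names, timeout=None, poll_interval=0.01):
    """Block until is_complete(name) is True for every name in 'names'."""
    start = time.monotonic()
    while not all(is_complete(n) for n in names):
        # Give up once the optional deadline has passed
        if timeout is not None and time.monotonic() - start > timeout:
            raise ToyTimeout("timed out waiting for %s" % (names,))
        time.sleep(poll_interval)


done = {"a"}
wait_for_names(lambda n: n in done, ["a"], timeout=1)  # returns immediately
try:
    wait_for_names(lambda n: n in done, ["a", "b"], timeout=0.05)
except ToyTimeout as err:
    print(err)
```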
- auto_process_ngs.simple_scheduler.cleanup_atexit(sched)
Perform clean up actions on exit
Stops the scheduler, which should kill any running jobs.
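The name suggests that ‘cleanup_atexit’ is meant to be registered with Python's atexit module, but that is an assumption. This self-contained sketch shows the pattern with a hypothetical ‘ToyScheduler’ that has only a ‘stop’ method, plus a hypothetical ‘cleanup_on_exit’ function.

```python
import atexit


class ToyScheduler:
    """Hypothetical stand-in exposing only 'stop'."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        # A real scheduler would terminate its running jobs here
        self.stopped = True


def cleanup_on_exit(sched):
    """Stop the scheduler when the interpreter exits."""
    sched.stop()


sched = ToyScheduler()
# Arrange for cleanup_on_exit(sched) to run at interpreter exit
atexit.register(cleanup_on_exit, sched)
```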
- auto_process_ngs.simple_scheduler.date_and_time(epoch=None)
Return formatted date and time information
- auto_process_ngs.simple_scheduler.default_scheduler_reporter()
Return a default SchedulerReporter object