auto_process_ngs.simple_scheduler
Python module to provide scheduler capability for running external programs
- class auto_process_ngs.simple_scheduler.SchedulerCallback(name, function, wait_for=[])
Class providing an interface to scheduled callbacks
SchedulerCallback instances should normally be returned by a call to the ‘callback’ method of a SimpleScheduler object.
- invoke(jobs, sched)
Invoke the callback
- exception auto_process_ngs.simple_scheduler.SchedulerException
Base class for errors with simple scheduler code
- class auto_process_ngs.simple_scheduler.SchedulerGroup(name, group_id, parent_scheduler, log_dir=None, wait_for=None)
Class providing an interface to schedule a group of jobs
SchedulerGroup instances should normally be returned by a call to the ‘group’ method of a SimpleScheduler object. The group should be populated by calling its ‘add’ method (note that jobs are passed directly to the scheduler).
Once all jobs have been added the ‘close’ method should be invoked to indicate to the scheduler that the group is complete. At this point no more jobs can be added to the group, and the scheduler will check for when the group has finished running.
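The add-then-close lifecycle described above can be illustrated with a small stand-in class. This is a sketch only: ToyGroup is a hypothetical name, it does not use the real SchedulerGroup, and the exception raised when adding to a closed group is an assumption.

```python
class ToyGroup:
    """Hypothetical stand-in for a job group (not the real SchedulerGroup)."""

    def __init__(self, name):
        self.name = name
        self.jobs = []
        self.closed = False

    def add(self, args):
        # Once the group is closed no more jobs can be added;
        # the exception type used here is an assumption
        if self.closed:
            raise RuntimeError("group '%s' is closed" % self.name)
        self.jobs.append(tuple(args))

    def close(self):
        # Marks the group as complete
        self.closed = True


group = ToyGroup("fastqc")
group.add(["fastqc", "sample1.fastq"])
group.add(["fastqc", "sample2.fastq"])
group.close()

try:
    group.add(["fastqc", "sample3.fastq"])
    rejected = False
except RuntimeError:
    rejected = True
```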
- add(args, runner=None, name=None, wd=None, log_dir=None, wait_for=None, callbacks=[])
Add a request to run a job
- Parameters:
args – the command to run expressed as a list or tuple of arguments
runner – (optional) a JobRunner instance that will be used to dispatch and control the job.
name – (optional) a name for the job. Must be unique within the scheduler instance.
wd – (optional) the working directory to execute the job in; defaults to the current working directory
log_dir – (optional) explicitly specify directory for log files
wait_for – (optional) a list or tuple of job and/or group names which must finish before this job can start
callbacks – (optional) a list or tuple of functions that will be executed when the job completes.
- Returns:
SchedulerJob instance for the added job.
- close()
Indicate that all jobs have been added to the group
- property closed
Test whether group has been closed
- property completed
Test whether group has completed
Returns True if all the jobs in the group have completed, False otherwise.
- property exit_code
Return exit code for the group
If all jobs have completed with status zero then returns zero, otherwise returns a count of jobs which have non-zero status.
If the group hasn’t completed then returns None.
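The exit code rule above can be written out as a short function. This is a sketch of the documented behaviour rather than the module's own code; group_exit_code is a hypothetical helper, and representing an unfinished job by None is an assumption.

```python
def group_exit_code(job_exit_codes):
    """Combine per-job exit codes following the rule described above.

    job_exit_codes: list of integer exit codes, with None standing in
    for a job that has not finished yet (an assumption of this sketch).
    """
    # Group has not completed if any job is still unfinished
    if any(code is None for code in job_exit_codes):
        return None
    # Zero if every job returned zero, otherwise the number of failures
    return sum(1 for code in job_exit_codes if code != 0)


all_ok = group_exit_code([0, 0, 0])
two_failed = group_exit_code([0, 1, 2])
unfinished = group_exit_code([0, None])
```

Note that a non-zero group exit code is a count of failed jobs, so it should not be interpreted as the exit status of any individual command.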
- property is_running
Test whether group is running
Returns True if group was closed and if it has jobs that are still running, False if not.
- property jobs
Return list of jobs
- wait(poll_interval=5)
Wait for the group to complete
- Parameters:
poll_interval – optional, number of seconds to wait in between checking if the group has completed (default: 5 seconds)
- class auto_process_ngs.simple_scheduler.SchedulerJob(runner, args, job_number=None, name=None, working_dir=None, log_dir=None, wait_for=None)
Class providing an interface to scheduled jobs
Set up a job by creating a new instance specifying the job runner and command. Optionally name, job number, working directory and log directory can also be specified, and the job can be forced to wait for one or more other jobs to complete.
The job is started by invoking the ‘start’ method; its status can be checked with the ‘is_running’ method, and terminated and restarted using the ‘terminate’ and ‘restart’ methods respectively.
Information about the job can also be accessed via its properties:
job_number – Externally assigned job number
job_name – Name assigned to job
job_id – Id for the job assigned by the runner
command – Command to run
args – Arguments to supply to the command
working_dir – Working directory to run jobs in
log_dir – Directory to write log files to
log – Log (stdout) file for the job
err – Error log (stderr) file for the job
name – Alias for ‘job_name’
start_time – Start time (seconds since the epoch)
end_time – End time (seconds since the epoch)
exit_status – Exit code from the command that was run (integer)
The SchedulerJob class uses a JobRunner instance (which supplies the necessary methods for starting, stopping and monitoring) for low-level job interactions.
SchedulerJob instances should normally be returned by a call to the ‘submit’ method of a SimpleScheduler object.
- Parameters:
runner (JobRunner) – a JobRunner instance supplying job control methods
args (list) – command and arguments to run
job_number (int) – assigns external job number (optional)
name (str) – explicitly assigns job name (optional, default is to use the command)
working_dir (str) – directory to run the script in (optional, defaults to CWD)
log_dir (str) – directory to write log files to (optional)
wait_for (list) – list of SchedulerJobs that must complete before this one can start
- property completed
Test if a job has completed
Returns True if the job has finished running, False otherwise.
- property exit_code
Return exit code from job
Returns the integer exit code from the job if it has finished running, None otherwise.
- property in_error_state
Test if a job is in an error state
Returns True if the job appears to be in an error state, and False if not.
- property is_running
Check if a job is running
Returns True if the job has started and is still running, False otherwise.
- property name
Alias for ‘job_name’
- restart(max_tries=3)
Restart a running job
Attempts to restart a job, by terminating the current instance and reinvoking the ‘start’ method.
The number of times that a restart should be attempted is limited by the ‘max_tries’ parameter.
If the restart is successful then the new job id is returned; otherwise None is returned to indicate failure.
- Parameters:
max_tries (int) – maximum number of restarts that should be attempted on this job before giving up (default: 3)
- Returns:
Id for job
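The way ‘max_tries’ limits restarts can be sketched with a stand-in job class. ToyJob and its integer ids are hypothetical; the real SchedulerJob delegates starting and terminating to its JobRunner, which is not modelled here.

```python
class ToyJob:
    """Hypothetical stand-in for a restartable job (not the real SchedulerJob)."""

    def __init__(self):
        self.restarts = 0      # number of restarts attempted so far
        self.job_id = None
        self._next_id = 1

    def start(self):
        # Pretend the runner hands back a fresh id on every start
        self.job_id = self._next_id
        self._next_id += 1
        return self.job_id

    def terminate(self):
        self.job_id = None

    def restart(self, max_tries=3):
        # Give up once the restart limit has been reached
        if self.restarts >= max_tries:
            return None
        self.restarts += 1
        self.terminate()
        return self.start()


job = ToyJob()
first_id = job.start()
restart_ids = [job.restart(max_tries=3) for _ in range(4)]
```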
- property runner
Return the associated job runner instance
- start()
Start the job running
- Returns:
Id for job
- terminate()
Terminate a running job
- wait(poll_interval=5, timeout=None)
Wait for the job to complete
This method blocks while waiting for the job to finish running.
NB if the job is not created by submission to a scheduler or group then it’s up to the calling subprogram to ensure that the job is started before the ‘wait’ method is invoked.
- Parameters:
poll_interval – optional, number of seconds to wait in between checking if the job has completed (default: 5 seconds)
timeout – optional, if set then is the maximum time in seconds that the job will be allowed to run before it’s terminated and a SchedulerTimeout exception is raised
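The polling behaviour controlled by ‘poll_interval’ and ‘timeout’ could look roughly like the loop below. This is a sketch under assumptions: wait_until and ToyTimeout are hypothetical names, and the sketch only raises the exception, whereas the documented method also terminates the job first.

```python
import time


class ToyTimeout(Exception):
    """Hypothetical stand-in for SchedulerTimeout."""


def wait_until(is_done, poll_interval=5, timeout=None):
    """Block until is_done() returns True, checking every poll_interval seconds."""
    start = time.monotonic()
    while not is_done():
        # Stop waiting once the optional time limit has been exceeded
        if timeout is not None and time.monotonic() - start > timeout:
            raise ToyTimeout("timed out after %s seconds" % timeout)
        time.sleep(poll_interval)


# Returns immediately when the condition is already satisfied
wait_until(lambda: True, poll_interval=0.01)

# Raises when the condition is never satisfied within the limit
try:
    wait_until(lambda: False, poll_interval=0.01, timeout=0.05)
    timed_out = False
except ToyTimeout:
    timed_out = True
```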
- class auto_process_ngs.simple_scheduler.SchedulerReporter(fp=sys.stdout, **args)
Class to report on scheduler operations
SimpleScheduler calls methods of the SchedulerReporter when jobs are scheduled, started and finished, when groups are added and finished, and to report the status of the scheduler.
The methods look for associated template format strings which are then used to create and output strings.
Templates are added and modified via the ‘set_template’ method. Templates should be Python format strings which should process a dictionary of values.
Template name     Used when           Available dictionary keys
job_scheduled     Job is submitted    job_name, job_number, job_id, waiting_for, time_stamp, command
job_start         Job starts running  as ‘job_scheduled’
job_end           Job finishes        as ‘job_scheduled’
group_added       Group is created    group_name, group_id, time_stamp
group_end         Group completes     as ‘group_added’
scheduler_status  Need status         n_running, n_waiting, n_finished
An example template string for a job could be:
‘Job %(job_name)s: %(job_id)d’
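The example template uses Python's printf-style formatting with a dictionary on the right-hand side. The snippet below shows how such a string is rendered; the values are invented for illustration, and ‘job_id’ must be an integer here because the template uses ‘%d’.

```python
# Template taken from the example above
template = "Job %(job_name)s: %(job_id)d"

# Illustrative values only; keys the template does not use are ignored
values = {
    "job_name": "fastqc.sample1",
    "job_id": 1234,
    "job_number": 7,
}

line = template % values
print(line)
```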
The purpose of the SchedulerReporter class is to provide a way to customise reporting of the standard scheduler operations when used in an application.
- group_added(group)
Write report string when a group is added
- Parameters:
group – SchedulerGroup instance
- group_end(group)
Write report string when a group ends
- Parameters:
group – SchedulerGroup instance
- job_end(job)
Write report string when a job ends
- Parameters:
job – SchedulerJob instance
- job_scheduled(job)
Write report string when a job is scheduled
- Parameters:
job – SchedulerJob instance
- job_start(job)
Write report string when a job starts
- Parameters:
job – SchedulerJob instance
- scheduler_status(sched)
Write report string for scheduler status
- Parameters:
sched – SimpleScheduler instance
- set_template(name, template)
Associate a template string with an operation
- Parameters:
name – operation name e.g. ‘job_start’
template – associated template string, or None to suppress reporting of the operation
- exception auto_process_ngs.simple_scheduler.SchedulerTimeout
Timeout limit exceeded
- class auto_process_ngs.simple_scheduler.SimpleScheduler(runner=None, reporter=None, max_concurrent=None, max_slots=None, poll_interval=5, job_interval=0.1, max_restarts=1, submission_mode=1)
Lightweight scheduler class
Class providing simple job control functionality. Requests to run external programs are submitted to the scheduler via the ‘submit’ method; the scheduler handles actually executing them.
Submitted jobs can have dependencies on earlier jobs, in which case they will not be executed until those dependencies have completed.
The scheduler runs in its own thread.
Usage:
>>> s = SimpleScheduler()
>>> s.start()
>>> s.submit(['fastq_screen',...],name='fastq_screen.model')
>>> s.submit(['fastq_screen',...],name='fastq_screen.other')
>>> s.submit(['fastq_screen',...],name='fastq_screen.rRNA')
>>> s.submit(['fastqc',...],wait_for=('fastq_screen.model',...))
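The dependency rule (a job is held back until everything it waits for has completed) can be modelled in a few lines. This is a simplified sketch, not the scheduler's implementation; the helper name runnable and the dictionary layout are assumptions.

```python
def runnable(waiting, finished):
    """Return names of waiting jobs whose dependencies have all finished.

    waiting: dict mapping job name -> tuple of names it waits for
    finished: set of names that have completed
    """
    return [name for name, deps in waiting.items()
            if all(dep in finished for dep in deps)]


waiting = {
    "fastq_screen.model": (),
    "fastqc": ("fastq_screen.model",),
}
finished = set()

# Only the job without dependencies can start at first
first = runnable(waiting, finished)

# Once it has finished, the dependent job becomes runnable
finished.add("fastq_screen.model")
del waiting["fastq_screen.model"]
second = runnable(waiting, finished)
```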
The number of concurrent jobs that the scheduler will run can be set directly via the ‘max_concurrent’ option; alternatively a limit can be placed on the maximum total ‘slots’ (aka cores, threads or processors) that are available to running jobs. In this case the scheduler will only start jobs when there are enough available slots to accommodate them.
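The slot-based limit can be sketched as a check of each waiting job's slot requirement against what is still free. The function below is an illustration only: start_jobs is a hypothetical name, and letting a smaller job start ahead of a larger one that does not fit is an assumption about policy, not documented behaviour.

```python
def start_jobs(waiting, committed_slots, max_slots):
    """Pick the waiting jobs that fit into the free slots.

    waiting: list of (name, nslots) pairs in submission order
    Returns the names started and the updated committed slot count.
    """
    started = []
    for name, nslots in waiting:
        # Start the job only if enough slots remain for it
        if committed_slots + nslots <= max_slots:
            started.append(name)
            committed_slots += nslots
    return started, committed_slots


waiting = [("align", 4), ("assemble", 8), ("qc", 2)]
started, committed = start_jobs(waiting, committed_slots=2, max_slots=8)
```

With 2 of 8 slots already committed, the 4-slot and 2-slot jobs fit and the 8-slot job stays waiting.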
- callback(name, callback, wait_for)
Add a callback function
Add a function ‘callback’ which will be invoked when all the jobs listed in ‘wait_for’ have completed.
The function is invoked as:
callback(name,(job1,…),sched)
where name is the submitted name, (job1,…) holds the SchedulerJob instances that have completed, and sched is the scheduler instance.
- Parameters:
name – a name for the callback. Must be unique within the scheduler instance; if None then a unique name will be generated.
callback – the function that will be executed when the jobs and/or groups listed in ‘wait_for’ have completed.
wait_for – a list or tuple of job and/or group names which will trigger the callback when they finish
- Returns:
SchedulerCallback instance.
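A function suitable for use as a callback takes the three arguments shown in the invocation above. The sketch below calls such a function by hand with stand-in job objects; the SimpleNamespace objects, the names used and passing None for the scheduler are all assumptions made so the example runs without a real scheduler.

```python
from types import SimpleNamespace

completed = []


def report_exit_codes(name, jobs, sched):
    """Example callback: record the exit code of each completed job."""
    for job in jobs:
        completed.append((name, job.name, job.exit_code))


# Stand-ins exposing only the properties the callback reads
jobs = (
    SimpleNamespace(name="fastqc.sample1", exit_code=0),
    SimpleNamespace(name="fastqc.sample2", exit_code=1),
)

# Mimics the documented call form: callback(name, (job1, ...), sched)
report_exit_codes("qc_done", jobs, None)
```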
- property committed_slots
Return number of slots currently committed
- property default_runner
Return the default runner
- find(pattern)
Lookup groups and jobs from regex pattern matching
Returns a list of jobs with names that match the supplied pattern (or an empty list if no matches were found).
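Name lookup by pattern can be pictured as a regular expression filter over the known names. This sketch assumes the pattern is matched from the start of each name (re.match); whether the real method anchors the pattern this way is not stated here, and find_names is a hypothetical helper.

```python
import re


def find_names(names, pattern):
    """Return the names that match a regular expression pattern."""
    regex = re.compile(pattern)
    # re.match anchors at the start of the name (an assumption of this sketch)
    return [name for name in names if regex.match(name)]


names = ["fastq_screen.model", "fastq_screen.rRNA", "fastqc"]
screens = find_names(names, r"fastq_screen\..*")
nothing = find_names(names, r"bowtie")
```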
- group(name, log_dir=None, wait_for=None, callbacks=[])
Create a group of jobs
- Parameters:
name – a name for the group. Must be unique within the scheduler instance.
wait_for – (optional) a list or tuple of job and/or group names which must finish before jobs in this group can start
log_dir – (optional) explicitly specify directory for log files
callbacks – (optional) a list or tuple of functions that will be executed when the group completes.
- Returns:
Empty SchedulerGroup instance.
- has_name(name)
Test if name has already been used
Returns True if the name already exists, False otherwise.
- is_empty()
Test if the scheduler has any jobs remaining
Returns False if there are jobs running and/or waiting, True otherwise.
- property job_number
Internal: increment and return job count
- lookup(name)
Look up and return SchedulerJob or SchedulerGroup instance
Returns None if no matching instance was found.
- property n_finished
Return number of jobs that have completed
- property n_running
Return number of jobs currently running
- property n_waiting
Return number of jobs waiting to run
- run()
Internal: run method overriding that from base Thread class
This method implements the scheduler loop.
Don’t call this directly - it is invoked by the Thread’s ‘start’ method.
- stop()
Stop the scheduler and terminate running jobs
- submit(args, runner=None, name=None, wd=None, log_dir=None, wait_for=None, callbacks=[])
Submit a request to run a job
- Parameters:
args – the command to run expressed as a list or tuple of arguments
runner – (optional) a JobRunner instance that will be used to dispatch and control the job.
name – (optional) a name for the job. Must be unique within the scheduler instance.
wd – (optional) the working directory to execute the job in; defaults to the current working directory
log_dir – (optional) explicitly specify directory for log files
wait_for – (optional) a list or tuple of job and/or group names which must finish before this job can start
callbacks – (optional) a list or tuple of functions that will be executed when the job completes.
- Returns:
SchedulerJob instance for the submitted job.
- wait()
Wait until the scheduler is empty
- wait_for(names, timeout=None)
Wait until the named jobs/groups have completed
- Parameters:
names – a list or tuple of job and/or group names which the scheduler will wait to complete
timeout – optional, if set then is the maximum time in seconds that the job will be allowed to run before it’s terminated and a SchedulerTimeout exception is raised
- auto_process_ngs.simple_scheduler.cleanup_atexit(sched)
Perform clean up actions on exit
Stops the scheduler which should kill any running jobs
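A function like this is typically registered with Python's atexit module so that it runs when the interpreter shuts down. The sketch below shows that pattern with a stand-in scheduler; ToyScheduler and cleanup are hypothetical names, and whether the module itself performs the registration this way is an assumption.

```python
import atexit


class ToyScheduler:
    """Hypothetical stand-in exposing only a 'stop' method."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True


def cleanup(sched):
    # Stop the scheduler so that running jobs do not outlive the program
    sched.stop()


sched = ToyScheduler()

# Arrange for cleanup(sched) to be called at interpreter exit
atexit.register(cleanup, sched)
```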
- auto_process_ngs.simple_scheduler.date_and_time(epoch=None)
Return formatted date and time information
- auto_process_ngs.simple_scheduler.default_scheduler_reporter()
Return a default SchedulerReporter object