auto_process_ngs.simple_scheduler

Python module to provide scheduler capability for running external programs

class auto_process_ngs.simple_scheduler.SchedulerCallback(name, function, wait_for=[])

Class providing an interface to scheduled callbacks

SchedulerCallback instances should normally be returned by a call to the ‘callback’ method of a SimpleScheduler object.

invoke(jobs, sched)

Invoke the callback

exception auto_process_ngs.simple_scheduler.SchedulerException

Base class for errors with simple scheduler code

class auto_process_ngs.simple_scheduler.SchedulerGroup(name, group_id, parent_scheduler, log_dir=None, wait_for=None)

Class providing an interface to schedule a group of jobs

SchedulerGroup instances should normally be returned by a call to the ‘group’ method of a SimpleScheduler object. The group should be populated by calling its ‘add’ method (note that jobs are passed directly to the scheduler).

Once all jobs have been added the ‘close’ method should be invoked to indicate to the scheduler that the group is complete. At this point no more jobs can be added to the group, and the scheduler will check for when the group has finished running.

add(args, runner=None, name=None, wd=None, log_dir=None, wait_for=None, callbacks=[])

Add a request to run a job

Parameters:
  • args – the command to run expressed as a list or tuple of arguments

  • runner – (optional) a JobRunner instance that will be used to dispatch and control the job.

  • name – (optional) a name for the job. Must be unique within the scheduler instance.

  • wd – (optional) the working directory to execute the job in; defaults to the current working directory

  • log_dir – (optional) explicitly specify directory for log files

  • wait_for – (optional) a list or tuple of job and/or group names which must finish before this job can start

  • callbacks – (optional) a list or tuple of functions that will be executed when the job completes.

Returns:

SchedulerJob instance for the added job.

close()

Indicate that all jobs have been added to the group

property closed

Test whether group has been closed

property completed

Test whether group has completed

Returns True if all the jobs in the group have completed, False otherwise.

property exit_code

Return exit code for the group

If all jobs have completed with status zero then returns zero, otherwise returns a count of jobs which have non-zero status.

If the group hasn’t completed then returns None.
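The exit-code rule above can be restated as a small standalone function. This is a sketch that mimics the documented behaviour, not the library's code: `group_exit_code` is a hypothetical name, each entry of `exit_codes` stands in for one job's `exit_code` (None while that job is still running), and the sketch ignores whether the group has been closed.

```python
from typing import Optional, Sequence


def group_exit_code(exit_codes: Sequence[Optional[int]]) -> Optional[int]:
    """Hypothetical restatement of the SchedulerGroup.exit_code rule.

    exit_codes: one entry per job; None means the job has not completed.
    """
    # The group has not completed while any job is still outstanding
    if any(code is None for code in exit_codes):
        return None
    # Zero if every job succeeded, otherwise the number of failed jobs
    return sum(1 for code in exit_codes if code != 0)


print(group_exit_code([0, 0, 0]))     # 0
print(group_exit_code([0, 1, 2]))     # 2
print(group_exit_code([0, None, 1]))  # None
```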

property is_running

Test whether group is running

Returns True if group was closed and if it has jobs that are still running, False if not.

property jobs

Return list of jobs

wait(poll_interval=5)

Wait for the group to complete

Parameters:

poll_interval – optional, number of seconds to wait in between checking if the group has completed (default: 5 seconds)

class auto_process_ngs.simple_scheduler.SchedulerJob(runner, args, job_number=None, name=None, working_dir=None, log_dir=None, wait_for=None)

Class providing an interface to scheduled jobs

SchedulerJob instances should normally be returned by a call to the ‘submit’ method of a SimpleScheduler object.

property completed

Test if a job has completed

Returns True if the job has finished running, False otherwise.

property exit_code

Return exit code from job

Wrapper for the ‘exit_status’ property provided by the ‘Job’ superclass.

property in_error_state

Test if a job is in an error state

Returns True if the job appears to be in an error state, and False if not.

property is_running

Test if a job is running

Returns True if the job has started and is still running, False otherwise.

restart(max_tries=3)

Restart running the job

Attempts to restart a job by terminating the current instance and reinvoking the ‘start’ method.

The number of times that a restart should be attempted is limited by the ‘max_tries’ parameter.

If the restart is successful then the new job id is returned; otherwise None is returned to indicate failure.

Parameters:

max_tries (int) – maximum number of restarts that should be attempted on this job before giving up (default: 3)

Returns:

Id for job
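The restart limit can be illustrated with a toy job class. This is a sketch under stated assumptions, not SchedulerJob itself: `RestartableJob` is hypothetical, its job ids are invented by a local counter, and it assumes that ‘max_tries’ caps the cumulative number of restarts made on one job.

```python
class RestartableJob:
    """Hypothetical stand-in illustrating the 'max_tries' restart limit."""

    def __init__(self):
        self._next_id = 100  # hypothetical source of job ids
        self.job_id = None
        self.restarts = 0

    def start(self):
        # Pretend to dispatch the job and obtain a fresh id
        self._next_id += 1
        self.job_id = self._next_id
        return self.job_id

    def terminate(self):
        self.job_id = None

    def restart(self, max_tries=3):
        # Give up (return None) once the restart budget is used up
        if self.restarts >= max_tries:
            return None
        self.restarts += 1
        self.terminate()
        return self.start()


job = RestartableJob()
job.start()                   # first id
job.restart(max_tries=2)      # new id
job.restart(max_tries=2)      # new id
print(job.restart(max_tries=2))  # None: limit reached
```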

start()

Start the job running

Overrides the ‘start’ method in the base ‘Job’ class.

Returns:

Id for job

wait(poll_interval=5, timeout=None)

Wait for the job to complete

This method blocks while waiting for the job to finish running.

NB if the job was not created by submission to a scheduler or group, then it is up to the calling program to ensure that the job has been started before the ‘wait’ method is invoked.

Parameters:
  • poll_interval – optional, number of seconds to wait in between checking if the job has completed (default: 5 seconds)

  • timeout – optional; if set, the maximum time in seconds that the job will be allowed to run before it is terminated and a SchedulerTimeout exception is raised
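The poll-and-timeout behaviour described for ‘wait’ follows a common pattern, sketched below as a generic helper. `wait_until` and its `on_timeout` hook are hypothetical names, and `SchedulerTimeout` here is a local stand-in for the module's exception rather than an import from it.

```python
import time


class SchedulerTimeout(Exception):
    """Local stand-in for auto_process_ngs.simple_scheduler.SchedulerTimeout."""


def wait_until(is_done, poll_interval=5, timeout=None, on_timeout=None):
    """Block until is_done() is True, checking every poll_interval seconds.

    If timeout is set and expires first, call on_timeout() (for example to
    terminate the job) and then raise SchedulerTimeout.
    """
    start = time.monotonic()
    while not is_done():
        if timeout is not None and time.monotonic() - start > timeout:
            if on_timeout is not None:
                on_timeout()
            raise SchedulerTimeout(f"timed out after {timeout}s")
        time.sleep(poll_interval)


# A job that reports completion on the third check
checks = iter([False, False, True])
wait_until(lambda: next(checks), poll_interval=0.01)
```

Because the timeout is checked between sleeps, the actual wait can overrun ‘timeout’ by up to one ‘poll_interval’.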

class auto_process_ngs.simple_scheduler.SchedulerReporter(fp=sys.stdout, **args)

Class to report on scheduler operations

SimpleScheduler calls methods of the SchedulerReporter when jobs are scheduled, started and finished, when groups are added and finished, and to report the status of the scheduler.

Each method looks up the template format string associated with its operation, and uses it to generate and output a report string.

Templates are added and modified via the ‘set_template’ method. Templates should be Python format strings which are filled in from a dictionary of values.

Template name       Used when            Available dictionary keys
job_scheduled       Job is submitted     job_name, job_number, job_id, waiting_for, time_stamp, command
job_start           Job starts running   as ‘job_scheduled’
job_end             Job finishes         as ‘job_scheduled’
group_added         Group is created     group_name, group_id, time_stamp
group_end           Group completes      as ‘group_added’
scheduler_status    Need status          n_running, n_waiting, n_finished

An example template string for a job could be:

‘Job %(job_name)s: %(job_id)d’
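Since the example uses %-style named placeholders, a report line is produced by applying the `%` operator with a dictionary; keys that the template does not reference are simply ignored. The values below are made up for illustration.

```python
# Made-up values of the kind passed for a 'job_scheduled' report
values = {"job_name": "fastqc", "job_number": 4, "job_id": 12345}

template = "Job %(job_name)s: %(job_id)d"
report = template % values  # 'job_number' is unused and ignored
print(report)  # Job fastqc: 12345
```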

The purpose of the SchedulerReporter class is to provide a way to customise reporting of the standard scheduler operations when used in an application.

group_added(group)

Write report string when a group is added

Parameters:

group – SchedulerGroup instance

group_end(group)

Write report string when a group ends

Parameters:

group – SchedulerGroup instance

job_end(job)

Write report string when a job ends

Parameters:

job – SchedulerJob instance

job_scheduled(job)

Write report string when a job is scheduled

Parameters:

job – SchedulerJob instance

job_start(job)

Write report string when a job starts

Parameters:

job – SchedulerJob instance

scheduler_status(sched)

Write report string for scheduler status

Parameters:

sched – SimpleScheduler instance

set_template(name, template)

Associate a template string with an operation

Parameters:
  • name – operation name e.g. ‘job_start’

  • template – associated template string, or None to suppress reporting of the operation
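The template mechanism can be sketched with a cut-down reporter. `MiniReporter` and its generic `report(name, values)` method are hypothetical: the real SchedulerReporter exposes per-operation methods such as ‘job_start(job)’ that take scheduler objects, whereas this sketch takes a plain dictionary. It shows how setting a template to None suppresses an operation.

```python
import io
import sys


class MiniReporter:
    """Hypothetical, simplified reporter: one template per operation name."""

    def __init__(self, fp=sys.stdout):
        self._fp = fp
        self._templates = {}

    def set_template(self, name, template):
        # A template of None suppresses reporting for this operation
        self._templates[name] = template

    def report(self, name, values):
        template = self._templates.get(name)
        if template is None:
            return
        self._fp.write(template % values + "\n")


buf = io.StringIO()
r = MiniReporter(fp=buf)
r.set_template("job_start", "Started %(job_name)s (#%(job_number)d)")
r.set_template("job_end", None)  # silence job_end reports
r.report("job_start", {"job_name": "fastqc", "job_number": 4})
r.report("job_end", {"job_name": "fastqc", "job_number": 4})
print(buf.getvalue(), end="")  # Started fastqc (#4)
```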

exception auto_process_ngs.simple_scheduler.SchedulerTimeout

Timeout limit exceeded

class auto_process_ngs.simple_scheduler.SimpleScheduler(runner=None, reporter=None, max_concurrent=None, max_slots=None, poll_interval=5, job_interval=0.1, max_restarts=1, submission_mode=1)

Lightweight scheduler class

Class providing simple job control functionality. Requests to run external programs are submitted to the scheduler via the ‘submit’ method; the scheduler handles actually executing them.

Submitted jobs can have dependencies on earlier jobs, in which case they will not be executed until those dependencies have completed.
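The dependency rule amounts to: a waiting job becomes eligible once every name in its ‘wait_for’ list has finished. A minimal sketch follows; `ready_to_run` and the job names are hypothetical, and the real scheduler additionally applies its concurrency and slot limits before starting anything.

```python
def ready_to_run(waiting, finished):
    """Return names of waiting jobs whose dependencies have all finished.

    waiting: dict mapping job name -> list of names it waits for
    finished: set of names of jobs/groups that have completed
    """
    return [name for name, deps in waiting.items()
            if all(dep in finished for dep in deps)]


# Hypothetical jobs: job_c depends on job_a and job_b
waiting = {"job_a": [], "job_b": [], "job_c": ["job_a", "job_b"]}
print(ready_to_run(waiting, finished=set()))                # ['job_a', 'job_b']
print(ready_to_run({"job_c": ["job_a", "job_b"]}, {"job_a", "job_b"}))  # ['job_c']
```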

The scheduler runs in its own thread.

Usage:

>>> s = SimpleScheduler()
>>> s.start()
>>> s.submit(['fastq_screen',...],name='fastq_screen.model')
>>> s.submit(['fastq_screen',...],name='fastq_screen.other')
>>> s.submit(['fastq_screen',...],name='fastq_screen.rRNA')
>>> s.submit(['fastqc',...],wait_for=('fastq_screen.model',...))

The number of concurrent jobs that the scheduler will run can be set directly via the ‘max_concurrent’ option; alternatively a limit can be placed on the maximum total ‘slots’ (aka cores, threads or processors) that are available to running jobs. In this case the scheduler will only start jobs when there are enough available slots to accommodate them.
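The two limits can be pictured as an admission check applied before each job starts. This `can_start` function is a hypothetical sketch of the idea, not the scheduler's code; in particular, how many slots a job requests is assumed to be known (for example from its runner), which this section does not state.

```python
from typing import Optional


def can_start(job_slots: int, n_running: int, committed_slots: int,
              max_concurrent: Optional[int] = None,
              max_slots: Optional[int] = None) -> bool:
    """Hypothetical admission check modelled on max_concurrent/max_slots."""
    # Limit on the number of jobs running at once
    if max_concurrent is not None and n_running >= max_concurrent:
        return False
    # Limit on the total slots committed to running jobs
    if max_slots is not None and committed_slots + job_slots > max_slots:
        return False
    return True


print(can_start(4, n_running=1, committed_slots=4, max_slots=8))  # True
print(can_start(4, n_running=2, committed_slots=6, max_slots=8))  # False
```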

callback(name, callback, wait_for)

Add a callback function

Add a function ‘callback’ which will be invoked when all the jobs listed in ‘wait_for’ have completed.

The function is invoked as:

callback(name,(job1,…),sched)

where name is the name given to the callback, (job1,…) holds the SchedulerJob instances that have completed, and sched is the scheduler instance.
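A callback only needs to accept the three documented arguments. The sketch below checks the exit codes of the completed jobs; `report_failures` is a hypothetical example function, `FakeJob` is a stand-in for SchedulerJob, and the sketch assumes jobs carry a ‘name’ attribute alongside the documented ‘exit_code’. Its return value is only for demonstration; this section does not say whether the scheduler uses it.

```python
from collections import namedtuple

# Hypothetical stand-in carrying only the attributes the callback reads
FakeJob = namedtuple("FakeJob", ["name", "exit_code"])


def report_failures(name, jobs, sched):
    """Callback with the (name, jobs, sched) signature described above."""
    failed = [job.name for job in jobs if job.exit_code != 0]
    if failed:
        print(f"{name}: failed jobs: {', '.join(failed)}")
    return failed


report_failures("check_jobs", (FakeJob("job_a", 0), FakeJob("job_b", 1)), None)
# prints: check_jobs: failed jobs: job_b
```

With a real scheduler it would be registered with something like s.callback('check_jobs', report_failures, wait_for=('job_a', 'job_b')), where the job names are hypothetical.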

Parameters:
  • name – a name for the callback. Must be unique within the scheduler instance; if None then a unique name will be generated.

  • callback – the function that will be executed when all the jobs and/or groups listed in ‘wait_for’ have completed.

  • wait_for – a list or tuple of job and/or group names which will trigger the callback when they finish

Returns:

SchedulerCallback instance.

property committed_slots

Return number of slots currently committed

property default_runner

Return the default runner

find(pattern)

Look up groups and jobs using regular expression pattern matching

Returns a list of jobs with names that match the supplied pattern (or an empty list if no matches were found).
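Name lookup by pattern can be sketched with the standard `re` module. `find_names` is hypothetical and works on plain strings rather than job objects; it assumes the pattern is anchored at the start of the name (`re.match`), which this section does not confirm, so the real method may behave like `re.search` instead.

```python
import re


def find_names(names, pattern):
    """Return the names matching a regular expression (sketch of find())."""
    regex = re.compile(pattern)
    # Assumption: match from the start of each name
    return [n for n in names if regex.match(n)]


names = ["fastq_screen.model", "fastq_screen.other", "fastq_screen.rRNA", "fastqc"]
print(find_names(names, r"fastq_screen\."))
# ['fastq_screen.model', 'fastq_screen.other', 'fastq_screen.rRNA']
```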

group(name, log_dir=None, wait_for=None, callbacks=[])

Create a group of jobs

Parameters:
  • name – a name for the group. Must be unique within the scheduler instance.

  • wait_for – (optional) a list or tuple of job and/or group names which must finish before the jobs in this group can start

  • log_dir – (optional) explicitly specify directory for log files

  • callbacks – (optional) a list or tuple of functions that will be executed when the group completes.

Returns:

Empty SchedulerGroup instance.

has_name(name)

Test if name has already been used

Returns True if the name already exists, False otherwise.

is_empty()

Test if the scheduler has any jobs remaining

Returns False if there are jobs running and/or waiting, True otherwise.

property job_number

Increment and return the job count

Type:

Internal

lookup(name)

Look up and return SchedulerJob or SchedulerGroup instance

Returns None if no matching instance was found.

property n_finished

Return number of jobs that have completed

property n_running

Return number of jobs currently running

property n_waiting

Return number of jobs waiting to run

run()

Internal: run method overriding that from base Thread class

This method implements the scheduler loop.

Don’t call this directly; it is invoked by the Thread’s ‘start’ method.

stop()

Stop the scheduler and terminate running jobs

submit(args, runner=None, name=None, wd=None, log_dir=None, wait_for=None, callbacks=[])

Submit a request to run a job

Parameters:
  • args – the command to run expressed as a list or tuple of arguments

  • runner – (optional) a JobRunner instance that will be used to dispatch and control the job.

  • name – (optional) a name for the job. Must be unique within the scheduler instance.

  • wd – (optional) the working directory to execute the job in; defaults to the current working directory

  • log_dir – (optional) explicitly specify directory for log files

  • wait_for – (optional) a list or tuple of job and/or group names which must finish before this job can start

  • callbacks – (optional) a list or tuple of functions that will be executed when the job completes.

Returns:

SchedulerJob instance for the submitted job.

wait()

Wait until the scheduler is empty

wait_for(names, timeout=None)

Wait until the named jobs/groups have completed

Parameters:
  • names – a list or tuple of job and/or group names which the scheduler will wait to complete

  • timeout – optional; if set, the maximum time in seconds that the jobs will be allowed to run before they are terminated and a SchedulerTimeout exception is raised

auto_process_ngs.simple_scheduler.cleanup_atexit(sched)

Perform clean up actions on exit

Stops the scheduler, which should kill any running jobs.
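A function like this is typically registered with the standard `atexit` module so it runs when the interpreter exits. The sketch uses a hypothetical `DemoScheduler` stand-in and a local `cleanup_atexit` that mirrors the documented behaviour; whether the library registers its own ‘cleanup_atexit’ automatically is not stated here.

```python
import atexit


class DemoScheduler:
    """Hypothetical stand-in exposing only stop()."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        # The real SimpleScheduler.stop() also terminates running jobs
        self.stopped = True


def cleanup_atexit(sched):
    """Local stand-in mirroring the documented behaviour: stop the scheduler."""
    sched.stop()


sched = DemoScheduler()
# Arrange for the scheduler to be stopped when the interpreter exits
atexit.register(cleanup_atexit, sched)
```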

auto_process_ngs.simple_scheduler.date_and_time(epoch=None)

Return formatted date and time information

auto_process_ngs.simple_scheduler.default_scheduler_reporter()

Return a default SchedulerReporter object