auto_process_ngs.pipeliner
Module providing utility classes and functions for building simple ‘pipelines’ of tasks.
The core classes are:
Pipeline: class for building and executing pipelines
PipelineTask: class for defining pipeline tasks
PipelineFunctionTask: subclass of PipelineTask which enables Python functions to be run as external processes
Additional supporting classes:
PipelineCommand: class for defining commands that can be used in tasks
PipelineCommandWrapper: shortcut alternative to PipelineCommand
PipelineScriptWrapper: subclass of PipelineCommand for scripts
PipelineParam: class for passing arbitrary values between tasks
FileCollector: returning collections of files based on glob patterns
FunctionParam: PipelineParameter-like deferred function evaluation
ListParam: PipelineParameter-like implementation of list-like behaviour
PathJoinParam: PipelineParameter-like dynamic file path joiner
PathExistsParam: PipelineParameter-like dynamic file existence checker
The following exception classes are defined:
PipelineError: general pipeline-related exception
There are some underlying classes and functions that are intended for internal use:
BaseParam: base class for parameter-like classes
Capturing: capture stdout and stderr from a Python function
Dispatcher: run a Python function as an external process
sanitize_name: clean up task and command names for use in pipeline
collect_files: collect files based on glob patterns
resolve_parameter: get the value of arbitrary parameter or object
Overview
For the purposes of the pipeliner
module, a ‘pipeline’ consists
of a set of ‘tasks’: a task can be independent of any other task, or
it may depend on one or more tasks being completed before it can
start.
Defining tasks: examples
Tasks must be defined by subclassing the PipelineTask
class and
implementing the following methods:
init: used to declare any input parameters for the task
setup: perform any set up (e.g. creating output directories) or other arbitrary actions; typically it also defines any commands that will be sent to the scheduler, however this is optional.
finish: perform final actions after setup and any defined commands have completed (NB finish is optional)
output: return any outputs from the task once it has completed.
For example: let’s define a task which runs the fastqc
program
on a collection of Fastq files one at a time:
class RunFastqc(PipelineTask):
    def init(self,fastqs,out_dir):
        self.add_output('out_files',list())
    def setup(self):
        if not os.path.exists(self.args.out_dir):
            os.mkdir(self.args.out_dir)
        for fq in self.args.fastqs:
            self.add_cmd("Run FastQC",
                         Command("fastqc",
                                 "-o",self.args.out_dir,
                                 fq))
    def finish(self):
        for fq in self.args.fastqs:
            if fq.endswith(".gz"):
                fq = os.path.splitext(fq)[0]
            out_file = os.path.join(
                self.args.out_dir,
                os.path.splitext(
                    os.path.basename(fq))[0]+"_fastqc.html")
            if not os.path.exists(out_file):
                self.fail(message="Missing output file: %s" % out_file)
            else:
                self.output.out_files.append(out_file)
The key features are:
- The argument list of the init method defines an arbitrary set of parameters which are made available to the other methods via the self.args object.
- The init method also adds an output called out_files which will be used to store the outputs of the task; it can be accessed via the output method.
- The setup method creates the output directory if it doesn't already exist, and then calls the add_cmd method to add a command for each Fastq file supplied via the fastqs argument (accessed as self.args.fastqs), which will run fastqc on that Fastq.
- The command to run fastqc is created by creating a Command instance, which specifies the command and arguments to execute.
- The commands defined via the add_cmd method are not guaranteed to run sequentially. If there are additional commands that rely on the first set of commands finishing, then either append these to the commands using the shell && notation, or put them into a separate task which should be executed afterwards.
- The finish method constructs the names of the expected output Fastqc HTML files, and adds them to the out_files list which was originally initialised within the init method.
- The task can explicitly indicate a failure by calling the fail method, as in the finish method above. Raising an exception will also implicitly indicate a failure.
Another example is a task which does a simple-minded read count on a set of Fastq files:
class CountReads(PipelineTask):
    def init(self,fastqs):
        self.add_output('counts',dict())
    def setup(self):
        for fq in self.args.fastqs:
            if os.path.splitext(fq)[1] == ".gz":
                cat = "zcat"
            else:
                cat = "cat"
            self.add_cmd(
                "Count reads",
                Command("echo","-n",fq,"' '","&&",
                        cat,fq,"|","wc","-l"))
    def finish(self):
        for line in self.stdout.split('\n'):
            if not line:
                continue
            fq = line.split()[0]
            read_count = int(line.split()[1])/4
            self.output.counts[fq] = read_count
The key features are:
- The init method initialises the counts output which will be populated in the finish method, and accessed via the output method.
- The standard output from the task is available via the stdout property of the instance.
- The finish method is implemented to extract the line count data from standard output and convert this to a read count which is stored against the Fastq name.
A final example is a task which filters out the empty Fastq files (i.e. keeps only those with non-zero read counts):
class FilterEmptyFastqs(PipelineTask):
    def init(self,read_counts):
        self.add_output('filtered_fastqs',list())
    def setup(self):
        for fq in self.args.read_counts:
            if self.args.read_counts[fq] > 0:
                self.output.filtered_fastqs.append(fq)
In this case all the processing is performed by the setup
method;
no commands are defined.
Running scripts from within tasks
It is possible to define scripts to run from within tasks; for example, the previous CountReads task might be more cleanly implemented using scripts when adding commands:
class CountReads(PipelineTask):
    ...
    def setup(self):
        for fq in self.args.fastqs:
            if os.path.splitext(fq)[1] == ".gz":
                cat = "zcat"
            else:
                cat = "cat"
            self.add_cmd(
                "Count reads",
                '''
                # Count lines in a FASTQ file
                echo -n {fastq} && {cat} {fastq} | wc -l
                '''.format(fastq=fq,cat=cat))
    ...
Scripts can be multiline and use bash
syntax, so they can
provide an alternative to coding logic within the setup
function.
Defining task outputs
Outputs should be defined within the init
method, using the
add_output
method of the task, for example:
self.add_output('fastqs',dict())
The outputs can be accessed via the task’s output property,
which returns an AttributeDictionary
object where the
keys/attributes are those defined by the add_output
calls,
for example:
task = FilterEmptyFastqs("Filter empty Fastqs",counts)
...
filtered_fastqs = task.output.filtered_fastqs
Typically the values of the outputs are not known before the
task has been run, so the setup
and finish
methods can
be used to populate the outputs when the task completes (as in
the FilterEmptyFastqs
example above).
Using task output as input to another task
The key feature of a pipeline is that the outputs from an upstream task can be used as input to one or more downstream tasks.
In this case the objects returned by output
are likely to
be passed to other tasks before the task has completed. There
are a number of implications:
- Tasks that receive output from a preceding task cannot assume that those outputs are ready or complete at the point of initialisation. It's therefore recommended that tasks don't attempt to use those outputs in their 'init' method - all processing should be deferred until the 'setup' method (when preceding tasks will have completed).
- Outputs from tasks should be passed by object references (e.g. via a list or dictionary, which can be updated by the task after being passed, or via specialised classes such as FileCollector or PipelineParam - see sections below).
As an example, consider the following task which reverses the order of a list of items:
class ReverseList(PipelineTask):
    def init(self,items):
        self.add_output('reversed_list',list())
    def setup(self):
        # Generate the reversed list
        for item in self.args.items[::-1]:
            self.output.reversed_list.append(item)
This might be used as follows:
reverse = ReverseList("Reverse order of list",[1,2,3])
Subsequently reverse.output.reversed_list
will return the
reference to the output list, which can then be passed to another task.
The list will be populated when the reverse task runs, at which point
the output will be available to the following tasks.
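To illustrate, a minimal sketch of wiring this output into a downstream task (the second ReverseList instance here is purely illustrative):
reverse = ReverseList("Reverse order of list",[1,2,3])
# Pass the (initially empty) output list straight to a downstream task;
# the list will have been populated by the time that task's setup runs
reverse_again = ReverseList("Reverse order again",
                            reverse.output.reversed_list)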
Task working directory isolation
By default each task is run in its own specially created directory, to isolate it from other tasks (note that jobs run by the same task will share this directory).
Isolation of task working directories can be disabled for all tasks
within a pipeline at runtime, by setting the isolate_tasks
parameter of the pipeline’s run
method to False
.
Alternatively it can be disabled for individual task instances by
explicitly setting the make_task_working_dir
parameter for that
task instance to False
when it is created (this will override
the setting specified within the pipeline at runtime).
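For example, a minimal sketch of switching off isolation for all tasks at runtime using the isolate_tasks argument described above:
ppl.run(isolate_tasks=False)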
Specialised task input/output classes
There are a number of specialised classes which can be used to pass data from the output of one task into the input of another:
FileCollector
The FileCollector
class enables the collection of files matching
a glob-type pattern. It behaves as an iterator, with the file collection
being deferred until the iteration actually takes place; it can therefore
be used as an output for tasks where a set of files are created but the
precise number or names of the files are not known a priori.
For example:
class MakeFiles(PipelineTask):
    def init(self,d,filenames):
        self.add_output('files',
                        FileCollector(self.args.d,"*"))
    def setup(self):
        # Create a directory and "touch" the files
        os.mkdir(self.args.d)
        for f in self.args.filenames:
            with open(os.path.join(self.args.d,f),'w') as fp:
                fp.write("")
PipelineParam
The PipelineParam
class offers a way to pass strings or numerical
types which would normally be immutable between tasks.
For example:
import string
from random import choice
class RandomString(PipelineTask):
    def init(self,n):
        self.add_output('string',PipelineParam())
    def setup(self):
        # Create a random string of length 'n' characters
        allchar = string.ascii_letters + string.punctuation + string.digits
        s = "".join([choice(allchar) for x in range(self.args.n)])
        # Assign the string to the output
        self.output.string.set(s)
If passed as input to a subsequent task, the PipelineParam
instance will behave as a static value when accessed via
self.args...
.
FunctionParam
The FunctionParam
class allows function invocations to be passed
to tasks as PipelineParam
-like objects, so that the function
evaluation is only performed when the task actually consumes the
object.
The function can be a lambda
function, or a full function
defined elsewhere. For example:
is_dir = FunctionParam(lambda d: os.path.isdir(d),out_dir)
or (more concisely):
out_dir_exists = FunctionParam(os.path.isdir,out_dir)
Arguments and keywords supplied to the FunctionParam
class are
passed to the function for evaluation only when the object’s
value
method is invoked (with any parameter-like arguments
being converted to their values immediately prior to evaluation).
This can be useful when building a pipeline where functions can’t
be evaluated until the pipeline is executed, and as an alternative
to creating a full PipelineFunctionTask
to wrap a function.
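As a sketch of the deferred evaluation in use (out_dir is assumed to be defined elsewhere, e.g. as a pipeline parameter):
out_dir_exists = FunctionParam(os.path.isdir,out_dir)
# Nothing has been evaluated yet; the check only happens here:
if out_dir_exists.value:
    print("Output directory already exists")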
ListParam
The ListParam
class is a parameter-like alternative to using a
standard Python list
to pass lists between tasks in a pipeline;
it is most useful when the list itself contains parameter-like
items, in which case these items are also evaluated before the
ListParam
is passed to a task.
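A minimal sketch of building up a ListParam, assuming the items are Fastq names with one supplied as a PipelineParam:
sample1 = PipelineParam(value="sample1.fastq")
fastqs = ListParam(["sample0.fastq",sample1])
fastqs.append("sample2.fastq")
# When the ListParam is evaluated the parameter-like items are
# replaced by their values:
# ["sample0.fastq", "sample1.fastq", "sample2.fastq"]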
PathJoinParam
The PathJoinParam
class provides a parameter-like alternative
to the os.path.join
function; it is useful when building
pipelines where it is not possible to construct the final paths
for files or directories until the pipeline is actually executed
(for example if some path components are supplied as parameters).
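For example, a sketch where the output directory is only known at run time (out_dir here is an assumed pipeline parameter):
out_dir = PipelineParam(value="/data/run1")
report_path = PathJoinParam(out_dir,"qc","report.html")
# The join is deferred; report_path.value evaluates to
# "/data/run1/qc/report.html" only when it is accessed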
PathExistsParam
The PathExistsParam
class provides a parameter-like alternative
to the os.path.exists
function; it is useful when building
pipelines where it is not possible to check the existence of some
paths until the pipeline is actually executed (for example because
files or directories being checked are created during pipeline
execution).
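Continuing the previous sketch (report_path is the PathJoinParam defined above), the existence check is similarly deferred:
report_exists = PathExistsParam(report_path)
# report_exists.value returns True or False depending on whether
# the file exists at the point the value is accessed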
Running Python functions as tasks
The PipelineFunctionTask class enables a Python function to be run
as an external process, for example reimplementing the earlier
CountReads
example:
from bcftbx.FASTQFile import nreads
class CountReads(PipelineFunctionTask):
    def init(self,fastqs):
        self.add_output('counts',dict())
    def setup(self):
        self.add_call("Count reads in Fastq",
                      self.count_reads,
                      self.args.fastqs)
    def count_reads(self,fastqs):
        # Count reads in each Fastq
        counts = dict()
        for fq in fastqs:
            counts[fq] = nreads(fq)
        return counts
    def finish(self):
        for result in self.result():
            for fq in result:
                self.output.counts[fq] = result[fq]
The main differences between this and the PipelineTask class are:
- The class includes a method (in this case count_reads) which implements the Python function to be executed.
- The add_call method is used in the setup method to define calls to the function to run (analogous to the add_cmd method for normal tasks). add_call can be used multiple times, e.g. to break a task into several separate processes (again analogous to add_cmd).
- The return values from the invoked function are collected and made available via the result method. This returns a list of results (one for each time add_call was used). The finish method contains code to post-process these results.
The advantage of this approach is that intensive operations (e.g. processing large numbers of files) previously performed by Python functions can be farmed out to external processes without having to be wrapped in new executable programs.
Note
The FunctionParam
class could also be considered as an
alternative to PipelineFunctionTasks
for light-weight
function calls during pipeline execution.
Building and running a pipeline
An empty pipeline is created by making a Pipeline
instance:
ppl = Pipeline()
Specific task instances are then created from PipelineTask subclasses, for example using the tasks defined previously:
read_counts = CountReads("Count the reads",fastqs)
filter_empty_fastqs = FilterEmptyFastqs("Filter empty Fastqs",
                                        read_counts.output.counts)
run_fastqc = RunFastqc("Run Fastqc",
                       filter_empty_fastqs.output.filtered_fastqs,
                       out_dir)
Note that when instantiating a task, it must be given a name; this can be any arbitrary text and is intended to help the end user distinguish between different task instances in the pipeline.
The tasks are then added to the pipeline using the add_task
method:
ppl.add_task(read_counts)
ppl.add_task(filter_empty_fastqs,
requires=(read_counts,))
ppl.add_task(run_fastqc,
requires=(filter_empty_fastqs,))
The pipeline is then run using the run
method:
ppl.run()
Notes:
- Tasks will only be executed once any tasks they depend on have completed successfully; these are specified via the requires argument of the add_task method (in which case it must be a list or tuple of task instances), and/or via the requires method of a task instance.
- Tasks that fail (i.e. complete with non-zero exit status) will cause the pipeline to halt at that point.
- The run method blocks and returns the exit status of the pipeline execution.
Defining relationships between tasks
A task in a pipeline may depend on one or more other tasks in the pipeline to complete before it can be run; these are referred to as “requirements” of the task.
Requirements can be specified in different ways:
- When a task is added to a pipeline via the add_task method then a list of required tasks can also be specified via the requires argument;
- Requirements can be added directly to a task using its requires method;
- A task can be made the requirement of other tasks using its required_by method.
Note that these approaches are not exclusive, and can be used together on the same task to specify requirements in a pipeline.
Additionally, requirements are automatically added implicitly for each input that is a parameter-based output from another task.
Requirement specification can be combined with two properties that can be useful when adding tasks to an existing pipeline (see the sketch below):
- initial_tasks returns a list of all the tasks in the pipeline that don't have any requirements; it can be used when inserting tasks at the start of a pipeline, as the inserted task can be made into a requirement of the initial tasks using the idiom task.required_by(*pipeline.initial_tasks), and
- final_tasks returns a list of all the tasks in the pipeline which no other tasks require (essentially they will run at the end of the pipeline); it can be used when appending tasks to the end of a pipeline using the idiom task.requires(*pipeline.final_tasks).
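As a sketch of these idioms in use (setup_task and report_task are hypothetical task instances):
# Insert a task that must complete before everything already in the pipeline
setup_task.required_by(*ppl.initial_tasks)
ppl.add_task(setup_task)
# Append a task that should only run once everything else has finished
report_task.requires(*ppl.final_tasks)
ppl.add_task(report_task)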
Scheduling and running jobs
By default the run
method of the Pipeline instance creates
a new SimpleScheduler
instance internally and uses this to
run the commands generated by the tasks in the pipeline.
The run
method provides a number of arguments that can be
used to configure the scheduler, specifically:
- max_jobs: this sets the maximum number of concurrent jobs that the scheduler will run (defaults to 1)
- max_slots: this sets the maximum number of concurrent CPUs (aka "slots") that the scheduler will allocate work for (defaults to no limit)
- poll_interval: the time interval that the scheduler will use when checking the status of running jobs (defaults to 5 seconds)
If more control is required over the scheduler then the calling
subprogram can create and configure its own SimpleScheduler
instance and pass this to the run
method instead. Note that
it is the responsibility of the subprogram to start and stop
the scheduler that it provides.
Specifying job runners for tasks
By default when a pipeline is executed then all jobs within
its tasks will be run using the default job runner supplied
via the default_runner
argument of the pipeline’s run
method.
For example:
ppl.run(default_runner=SpecialJobRunner())
(If no default is supplied then a SimpleJobRunner
is used
as the default runner.)
Typically it is desirable to be able to exercise more granular
control over the job runners used within the pipeline. This is
possible via the runner
keyword of the add_task
method
of the Pipeline
class, which allows a job runner to be
specified which will then be used when jobs in that task are
executed.
For example:
ppl.add_task(my_task,runner=SimpleJobRunner())
The Pipeline
class also allows parameterisation of job
runners when building a pipeline. Placeholders for job
runners can be defined using the add_runner
method,
and accessed via the runners
property to be associated
with tasks.
For example:
# Define runner
ppl.add_runner('my_runner')
...
# Associate runner with task
ppl.add_task(my_task,runner=ppl.runners['my_runner'])
Job runners can then be set explicitly when the pipeline is
executed, using the runners
argument of the run
method to supply a mapping of runner names to job runner
instances.
For example:
ppl.run(runners={ 'my_runner': SpecialJobRunner() })
Any runner names that don’t have associated job runner
instances will use the default runner defined via the
default_runner
argument.
Dynamically setting number of CPUs/threads via job runners
When job runners are created they can have a maximum number of available CPUs (aka “slots”) associated with them.
For SimpleJobRunners this has to be set explicitly via the nslots argument, for example:
runner = SimpleJobRunner(nslots=8)
By default only a single slot is allocated. (For
GEJobRunners
the number of slots is set implicitly.)
The number of slots can then be accessed at runtime, so that
jobs run within a task use the appropriate number of CPUs
dynamically, by using the runner_nslots
method.
For standard PipelineTask
classes, this should be done
when constructing commands within the setup
method. For
example: bowtie2
takes a --threads
option which
tells the program how many threads it should use. A minimal
task to run bowtie2
with dynamically assigned number of
threads might look like:
class RunBowtie2(PipelineTask):
    def init(self,fastq,index_basename,sam_out):
        pass
    def setup(self):
        self.add_cmd("Run bowtie",
                     Command("bowtie2",
                             "-x",self.args.index_basename,
                             "-U",self.args.fastq,
                             "-S",self.args.sam_out,
                             "--threads",self.runner_nslots))
Note
When using dynamic CPU assignment with SimpleJobRunners
,
it may also be worth considering using the max_slots
parameter when running the pipeline.
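For example, a sketch combining a per-job CPU allocation with a global cap (the numbers are purely illustrative):
# Each job may use up to 4 CPUs, with at most 16 CPUs in use at once
ppl.run(default_runner=SimpleJobRunner(nslots=4),
        max_slots=16)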
Dealing with stdout and stderr from tasks
The stdout and stderr from tasks which run external commands
can be accessed via the stdout
and stderr
properties
respectively of the task instance once it has completed.
Where multiple jobs were run by the task, the stdout from all jobs is concatenated and returned via this property (if stderr was not redirected to stdout then it will also be concatenated across all jobs).
The stdout for each job is topped and tailed with a standard set of comment lines output from the wrapper scripts, of the form:
#### COMMAND Echo text
#### HOSTNAME popov
#### USER pjb
#### START Thu Aug 17 08:38:14 BST 2017
...
...Job-specific output...
...
#### END Thu Aug 17 08:38:14 BST 2017
#### EXIT_CODE 0
When parsing the stdout it is recommended to check for these lines
using e.g. line.startswith("#### ")
.
Note
Currently the stderr for each job is not enclosed by standard comment lines.
Handling failed tasks in pipelines
If a task in a pipeline fails (that is, completes with a non-zero exit code) then the pipeline is considered to have failed. In this case the pipeline can use one of a number of strategies to handle execution of the remaining tasks:
Pipeline execution halts immediately and all running tasks are terminated (‘immediate’ mode, the default)
Pipeline execution continues but all tasks which depend on the failed tasks are removed and not executed (‘deferred’ mode)
The strategy can be set explicitly at runtime by setting the
exit_on_failure
argument of the pipeline run
method
to one of the values defined in the PipelineFailure
class.
For example:
from pipeliner import PipelineFailure
...
# Define pipeline
...
# Run pipeline in 'deferred' mode
ppl.run(exit_on_failure=PipelineFailure.DEFERRED)
Note that regardless of how the failures are handled the pipeline will always return exit code 1 when one or more tasks fail.
Executing pipeline commands in batches
By default when the pipeline executes the commands generated by a task, each command is sent to the scheduler as a single job.
It is also possible to request that the pipeline executes commands
in batches, by specifying either a non-zero size for the
batch_size
option of the run
method, or by specifying
a non-zero batch_limit
.
If batch_size
is set then commands are grouped together
into batches of this size, and each batch is sent to the scheduler
as a single job; if batch_limit
is set then the batch size
is set automatically so that the number of batches doesn't
exceed the specified limit.
Within a batch the commands are executed sequentially, and if one command fails then all subsequent commands in the batch won’t run.
Batch mode can also be requested on a per-task basis, by
explicitly specifying batch_size
as keyword when adding
the task to the pipeline. For example:
ppl = Pipeline()
...
ppl.add_task(my_task,batch_size=5)
This will override any batch size set globally when the pipeline is run.
Setting pipeline parameters at execution time
When building pipelines, it is sometimes necessary or desirable
to supply a parameter to a task where the value of that parameter
isn’t known until execution time (via the run
method).
For example, a task in the pipeline might need to know the number of cores or the location of a temporary directory to be used, which can only be set at execution time.
To handle these situations, it is possible to define arbitrary
parameters within the Pipeline
class at build time which are
passed to tasks as placeholders, and then set the values of these
parameters at execution time.
The add_param
method is used to define a parameter, for
example:
ppl = Pipeline()
ppl.add_param('ncores',value=1,type=int)
ppl.add_param('tmpdir')
This creates a new PipelineParam
instance which is associated
with the supplied name.
The parameters can be accessed via the pipeline’s params
property, and passed as input into tasks, for example:
task = ExampleTask("This is an example",
ncores=ppl.params.ncores,
tmpdir=ppl.params.tmpdir)
ppl.add_task(task)
The runtime values of parameters are then passed via the
params
argument of the pipeline’s run
invocation:
temporary_dir = tempfile.mkdtemp()
ppl.run(params={ 'ncores': 8,
'tmpdir': temporary_dir, })
Built-in parameters
In addition to the custom parameters defined using the
add_param
method and outlined in the previous section, a
number of ‘built-in’ parameters are also available as properties
of the Pipeline
instance, for use when building a pipeline.
Specifically these are:
- WORKING_DIR: the working directory used by the pipeline
- BATCH_SIZE: the batch size to be used when running jobs within pipeline tasks
- BATCH_LIMIT: the maximum number of batches of jobs
- VERBOSE: whether the pipeline is running in 'verbose' mode
These can be used in the same way as the custom parameters when setting up tasks, for example:
task = ExampleTask("This is an example",
ncores=ppl.params.ncores,
tmpdir=ppl.params.WORKING_DIR)
The values will be set when the pipeline’s run
method is
invoked.
Defining execution environment for a task: runners, modules & conda
It is possible to define the execution environment on a per-task basis within a pipeline, by defining job runners, environment modules and conda dependencies.
Runners and environments can be declared in a parameterised
fashion when a pipeline is created, using the add_runner
and
add_envmodules
methods respectively of the Pipeline
class.
For example:
ppl = Pipeline()
ppl.add_runner('4_cpus')
ppl.add_envmodules('myenv')
This defines a runner
called 4_cpus
and an environment
called myenv
.
The runners and environments are accessed via the runners
and envmodules
properties of the Pipeline
instance, and
can be associated with tasks within the pipeline when they
are added via the add_task
method, using the runner
and
envmodules
keywords respectively.
For example:
ppl.add_task(my_task,runner=ppl.runners['4_cpus'],...)
and
ppl.add_task(my_task,envmodules=ppl.envmodules['myenv'],...)
Actual runners and environments can be assigned when the pipeline
is executed, via the runners
and envmodules
options of
the run
method of the Pipeline
instance - these are
mappings of the names defined previously to JobRunner
instances,
and to lists of environment modules.
For example:
ppl.run(runners={ '4_cpus': GEJobRunner('-pe smp.pe 4'), },
envmodules={ 'myenv': 'apps/trimmomatic/0.38', },...)
If a runner is not explicitly set for a task then the pipeline’s
default runner is used for that task; this defaults to a
SimpleJobRunner
instance but can be set explicitly via the
default_runner
argument of the Pipeline
instance’s run
method.
Execution environments can also be defined with conda
packages.
The packages and versions required by a task are declared in a
task’s init
method with calls to the conda
method, for
example:
class RunFastqc(PipelineTask):
    def init(self,fastq,out_dir):
        self.conda("fastqc=0.11.3")
        ...
If conda
dependency resolution is enabled when the pipeline is
executed then these declarations will be used to generate conda
environments that are activated when the tasks run (otherwise they
are ignored) (see the section “Enabling conda to create task environments
automatically” for details).
Note
By default the PYTHONNOUSERSITE
environment variable is
set in the execution environment, and any directories matching
the path ${HOME}/.local/lib/python*
are removed from the
PYTHONPATH
.
Together this means that Python programs that are run within
that environment will ignore any packages installed in
the user site-packages
directory (which can otherwise
cause issues with conda
dependency resolution).
Defining outputs from a pipeline
It is possible to define outputs for a Pipeline
instance in
the same way that outputs can be defined for individual tasks.
The add_output
method of the Pipeline
class allows an
arbitrary output to be defined, for example:
ppl = Pipeline()
...
ppl.add_output('final_result',result)
ppl.run()
This can be accessed via the pipeline’s output
property:
print("The result is '%s'" % ppl.output.final_result)
It is possible that pipeline outputs are defined as
PipelineParam
instances (for example, if a pipeline output is
taken from an output from one of its constituent tasks). By
default, on pipeline completion the outputs are “finalized” by
substituting the PipelineParams for their actual values. To
prevent this behaviour, set the finalize_outputs argument of
argument of
the pipeline’s run
method to False
. For example:
ppl = Pipeline()
ppl.add_output('final_result',PipelineParam())
...
ppl.run(finalize_outputs=False)
It is recommended that outputs are defined as PipelineParam
instances, to take advantage of the implicit task requirement
gathering mechanism.
Enabling conda to create task environments automatically
The conda package manager can be used within Pipelines to automatically create run-time environments for any tasks which declare conda dependencies in their init methods.
To enable the use of conda when a pipeline is executed, specify
the following arguments when invoking the pipeline’s run
method:
- enable_conda must be set to True, and
- the path to an existing conda installation must be supplied via the conda argument.
For example:
ppl = Pipeline()
...
ppl.run(enable_conda=True,conda='/usr/local/miniconda3/bin/conda')
(By default new conda
environments are created in a subdirectory of
the working directory, but it is possible to explicitly specify a
different location via the conda_env_dir argument of the run
of the run
method.)
A note on PipelineCommand and PipelineCommandWrapper
In the first version of the pipeliner code, commands within the
setup
method of PipelineTasks
had to be generated using
subclasses of the PipelineCommand
class.
A simple example to run Fastqc:
class Fastqc(PipelineCommand):
    def init(self,fastq,out_dir):
        self._fastq = fastq
        self._out_dir = out_dir
    def cmd(self):
        return Command("fastqc",
                       "-o",self._out_dir,
                       self._fastq)
which can then be used within a task, for example:
class RunFastqc(PipelineTask):
    ...
    def setup(self):
        ...
        for fq in self.args.fastqs:
            self.add_cmd(Fastqc(fq,self.args.out_dir))
Subsequently a “shortcut” class called PipelineCommandWrapper
was introduced, which allowed the generation of commands to be
performed within the task class without a PipelineCommand
subclass.
In this case the example RunFastqc
task becomes:
class RunFastqc(PipelineTask):
    ...
    def setup(self):
        ...
        for fq in self.args.fastqs:
            self.add_cmd(
                PipelineCommandWrapper(
                    "Run FastQC",
                    "fastqc",
                    "-o",self.args.out_dir,
                    fq))
This has since been superseded by updates to the add_cmd
method
which allows a title and Command
instance (or a string
containing a script) to be supplied instead. In this case the
example would become:
class RunFastqc(PipelineTask):
    ...
    def setup(self):
        ...
        for fq in self.args.fastqs:
            self.add_cmd("Run FastQC",
                         Command("fastqc",
                                 "-o",self.args.out_dir,
                                 fq))
or (if using a script):
class RunFastqc(PipelineTask):
    ...
    def setup(self):
        ...
        for fq in self.args.fastqs:
            self.add_cmd("Run FastQC",
                         '''
                         fastqc -o {out_dir} {fastq}
                         '''.format(out_dir=self.args.out_dir,
                                    fastq=fq))
All approaches are supported, however the last two are likely to
result in cleaner code that is easier to read. PipelineCommand
is probably better suited to situations where the same command will
be used across multiple distinct tasks. In cases where the command
is only used in one task, the last two approaches are recommended.
Advanced pipeline construction: combining pipelines
It is possible to build larger pipelines out of smaller ones by using
the add_pipeline
method of the Pipeline
class, which pulls
tasks from one pipeline into another along with their dependency
relationships.
Optionally dependencies on tasks from the “master” pipeline can be added to the imported pipeline tasks.
Parameters defined in the imported pipeline are also imported and exposed with the same names; but it is also possible to override them with parameters defined in the master pipeline, or parameters that are task outputs.
There are two specialised methods (append_pipeline
and
merge_pipeline
) which wrap the add_pipeline
method:
- append_pipeline takes all the tasks from one pipeline and adds them to the end of another, so that the appended tasks only run after the original tasks have completed;
- merge_pipeline takes all the tasks from one pipeline and adds them into another, without requiring that they wait until the original tasks have finished.
Appending can be used for building a pipeline out of distinct ‘sections’ of sub-pipelines; merging can be useful for running multiple pipelines in parallel.
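A minimal sketch of assembling a larger pipeline from two hypothetical sub-pipelines (fastqc_ppl and reporting_ppl):
full_ppl = Pipeline(name="Full QC")
# Tasks from fastqc_ppl run alongside anything already in full_ppl
full_ppl.merge_pipeline(fastqc_ppl)
# Tasks from reporting_ppl only run after all existing tasks complete
full_ppl.append_pipeline(reporting_ppl)
full_ppl.run()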
- class auto_process_ngs.pipeliner.BaseParam
Provide base class for PipelineParam-type classes
Implements core functionality that should be shared across all parameter-like classes, including assigning a UUID and enabling a task ID to be associated with the parameter.
Provides the following attributes:
uuid
associated_task_id
and the following methods:
associate_task
- associate_task(task)
Associate a task with the parameter
- Parameters:
task (PipelineTask) – a task object to associate with the parameter
- property associated_task_id
Return the task ID of the associated task (or None)
- property uuid
Return the unique identifier (UUID) of the parameter
- class auto_process_ngs.pipeliner.Capturing
Capture stdout and stderr from a function call
Based on code from http://stackoverflow.com/a/16571630/579925 modified to handle both stdout and stderr (original version only handled stdout)
Usage:
>>> with Capturing() as output:
...     print("Hello!")
>>> for line in output.stdout:
...     print("Line: %s" % line)
>>> for line in output.stderr:
...     print("Err: %s" % line)
- class auto_process_ngs.pipeliner.Dispatcher(working_dir=None, cleanup=True)
Class to invoke Python functions in external processes
Enables a Python function to be run in a separate process and collect the results.
Example usage:
>>> d = Dispatcher()
>>> cmd = d.dispatch_function_cmd(bcftbx.utils.list_dirs,os.getcwd())
>>> cmd.run_subprocess()
>>> result = d.get_result()
The Dispatcher works by pickling the function, arguments and keywords to files, and then creating a command which can be run as an external process; this unpickles the function etc, executes it, and makes the result available to the dispatcher on successful completion.
If the invoked function raises an exception then the result will be returned as None, and the exception will be available from the get_exception method.
For example:
>>> if d.get_exception() is None:
...     print("Success: %s" % d.get_result())
... else:
...     print("Failure: %s" % d.get_exception())
Currently uses cloudpickle as the pickling module: https://pypi.org/project/cloudpickle/
- Parameters:
working_dir (str) – optional, explicitly specify the directory where pickled files and other intermediates will be stored
cleanup (bool) – if True then remove the working directory on exit
- dispatch_function_cmd(func, *args, **kwds)
Generate a command to execute the function externally
- Parameters:
func (function) – function to be executed
args (list) – arguments to invoke the function with
kwds (mapping) – keyword arguments to invoke the function with
- Returns:
- a Command instance that can be used to
execute the function.
- Return type:
Command
- execute(pkl_func_file, pkl_args_file, pkl_kwds_file, pkl_result_file=None)
Internal: execute the function
- Parameters:
pkl_func_file (str) – path to the file with the pickled version of the function to be invoked
pkl_args_file (str) – path to the file with the pickled version of the function arguments
pkl_kwds_file (str) – path to the file with the pickled version of the keyword arguments
pkl_result_file (str) – optional path to the file where the pickled version of the function result will be written
- get_exception()
Return the exception from the function invocation
- Returns:
- the exception raised by the function, or
None if no exception was raised.
- Return type:
Exception
- get_result()
Return the result from the function invocation
- Returns:
- the object returned by the function, or None
if no result was found.
- Return type:
Object
- property working_dir
Return the working directory path
- class auto_process_ngs.pipeliner.FileCollector(dirn, pattern)
Class to return set of files based on glob pattern
- Parameters:
dirn (str) – directory to search
pattern (str) – glob pattern to match files against
- Yields:
String – path of matching file.
- class auto_process_ngs.pipeliner.FunctionParam(f, *args, **kws)
Class for deferred function evaluation as pipeline parameter
This class wraps a function with a set of parameters; the function evaluation is deferred until the ‘value’ property is invoked.
Any parameters which are PipelineParam-like instances will be replaced with their values before being passed to the function.
- property value
Return value from evaluated function
- class auto_process_ngs.pipeliner.ListParam(iterable=None)
Implement list-like behaviour as pipeline parameter
This class implements the pipeline parameter equivalent to the Python ‘list’ class. It supports append and extend methods, and the len function will return the number of elements in the list.
The value property returns a Python list, with any pipeline parameter-like objects in the original list replaced with their values.
It is recommended that ListParam instances should be used in pipelines when passing lists of parameters between tasks.
- class auto_process_ngs.pipeliner.PathExistsParam(p)
Class for checking file/directory existence as pipeline parameter
This class implements the pipeline parameter equivalent of the os.path.exists function, taking a path on instantiation (which can be a string or PipelineParam-like object) and returning a boolean value indicating whether the path is an existing file system object via the value property.
Example usage:
>>> exists = PathExistsParam("/path/to/file.txt")
>>> exists.value
True

>>> f = PipelineParam(value="/path/to/file.txt")
>>> exists = PathExistsParam(f)
>>> exists.value
True
>>> f.set("/path/to/missing.txt")
>>> exists.value
False
Note that this class doesn’t implement a set method (unlike the standard PipelineParam class) so the path elements cannot be changed after initialisation.
- class auto_process_ngs.pipeliner.PathJoinParam(*p)
Class for joining file paths as pipeline parameter
This class implements the pipeline parameter equivalent of the os.path.join function, taking a set of path elements on instantiation (which can be strings or PipelineParam-like objects) and returning the joined path elements on evaluation via the value property.
Example usage:
>>> pth = PathJoinParam("/path","to","file.txt")
>>> pth.value
"/path/to/file.txt"

>>> base_dir = PipelineParam(value="/path/to/base")
>>> pth = PathJoinParam(base_dir,"file.txt")
>>> pth.value
"/path/to/base/file.txt"
>>> base_dir.set("/path/to/new/base")
>>> pth.value
"/path/to/new/base/file.txt"
Note that this class doesn’t implement a set method (unlike the standard PipelineParam class) so the path elements cannot be changed after initialisation.
- class auto_process_ngs.pipeliner.Pipeline(name='PIPELINE')
Class to define and run a ‘pipeline’ of ‘tasks’
A pipeline consists of a set of tasks (defined by instantiating subclasses of PipelineTask) with simple dependency relationships (i.e. a task will depend on none, one or more other tasks in the pipeline to complete before it can be executed).
Example usage:
>> p = Pipeline()
>> t1 = p.add_task(Task1())
>> t2 = p.add_task(Task2(),requires=(t1,))
>> ...
>> p.run()
Tasks will only run when all requirements have completed (or will run immediately if they don’t have any requirements).
- Parameters:
name (str) – optional name for the pipeline
- add_envmodules(name)
Define a new environment defined by modules
Creates a new PipelineParam instance associated with the supplied environment name.
Environments can be accessed and set via the envmodules property of the pipeline, for example:
To access:
>>> env_modules = ppl.envmodules['my_env'].value
To set:
>>> ppl.envmodules['my_env'].set("")
- Parameters:
name (str) – name for the new environment
- add_output(name, value)
Add an output to the pipeline
- Parameters:
name (str) – name for the output
value (object) – associated object
- add_param(name, value=None, type=None)
Define a new pipeline parameter
Creates a new PipelineParam instance associated with the supplied name.
Parameters can be accessed and set via the params property of the pipeline.
- Parameters:
name (str) – name for the new parameter
value (object) – optional, initial value to assign to the instance
type (function) – optional, function used to convert the stored value when fetched via the value property
- add_pipeline(pipeline, params=None, requires=None)
Import tasks from another pipeline
Adds the tasks from the supplied pipeline instance into this pipeline.
Dependency relationships defined between tasks in the imported pipeline are preserved on import. The ‘requires’ keyword can be used to define new dependencies between the initial tasks in the imported pipeline and those in the destination pipeline.
By default parameters defined in the added pipeline will be imported and exposed with the same names; the ‘params’ keyword can be used to replace them with arbitrary parameters (e.g. parameters defined in the master pipeline, outputs from tasks etc).
- Parameters:
pipeline (Pipeline) – pipeline instance with tasks to be added to this pipeline
params (mapping) – a dictionary or mapping where keys are parameter names in the imported pipeline and values are PipelineParam instances that they will be replaced by
requires (list) – optional list of tasks that the added pipeline will depend on
- add_runner(name)
Define a new runner within the pipeline
Creates a new PipelineParam instance associated with the supplied runner name.
Runner instances can be accessed and set via the runners property of the pipeline, for example:
To access:
>>> runner = ppl.runners['my_runner'].value
To set:
>>> ppl.runners['my_runner'].set(SimpleJobRunner())
- Parameters:
name (str) – name for the new runner
- add_task(task, requires=(), **kws)
Add a task to the pipeline
- Parameters:
task (PipelineTask) – task instance to add to the pipeline
requires (List) – list or tuple of task instances which need to complete before this task will start
kws (Dictionary) – a dictionary of keyword-value pairs which will be passed to the task at run time (see the
run
method of PipelineTask for valid options)
- append_pipeline(pipeline)
Append tasks from another pipeline
Adds the tasks from the supplied pipeline instance into this pipeline, ensuring that the appended tasks depend on the original tasks completing.
Dependencies which were already defined in the pipeline being appended are preserved when they are added to the first.
- Parameters:
pipeline (Pipeline) – pipeline instance with tasks to be appended
- property envmodules
Access the modules environments defined for the pipeline
Returns the dictionary mapping modules environment names to PipelineParam instances that store the module lists; so to get the list of modules associated with an environment name do e.g.
>>> modules = ppl.envmodules['my_env'].value
- property final_tasks
Fetch a list of ‘final tasks’ for the pipeline
- Returns:
- list of task instances from the pipeline
which don’t have any dependent tasks (i.e. the tasks that will run last)
- Return type:
List
- get_dependent_tasks(task_id)
Return task ids that depend on supplied task id
- Returns:
list of IDs for dependent tasks.
- Return type:
List
- get_task(task_id)
Return information on a task
- Parameters:
task_id (str) – unique identifier for a task
- Returns:
- tuple of (task,requirements,kws)
where ‘task’ is a PipelineTask instance, requirements is a list of PipelineTask instances that the task depends on, and kws is a keyword mapping
- Return type:
Tuple
- property initial_tasks
Fetch a list of ‘initial tasks’ for the pipeline
- Returns:
- list of task instances from the pipeline
which don’t depend on any other tasks (i.e. the tasks that will run first)
- Return type:
List
- merge_pipeline(pipeline)
Add tasks from another pipeline
Adds the tasks from the supplied pipeline instance into this pipeline, without requiring that the appended tasks depend on the original tasks.
Dependencies which were already defined in the pipeline being appended are preserved when they are added to the first.
- Parameters:
pipeline (Pipeline) – pipeline instance with tasks to be added
- property name
Return the name of the pipeline
- property output
Return the output object
- property params
Access the parameters defined for the pipeline
- rank_tasks()
Rank the tasks into order
- Returns:
- list of ‘ranks’, with each rank
being a list of task ids.
- Return type:
List
- report(s)
Internal: report messages from the pipeline
- Parameters:
s (str) – message to report
- run(working_dir=None, tasks_work_dir=None, log_dir=None, scripts_dir=None, log_file=None, sched=None, default_runner=None, max_jobs=1, max_slots=None, poll_interval=5, params=None, runners=None, envmodules=None, batch_size=None, batch_limit=None, verbose=False, enable_conda=False, conda=None, conda_env_dir=None, use_locking=True, isolate_tasks=True, exit_on_failure=0, finalize_outputs=True)
Run the tasks in the pipeline
- Parameters:
working_dir (str) – optional path to top-level working directory (defaults to the current directory)
log_dir (str) – path of directory where log files will be written to
scripts_dir (str) – path of directory where script files will be written to
tasks_work_dir (str) – path to directory where tasks will be run
log_file (str) – path to file to write pipeline log messages to (in addition to stdout)
sched (SimpleScheduler) – a scheduler to use for running commands generated by each task
default_runner (JobRunner) – optional default job runner to use
max_jobs (int) – optional maximum number of concurrent jobs in scheduler (defaults to 1; ignored if a scheduler is provided via ‘sched’ argument)
max_slots (int) – optional maximum number of ‘slots’ (i.e. concurrent threads or maximum number of CPUs) available to the scheduler (defaults to no limit)
poll_interval (float) – optional polling interval (seconds) to set in scheduler (if scheduler not provided via the ‘sched’ argument), and to use for checking if tasks have completed (defaults to 5s)
params (mapping) – a dictionary or mapping which associates parameter names with values
runners (mapping) – a dictionary or mapping which associates runner names with job runners
envmodules (mapping) – a dictionary or mapping which associates envmodule names with list of environment module names
enable_conda (bool) – if set then use conda to resolve dependencies for tasks which declare them (conda is disabled by default)
conda (str) – path to conda executable
conda_env_dir (str) – path to directory to create conda environments in
batch_size (int) – if set then run commands in each task in batches, with each batch running this many commands at a time (default is to run one command per job)
batch_limit (int) – if set then run commands in batches, with the batch size automatically set so that the number of batches doesn’t exceed this limit; ignored if batch_size is explicitly set (default is not to limit the number of batches)
verbose (bool) – if True then report additional information for diagnostics
use_locking (bool) – if True then lock invocations of task methods
isolate_tasks (bool) – if True then automatically create dedicated working directories for each task
exit_on_failure (int) – either IMMEDIATE (any task failures cause immediate termination of the pipeline; this is the default) or DEFERRED (the pipeline execution continues and only raises an error when all tasks have finished running)
finalize_outputs (bool) – if True then convert any pipeline outputs from PipelineParams to an actual value (this is the default)
- Returns:
- 0 for successful completion, 1 if there
was an error.
- Return type:
Integer
- property runners
Access the runners defined for the pipeline
Returns the dictionary mapping runner names to PipelineParam instances that store the runner instances; so to get the runner associated with a name do e.g.
>>> runner = ppl.runners['my_runner'].value
- start_scheduler(runner=None, max_concurrent=1, max_slots=None, poll_interval=5)
Internal: instantiate and start local scheduler
- stop_scheduler()
Internal: stop the pipeline scheduler
- terminate()
Internal: terminate a running pipeline
- class auto_process_ngs.pipeliner.PipelineCommand(*args, **kws)
Base class for constructing program command lines
This class should be subclassed to implement the ‘init’ and ‘cmd’ methods.
The ‘init’ method should do any preprocessing and caching of arguments to be used in the ‘cmd’ method; the ‘cmd’ method should use these to construct and return a ‘Command’ instance.
- cmd()
Build the command
Must be implemented by the subclass and return a Command instance
- init()
Initialise and store parameters
Must be implemented by the subclass
- make_wrapper_script(scripts_dir=None, shell='/bin/bash', envmodules=None, conda=None, conda_env=None, working_dir=None, batch_number=None, script_uuid=None)
Generate a uniquely-named wrapper script to run the command
- Parameters:
scripts_dir (str) – path of directory to write the wrapper scripts to
shell (str) – shell to use (defaults to ‘/bin/bash’)
envmodules (str) – list of environment modules to load
conda (str) – path to conda executable
conda_env (str) – name or path for conda environment to activate in the script
working_dir (str) – explicitly specify the directory the script should be executed in
batch_number (int) – for batched commands, the number of the batch that this script corresponds to (optional)
script_uuid (str) – optional ID string; if not supplied then a UUID string will be generated
- Returns:
name of the wrapper script.
- Return type:
String
- name()
Return a “sanitized” version of the class name
- quote_spaces()
Indicate whether spaces should be quoted in wrapper script
- class auto_process_ngs.pipeliner.PipelineCommandWrapper(name, *args)
Class for constructing program command lines
This class is based on the PipelineCommand class but can be used directly (rather than needing to be subclassed).
For example, to wrap the ‘ls’ command directly:
>>> ls_command = PipelineCommandWrapper("List directory",'ls',dirn)
It is also possible to extend the command line using the ‘add_args’ method, for example:
>>> ls_command = PipelineCommandWrapper("List directory",'ls')
>>> ls_command.add_args(dirn)
- add_args(*args)
Add additional arguments to extend the command being built
- Parameters:
args (List) – one or more arguments to append to the command
- cmd()
Internal: implement the ‘cmd’ method
- init(*args)
Internal: dummy init which does nothing
- exception auto_process_ngs.pipeliner.PipelineError
Base class for pipeline-specific exceptions
- class auto_process_ngs.pipeliner.PipelineFunctionTask(_name, *args, **kws)
Class enabling a Python function to be run as a task
A 'function task' enables one or more instances of a Python function or class method to be run concurrently as external programs, and for the return values of these to be recovered on task completion.
This class should be subclassed to implement the ‘init’, ‘setup’, ‘finish’ (optionally) and ‘output’ methods.
The ‘add_call’ method can be used within ‘setup’ to add one or more function calls to be invoked.
- add_call(name, f, *args, **kwds)
Add a function call to the task
- Parameters:
name (str) – a user friendly name for the call
f (function) – the function or instance method to be invoked in the task
args (list) – arguments to be passed to the function when invoked
kwds (mapping) – keyword=value mapping to be passed to the function when invoked
- finish_task()
Internal: perform actions to finish the task
- result()
Return the results of the function invocations
- class auto_process_ngs.pipeliner.PipelineParam(value=None, type=None, default=None, name=None)
Class for passing arbitrary values between tasks
The PipelineParam class offers a way to dynamically assign values for types which would otherwise be immutable (e.g. strings).
A new PipelineParam is created using e.g.:
>>> p = PipelineParam()
In this form there will be no initial value, however one can be assigned using the value argument, e.g.:
>>> p = PipelineParam(value="first")
The associated value can be returned using the value property:
>>> p.value
"first"
and updated using the set method, e.g.:
>>> p.set("last")
>>> p.value
"last"
The return type can be explicitly specified on object instantiation using the type argument, for example to force that a string is always returned:
>>> p = PipelineParam(type=str)
>>> p.set(123)
>>> p.value
"123"
If the default function is supplied then this will be used to generate a value if the stored value is None:
>>> p = PipelineParam(default=lambda: "default")
>>> p.value
"default"
>>> p.set("assigned")
>>> p.value
"assigned"
If a name is supplied then this will be stored and can be recovered via the name property:
>>> p = PipelineParam(name="user_name")
>>> p.name
"user_name"
The value of a parameter can be set to another parameter, in which case the value of the first parameter will be taken from the second:
>>> first_param = PipelineParam(value="first")
>>> first_param.value
"first"
>>> second_param = PipelineParam(value="second")
>>> first_param.set(second_param)
>>> first_param.value
"second"
A parameter can also be "replaced" with another parameter using its replace_with method:
>>> p = PipelineParam(value="old")
>>> p.value
"old"
>>> pp = PipelineParam(value="new")
>>> p.replace_with(pp)
>>> p.value
"new"
This feature allows parameters defined in one context to be matched with parameters in another (for example when tasks from one pipeline are imported into another).
- Parameters:
value (object) – optional, initial value to assign to the instance
type (function) – optional, function used to convert the stored value when fetched via the value property
default (function) – optional, function which will return a default value if no explicit value is set (i.e. value is None)
name (str) – optional, name to associate with the instance
- property name
Return the name of the parameter (if supplied)
- replace_with(p)
Set a parameter to replace this one with
If a replacement parameter is set then the value will be taken from that parameter (ignoring all settings from this one)
- Parameters:
p (PipelineParam) – parameter to be used as a replacement on evaluation
- set(newvalue)
Update the value assigned to the instance
- Parameters:
newvalue (object) – new value to assign
- property value
Return the assigned value
If a default function was specified on instance creation then this will be used to generate the value to return if the stored value is None.
If a type function was also specified on instance creation then this will be used to convert the assigned value before it is returned.
- class auto_process_ngs.pipeliner.PipelineScriptWrapper(name, *scripts)
Class for constructing script command lines
This class is based on the PipelineCommand class but can be used directly (rather than needing to be subclassed).
For example, to wrap a bash script directly:
>>> ls_script = PipelineScriptWrapper("List directory",
...                                   "ls {d}".format(d=dirn))
It is also possible to compose a script from multiple blocks, for example:
>>> properties = PipelineScriptWrapper("File properties",
...                                    "export FILEN={f}".format(
...                                        f="Example.fq"),
...                                    "du -h $FILEN",
...                                    "file $FILEN")
in which case the generated script will look like:
{ export FILEN=Example.fq } && { du -h $FILEN } && { file $FILEN }
- add_block(script)
Append a script block
- Parameters:
script (str) – script block to append
- cmd()
Internal: implement the ‘cmd’ method
- init(*args)
Internal: dummy init which does nothing
- class auto_process_ngs.pipeliner.PipelineTask(_name, *args, **kws)
Base class defining a ‘task’ to run as part of a pipeline
A ‘task’ wraps one or more external programs which can be run concurrently, and which produces a set of outputs. Individual programs should be wrapped in instances of the ‘PipelineCommand’ class.
This class should be subclassed to implement the ‘init’, ‘setup’, ‘finish’ (optionally) and ‘output’ methods.
The ‘add_cmd’ method can be used within ‘setup’ to add one or more ‘PipelineCommand’ instances.
- Parameters:
_name (str) – an arbitrary user-friendly name for the task instance
args (List) – list of arguments to be supplied to the subclass (must match those defined in the ‘init’ method)
kws (Dictionary) – dictionary of keyword-value pairs to be supplied to the subclass (must match those defined in the ‘init’ method)
- add_cmd(*args)
Add a PipelineCommand to the task
The arguments can be one of the following (see the sketch below):
Single argument: PipelineCommand instance
Two arguments: title string and Command instance
Two arguments: title string and script (as string)
- Parameters:
Variable (see above) –
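As an illustrative sketch (hypothetical task arguments and commands, not part of the module), the second and third forms might be used from a task's 'setup' method as follows; the first form instead takes a single PipelineCommand instance on its own:
def setup(self):
    # Title string plus a Command instance
    self.add_cmd("Count lines",
                 Command("wc", "-l", self.args.input_file))
    # Title string plus a script supplied as a string
    self.add_cmd("Report working directory",
                 "echo $(pwd)")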
- add_output(name, value)
Add an output to the task
- Parameters:
name (str) – name for the output
value (object) – associated object
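For example (hypothetical output name), an output can be declared when the task is initialised:
def init(self, input_file):
    # declare an empty list to hold results
    self.add_output('counts', list())
Once populated (typically in 'finish'), the value can be accessed as self.output.counts.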
- property args
Fetch parameters supplied to the instance
- Returns:
list of arguments (with any pipeline parameter instances resolved to their current values).
- Return type:
- property completed
Check if the task has completed
- Returns:
True if task has completed, False if not.
- Return type:
Boolean
- conda(*reqs)
Specify one or more conda packages for the task
Package requirements can be either unversioned (e.g. ‘multiqc’) or versioned (e.g. ‘multiqc=1.8’).
- Parameters:
reqs (list) – one or more Conda package specifiers.
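For illustration (hypothetical task and package names; where the declarations are made will depend on the task), requirements might be declared when the task is initialised:
def init(self, fastqs):
    # declare conda packages needed by the task's commands
    self.conda("fastqc=0.11.9",
               "multiqc")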
- property conda_dependencies
Return a list of conda packages required by the task
- conda_dependency_resolution_completed(name, jobs, sched)
Internal: callback method
Invoked when a conda dependency resolution job completes
- Parameters:
name (str) – name for the callback
jobs (list) – list of SchedulerJob instances
sched (SimpleScheduler) – scheduler instance
- property conda_env_name
Return a name for conda environment based on packages
- property exit_code
Get the exit code for completed task
- Returns:
exit code, or ‘None’ if task hasn’t completed
- Return type:
Integer
- fail(exit_code=1, message=None)
Register the task as failing
Intended to be invoked from the subclassed ‘setup’ or ‘finish’ methods, to terminate the task and indicate that it has failed.
NB when using the ‘fail’ method it is recommended that the method it was invoked from returns immediately afterwards, to avoid any unexpected side effects (see the sketch below)
- Parameters:
exit_code (int) – optional, specifies the exit code to return (defaults to 1)
message (str) – optional, error message to report to the pipeline user
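A minimal sketch (hypothetical output file argument) of the recommended pattern, returning immediately after 'fail' is invoked:
def finish(self):
    if not os.path.exists(self.args.out_file):
        self.fail(message="Missing output file: %s" % self.args.out_file)
        return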
- finish()
Perform actions on task completion
Performs any actions that are required on completion of the task, such as moving or copying data, and setting the values of any output parameters.
Must be implemented by the subclass
- finish_task()
Internal: perform actions to finish the task
- id()
Get the name of the task within the pipeline
- Returns:
a name consisting of a ‘sanitized’ version of the supplied name appended with a unique id code
- Return type:
String
- init(*args, **kws)
Initialise the task
Defines the arguments and keywords required by the task and sets up output parameters and any internal variables required by setup and finish.
Must be implemented by the subclass
- invoke(f, args=None, kws=None)
Internal: invoke arbitrary method on the task
If the working directory is defined then calling ‘invoke’ changes to this directory before the specified function is called.
- Parameters:
f (function) – method to invoke (e.g. ‘self.init’)
args (list) – arguments to invoke function with
kws (dictionary) – keyworded parameters to invoke function with
- name()
Get the name of the task
- Returns:
the name supplied when the task was created
- Return type:
String
- njobs()
Get number of jobs associated with the task
Returns the total number of jobs and the number of completed jobs associated with a running task, as a tuple: (njobs,ncompleted)
- property output
Return the output object
- report(s)
Internal: report messages from the task
- Parameters:
s (str) – message to report.
- report_diagnostics(s, reportf=None, verbose=True)
Internal: report additional diagnostic information
Reports current and working directories, current directory contents, scripts and script outputs; to be invoked on task failure.
- Parameters:
s (str) – string describing the reason for the diagnostics being reported
reportf (func) – function to use for reporting (defaults to the object’s ‘report’ method)
- report_failure(reportf=None, verbose=False)
Internal: report information on task failure
- Parameters:
reportf (func) – function to use for reporting (defaults to the object’s ‘report’ method)
- required_by(*tasks)
Add this task as a requirement of others
Each specified task will wait for this task to complete before they can run.
- Parameters:
tasks (List) – list of PipelineTask objects that will have this task added as a requirement
- property required_task_ids
Return the task IDs for tasks required by this task
- requires(*tasks)
Add tasks as requirements
Each specified task will be added to the list of tasks that need to complete before this task can run.
- Parameters:
tasks (List) – list of PipelineTask objects to be added to this task as requirements
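For example (hypothetical task instances), either of the following calls expresses the same dependency, namely that 'align' must complete before 'count' can start:
>>> count.requires(align)
>>> align.required_by(count)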
- requires_id(task_id)
Add task ID to list of requirements
- Parameters:
task_id (str) – UUID of the task to be added as a requirement
- resolve_dependencies(enable_conda=False, conda=None, conda_env_dir=None, sched=None, scripts_dir=None, working_dir=None, log_dir=None, timeout=600, verbose=False)
Perform dependency resolution for the task
- Parameters:
enable_conda (bool) – if True then enable conda dependency resolution
conda (str) – path to conda executable
conda_env_dir (str) – path to directory holding conda environments
sched (SimpleScheduler) –
scripts_dir (str) – path to write scripts to
log_dir (str) – path to directory to write log files to
timeout (int) – number of seconds to wait to acquire lock on environments directory before giving up (default: 600)
verbose (bool) – if True then report additional information for diagnostics
- property resolving_dependencies
Check if task is currently resolving dependencies
Returns True if dependency resolution is in progress, False otherwise
- run(sched, runner=None, envmodules=None, enable_conda=False, conda=None, conda_env_dir=None, working_dir=None, log_dir=None, scripts_dir=None, log_file=None, wait_for=(), asynchronous=True, poll_interval=5, batch_size=None, batch_limit=None, lock_manager=None, verbose=False, make_task_working_dir=False)
Run the task
This method is not normally invoked directly; instead it’s called by the pipeline that the task has been added to.
- Parameters:
sched (SimpleScheduler) – scheduler to submit jobs to
runner (JobRunner) – job runner to use when running jobs via the scheduler
envmodules (list) – list of environment modules to load when running jobs in the task
enable_conda (bool) – whether to enable conda dependency resolution (default: False)
conda (str) – path to conda
conda_env_dir (str) – path to top-level directory to create or find conda environments
working_dir (str) – path to the working directory to use (defaults to the current working directory)
log_dir (str) – path to the directory to write logs to (defaults to the working directory)
scripts_dir (str) – path to the directory to write scripts to (defaults to the working directory)
log_file (str) – path to file to write task log messages to (in addition to stdout)
wait_for (list) – deprecated: list of scheduler jobs to wait for before running jobs from this task
asynchronous (bool) – if False then block until the task has completed
poll_interval (float) – interval between checks on task completion (in seconds) for non-asynchronous tasks (defaults to 5 seconds)
batch_size (int) – if set then run commands in each task in batches, with each batch running this many commands at a time (default is to run one command per job)
batch_limit (int) – if set then run commands in batches, with the batch size automatically set so that the number of batches doesn’t exceed this limit; ignored if batch_size is explicitly set (default is not to limit the number of batches)
lock_manager (ResourceLock) – inter-task lock manager
verbose (bool) – if True then report additional information for diagnostics
make_task_working_dir (bool) – if True then create a new task-specific working directory under the top-level working directory
- property runner_nslots
Get number of slots (i.e. available CPUs) set by runner
This returns the number of CPUs available to the command as set in the job runner which will be used to run the job.
- Returns:
environment variable to get slots from.
- Return type:
String
- setup()
Set up commands to be performed by the task
Must be implemented by the subclass
- property stderr
Get the standard error from the task
- Returns:
standard error from the task.
- Return type:
String
- property stdout
Get the standard output from the task
- Returns:
standard output from the task.
- Return type:
String
- task_completed(name, jobs, sched)
Internal: callback method
This is a callback method which is invoked when scheduled jobs in the task finish
- Parameters:
name (str) – name for the callback
jobs (list) – list of SchedulerJob instances
sched (SimpleScheduler) – scheduler instance
- terminate()
Internal: terminate the task
- property updated
Check if the task has been updated since the last check
- Returns:
True if task has been updated, False if not.
- Return type:
Boolean
- auto_process_ngs.pipeliner.check_conda_env(conda, env_name, env_dir=None, timeout=600)
Check a conda environment
Verifies whether the named environment exists and can be activated.
- Parameters:
conda (str) – path to conda executable
env_name (str) – name of the environment to check
env_dir (str) – path to conda environments directory (defaults to environments directory belonging to the supplied conda installation)
timeout (int) – number of seconds to wait to acquire lock on environments directory before giving up (default: 600)
- Returns:
path to Conda environment (or None if the environment failed the checks).
- Return type:
String
- auto_process_ngs.pipeliner.collect_files(dirn, pattern)
Return names of files in a directory which match a glob pattern
- Parameters:
dirn (str) – path to a directory containing the files
pattern (str) – a glob pattern to match
- Returns:
list of matching files
- Return type:
List
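For example (hypothetical directory and pattern):
>>> fastqs = collect_files("/data/run1", "*.fastq.gz")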
- auto_process_ngs.pipeliner.make_conda_env(conda, env_name, package_list, env_dir=None, channels=None, timeout=600)
Create a conda environment
If the named environment doesn’t already exist in the environments directory then a new environment will be created with the specified packages.
- Parameters:
conda (str) – path to conda executable
env_name (str) – name for the environment to create
package_list (list) – list of packages to install
env_dir (str) – path to conda environments directory (defaults to environments directory belonging to the supplied conda installation)
channels (list) – optional list of channel names to use with conda operations (overrides defaults)
timeout (int) – number of seconds to wait to acquire lock on environments directory before giving up (default: 600)
- Returns:
path to Conda environment (or None if the environment couldn’t be created).
- Return type:
String
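An illustrative sketch (hypothetical conda installation path, environment name and package list):
>>> env = make_conda_env("/opt/miniconda3/bin/conda",
...                      "fastqc@0.11.9",
...                      ["fastqc=0.11.9"])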
- auto_process_ngs.pipeliner.report_text(s, head=None, tail=None, prefix=None, reportf=None)
Output text with optional topping and tailing
Outputs the supplied string of text line-by-line via the specified reporting function (defaults to the built-in ‘print’ function) with options to limit the number of leading and/or trailing lines.
Where lines are omitted, an additional line is output reporting the number of lines that were skipped.
- Parameters:
s (str) – text to report
head (int) – number of leading lines to report
tail (int) – number of trailing lines to report
prefix (str) – optional string to prefix to each reported line
reportf (func) – function to call to report each line of text (defaults to ‘print’)
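For example (hypothetical text variable), reporting only the first and last five lines of a long captured output with a prefix on each line:
>>> report_text(task_stdout, head=5, tail=5, prefix="STDOUT> ")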
- auto_process_ngs.pipeliner.resolve_parameter(p)
Resolve the value of an arbitrary parameter
The supplied “parameter” can be any object; if it has a ‘value’ property then the resolved value will be whatever this returns, otherwise the supplied object is returned as-is.
- Parameters:
p (object) – parameter to resolve
- Returns:
resolved parameter value.
- Return type:
Object
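For example (following the behaviour described above), a PipelineParam instance resolves to its stored value while a plain object is returned unchanged:
>>> p = PipelineParam(value="hello")
>>> resolve_parameter(p)
"hello"
>>> resolve_parameter("hello")
"hello"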
- auto_process_ngs.pipeliner.sanitize_name(s)
Convert string to lowercase and replace special characters
- Parameters:
s (str) – string to sanitize
- Returns:
sanitized string.
- Return type:
String
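For example (illustrative only; the exact substitutions depend on the implementation), a mixed-case name containing spaces might be converted along the lines of:
>>> sanitize_name("Run FastQC")
"run_fastqc"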