auto_process_ngs.pipeliner

Module providing utility classes and functions for building simple ‘pipelines’ of tasks.

The core classes are:

  • Pipeline: class for building and executing pipelines

  • PipelineTask: class for defining pipeline tasks

  • PipelineFunctionTask: subclass of PipelineTask which enables Python functions to be run as external processes

Additional supporting classes:

  • PipelineCommand: class for defining commands that can be used in tasks

  • PipelineCommandWrapper: shortcut alternative to PipelineCommand

  • PipelineScriptWrapper: subclass of PipelineCommand for scripts

  • PipelineParam: class for passing arbitrary values between tasks

  • FileCollector: returning collections of files based on glob patterns

  • FunctionParam: PipelineParameter-like deferred function evaluation

  • ListParam: PipelineParameter-like implementation of list-like behaviour

  • PathJoinParam: PipelineParameter-like dynamic file path joiner

  • PathExistsParam: PipelineParameter-like dynamic file existence checker

The following exception classes are defined:

  • PipelineError: general pipeline-related exception

There are some underlying classes and functions that are intended for internal use:

  • BaseParam: base class for parameter-like classes

  • Capturing: capture stdout and stderr from a Python function

  • Dispatcher: run a Python function as an external process

  • sanitize_name: clean up task and command names for use in pipeline

  • collect_files: collect files based on glob patterns

  • resolve_parameter: get the value of arbitrary parameter or object

Overview

For the purposes of the pipeliner module, a ‘pipeline’ consists of a set of ‘tasks’: a task can be independent of any other task, or it may depend on one or more tasks being completed before it can start.

Defining tasks: examples

Tasks must be defined by subclassing the PipelineTask class and implementing the following methods:

  • init: used to declare any input parameters for the task

  • setup: perform any set up (e.g. creating output directories) or other arbitary actions; typically it also defines any commands that will be sent to the scheduler, however this is optional.

  • finish: perform final actions after setup and any defined commands have completed (nb finish is optional)

  • output: return any outputs from the task once it has completed.

For example: let’s define a task which runs the fastqc program on a collection of Fastq files one at a time:

class RunFastqc(PipelineTask):
    def init(self,fastqs,out_dir):
        self.add_output('out_files',list())
    def setup(self):
        if not os.path.exists(self.args.out_dir):
            os.mkdir(self.args.out_dir)
        for fq in self.args.fastqs:
            self.add_cmd("Run FastQC",
                         Command("fastqc",
                                 "-o",self.args.out_dir,
                                 fq))
    def finish(self):
        for fq in self.args.fastqs:
            if fq.endswith(".gz"):
                fq = os.path.splitext(fq)[0]
            out_file = os.path.join(
                self.args.out_dir,
                os.path.splitext(
                    os.path.basename(fq))[0]+"_fastqc.html")
            if not os.path.exists(out_file):
                self.fail(message="Missing output file: %s" % out_file)
            else:
                self.output.out_files.append(out_file)

The key features are:

  1. The argument list of the init method defines an arbitrary set of parameters which are made available to the other methods via the self.args object.

    The init method also adds an output called out_files which will be used to store the outputs of the task; it can be accessed via the output method.

  2. The setup method creates the output directory if it doesn’t already exist, and then calls the add_cmd method to add a command for each Fastq file supplied via the fastqs argument (accessed as self.args.fastqs), which will run fastqc on that Fastq.

  3. The command to run fastqc is created by creating a Command instance, which specifies the command and arguments to

    execute.

  4. The commands defined via the add_cmd method are not guaranteed to run sequentially. If there are additional commands that rely on the first set of commands finishing, then either append these to the commands using the shell && notation, or put them into a separate task which should be executed afterwards.

  5. The finish method constructs the names of the expected output Fastqc HTML files, and adds them to the out_files list which was originally initialised within the init method.

  6. The task can explicitly indicate a failure by calling the fail method, as in the finish method above. Raising an exception will also implicitly indicate a failure.

Another example is a task which does a simple-minded read count on a set of Fastq files:

class CountReads(PipelineTask):
    def init(self,fastqs):
        self.add_output('counts',dict())
    def setup(self):
        for fq in self.args.fastqs:
            if os.path.splitext(fq)[1] == ".gz":
               cat = "zcat"
            else:
               cat = "cat"
            self.add_cmd(
                "Count reads",
                Command("echo","-n",fq,"' '","&&",
                        cat,fq,"|","wc","-l"))
    def finish(self):
        for line in self.stdout.split('
‘):
if not line:

continue

fq = line.split()[0] read_count = int(line.split()[1])/4 self.outputs.counts[fq] = read_count

The key features are:

  1. The init method initialises the counts output which will be populated in the finish method, and accessed via the output method.

  2. The standard output from the task is available via the stdout property of the instance.

  3. The finish method is implemented to extract the line count data from standard output and convert this to a read count which is stored against the Fastq name.

A final example is a task which filters out the Fastq files which have non-zero read counts:

class FilterEmptyFastqs(PipelineTask):
    def init(self,read_counts):
        self.add_output('filtered_fastqs',list())
    def setup(self):
        for fq in self.args.read_counts:
            if self.args.read_counts[fq] > 0:
                 self.output.filtered_fastqs.append(fq)

In this case all the processing is performed by the setup method; no commands are defined.

Running scripts from within tasks

It is possible to define scripts to run from within tasks, for example previous the CountReads task might be more cleanly implemented using scripts when adding commands:

class CountReads(PipelineTask):
    ...
    def setup(self):
        for fq in self.args.fastqs:
            if os.path.splitext(fq)[1] == ".gz":
               cat = "zcat"
            else:
               cat = "cat"
            self.add_cmd(
                "Count reads",
                '''
                # Count lines in a FASTQ file
                echo -n {fastq} && {cat} {fastq} | wc -l
                '''.format(fastq=fq,cat=cat))
    ...

Scripts can be multiline and use bash syntax, so they can provide an alternative to coding logic within the setup function.

Defining task outputs

Outputs should be defined within the init method, using the add_output method of the task, for example:

self.add_output('fastqs',dict())

The outputs can be accessed via the task’s output property, which returns an AttributeDictionary object where the keys/attributes are those defined by the add_output calls, for example:

task = FilterEmptyFastqs(counts)
...
filtered_fastqs = task.output.fastqs

Typically the values of the outputs are not known before the task has been run, so the setup and finish methods can be used to populate the outputs when the task completes (as in the FilterEmptyFastqs example above).

Using task output as input to another task

The key feature of a pipeline is that the outputs from an upstream task can be used as input to one or more downstream tasks.

In this case the objects returned by output are likely to be passed to other tasks before the task has completed. There are a number of implications:

  1. Tasks that receive output from a preceeding task cannot assume that those outputs are ready or complete at the point of initialisation. It’s therefore recommended that tasks don’t attempt to use those outputs in their ‘init’ method - all processing should be deferred until the ‘setup’ method (when preceeding tasks will have completed).

  2. Outputs from tasks should be passed by object references (e.g. via a list or dictionary, which can be updated by the task after being passed, via specialised classes such as FileCollector or PipelineParam - see sections below).

As an example, consider the following task which reverses the order of a list of items:

class ReverseList(PipelineTask):
    def init(self,items):
        self.add_output('reversed_list',list())
    def setup(self):
        # Generate the reversed list
        for item in self.args.items[::-1]:
            self.output.reversed_list.append(item)

This might be used as follows:

reverse = ReverseList("Reverse order of list",[1,2,3])

Subsequently reverse.output.reversed_list will return the reference to the output list, which can then be passed to another task. The list will be populated when the reverse task runs, at which point the output will be available to the following tasks.

Task working directory isolation

By default each task is run in its own specially created directory, to isolate it from other tasks (note that jobs run by the same task will share this directory).

Isolation of task working directories can be disabled for all tasks within a pipeline at runtime, by setting the isolate_tasks parameter of the pipeline’s run method to False.

Alternatively it can be disabled for individual task instances by explicitly setting the make_task_working_dir parameter for that task instance to False when it is created (this will override the setting specified within the pipeline at runtime).

Specialised task input/output classes

There are a number of specialised classes which can be used to pass data from the output of one task into the input of another:

FileCollector

The FileCollector class enables the collection of files matching a glob-type pattern. It behaves as an iterator, with the file collection being deferred until the iteration actually takes place; it can therefore be used as an output for tasks where a set of files are created but the precise number or names of the files are not known a priori.

For example:

class MakeFiles(PipelineTask):
    def init(self,d,filenames):
        self.add_output('files',
                        FileCollector(self.args.d,"*"))
    def setup(self):
        # Create a directory and "touch" the files
        os.mkdir(self.args.d)
        for f in self.args.filenames:
            with open(os.path.join(self.args.d,f) as fp:
                fp.write()

PipelineParam

The PipelineParam class offers a way to pass strings or numerical types which would normally be immutable between tasks.

For example:

import string
from random import choice

class RandomString(PipelineTask):
    def init(self,n):
        self.add_output('string',PipelineParam())
    def setup(self):
        # Create a random string of length 'n' characters
        allchar = string.ascii_letters + string.punctuation + string.digits
        s = "".join([choice(allchar) for x in range(self.args.n)])
        # Assign the string to the output
        self.output.string.set(s)

If passed as input to a subsequent task, the PipelineParam instance will behave as a static value when accessed via self.args....

FunctionParam

The FunctionParam class allows function invocations to be passed to tasks as PipelineParam-like objects, so that the function evaluation is only performed when the task actually consumes the object.

The function can be a lambda function, or a full function defined elsewhere. For example:

is_dir = FunctionParam(lambda d: os.path.isdir(d),out_dir)

or (more concisely):

out_dir_exists = FunctionParam(os.path.isdir,out_dir)

Arguments and keywords supplied to the FunctionParam class are passed to the function for evaluation only when the object’s value method is invoked (with any parameter-like arguments being converted to their values immediately prior to evaluation).

This can be useful when building a pipeline where functions can’t be evaluated until the pipeline is executed, and as an alternative to creating a full PipelineFunctionTask to wrap a function.

ListParam

The ListParam class a parameter-like alternative to using a standard Python list to pass lists between tasks in a pipeline; it is most useful when the list itself contains parameter-like items, in which case these items are also evaluated before the ListParam is passed to a task.

PathJoinParam

The PathJoinParam class provides a parameter-like alternative to the os.path.join function; it is useful when building pipelines where it is not possible to construct the final paths for files or directories until the pipeline is actually executed (for example if some path components are supplied as parameters).

PathExistsParam

The PathExistsParam class provides a parameter-like alternative to the os.path.exists function; it is useful when building pipelines where it is not possible to check the existence of some paths until the pipeline is actually executed (for example because files or directories being checked are created during pipeline execution).

Running Python functions as tasks

The PipelineFunctionTask class enables a Python function to be run as an external process, for example reimplementing the earlier CountReads example:

from bcftbx.FASTQFile import nreads
class CountReads(PipelineFunctionTask):
    def init(self,fastqs):
        self.add_output('counts',dict())
    def setup(self):
        self.add_call("Count reads in Fastq",
                      self.count_reads,
                      self.args.fastqs)
    def count_reads(self,fastqs):
        # Count reads in each Fastq
        counts = dict()
        for fq in fastqs:
            counts[fq] = nreads(fq)
        return counts
    def finish(self):
        for fq in self.result():
            for fq in result[fq]:
                self.output.counts[fq] = result[fq]

The main differences between this and the PipelineTask class are:

  1. The class includes a method (in this case make_files) which implements the Python function to be executed.

  2. The add_call method is used in the setup method to define calls to the function to run (analogous to the add_cmd method for normal tasks). add_call can be used multiple times, e.g. to break a task into several separate processes (again analogous to add_cmd).

  3. The return values from the invoked function are collected and made available via the result method. This returns a list of results (one for each add_call was used). The finish method contains code to post-process these results.

The advantage of this approach is that intensive operations (e.g. processing large numbers of files) previously performed by Python functions can be farmed out to external processes without having to be wrapped in new executable programs.

Note

The FunctionParam class could also be considered as an alternative to PipelineFunctionTasks for light-weight function calls during pipeline execution.

Building and running a pipeline

An empty pipeline is created by making a Pipeline instance:

ppl = Pipeline()

Specific task instances are then created from PipelineTask subclasses, for example using the tasks defined previously:

read_counts = CountReads("Count the reads",fastqs)
filter_empty_fastqs = FilterEmptyFastqs("Filter empty Fastqs",
                                        read_counts.output.filtered_fastqs)
run_fastqc = RunFastqc("Run Fastqc",
                       filter_empty_fastqs.output.filtered_fastqs)

Note that when instantiating a task, it must be given a name; this can be any arbitrary text and is intended to help the end user distinguish between different task instances in the pipeline.

The tasks are then added to the pipeline using the add_task method:

ppl.add_task(read_counts)
ppl.add_task(filter_empty_fastqs,
             requires=(read_counts,))
ppl.add_task(run_fastqc,
             requires=(filter_empty_fastqs,))

The pipeline is then run using the run method:

ppl.run()

Notes:

  1. Tasks will only be executed once any tasks they depend on have completed successfully; these are specified via the requires argument of the add_task method (in which case it must be a list or tuple of task instances), and/or via the requires method of a task instance.

  2. Tasks that fail (i.e. complete with non-zero exit status) will cause the pipeline to halt at that point.

  3. The run method blocks and returns the exit status of the pipeline execution.

Defining relationships between tasks

A task in a pipeline may depend on one or more other tasks in the pipeline to complete before it can be run; these are referred to as “requirements” of the task.

Requirements can be specified in different ways:

  1. When a task is added to a pipeline via the add_task method then a list of required tasks can also be specified via the requires argument;

  2. Requirements can be added directly to a task using its requires method.

  3. A task can be made the requirement of other tasks using its required_by method.

Note that these two approaches are not exclusive, and can be used together on the same task to specify requirements in a pipeline.

Additionally, requirements are automatically added implicitly for each input that is a parameter-based output from another task.

Requirement specification can be combined with two properties that can be useful for when adding tasks to an existing pipeline:

  • initial_tasks returns a list of all the tasks in the pipeline that don’t have any requirements; it can be used when inserting tasks at the start of a pipeline, as the inserted task can be made into a requirement of the initial tasks using the idiom task.required_by(*pipeline.initial_tasks), and

  • final_tasks returns a list of the all the tasks in the pipeline which no other tasks require (essentially they will run at the end of the pipeline); it can be used when appending tasks to the end of a pipeline using the idiom task.requires(*pipeline.final_tasks).

Scheduling and running jobs

By default the run method of the Pipeline instance creates a new SimpleScheduler instance internally and uses this to run the commands generated by the tasks in the pipeline.

The run method provides a number of arguments that can be used to configure the scheduler, specifically:

  • max_jobs: this sets the maximum number of concurrent jobs that the scheduler will run (defaults to 1)

  • max_slots: this sets the maximum number of concurrent CPUs (aka “slots”) that the scheduler will allocate work for (defaults to no limit).

  • poll_interval: the time interval that the scheduler will used when checking the status of running jobs (defaults to 5 seconds)

If more control is required over the scheduler then the calling subprogram can create and configure its own SimpleScheduler instance and pass this to the run method instead. Note that it is the responsibility of the subprogram to start and stop the scheduler that it provides.

Specifying job runners for tasks

By default when a pipeline is executed then all jobs within its tasks will be run using the default job runner supplied via the default_runner argument of the pipeline’s run method.

For example:

ppl.run(default_runner=SpecialJobRunner())

(If no default is supplied then a SimpleJobRunner is used as the default runner.)

Typically it is desirable to be able to exercise more granular control over the job runners used within the pipeline. This is possible via the runner keyword of the add_task method of the Pipeline class, which allows a job runner to be specified which will then used when jobs in that task are executed.

For example:

ppl.add_task(my_task,runner=SimpleJobRunner())

The Pipeline class also allows parameterisation of job runners when building a pipeline. Placeholders for job runners can be defined using the add_runner method, and accessed via the runners property to be associated with tasks.

For example:

# Define runner
ppl.add_runner('my_runner')
...
# Associate runner with task
ppl.add_task(my_task,runner=ppl.runners['my_runner'])

Job runners can then be set explicitly when the pipeline is executed, using the runners argument of the run method to supply a mapping of runner names to job runner instances.

For example:

ppl.run(runners={ 'my_runner': SpecialJobRunner() })

Any runner names that don’t have associated job runner instances will use the default runner defined via the default_runner argument.

Dynamically setting number of CPUs/threads via job runners

When job runners are created they can have a maximum number of available CPUs (aka “slots”) associated with them.

For SimpleJobRunner``s this has to be set explicitly via the ``nslots argument, for example:

runner =  SimpleJobRunner(nslots=8)

By default only a single slot is allocated. (For GEJobRunners the number of slots is set implicitly.)

The number of slots can then be accessed at runtime, so that jobs run within a task use the appropriate number of CPUs dynamically, by using the runner_nslots method.

For standard PipelineTask classes, this should be done when constructing commands within the setup method. For example: bowtie2 takes a --threads option which tells the program how many threads it should use. A minimal task to run bowtie2 with dynamically assigned number of threads might look like:

class RunBowtie2(PipelineTask):
     def init(self,fastq,index_basename,sam_out):
         pass
     def setup(self):
         self.add_cmd("Run bowtie",
                      Command("bowtie2",
                              "-x",self.args.index_basename,
                              "-U",self.args.fastq,
                              "-S",self.args.sam_out,
                              "--threads",self.runner_nslots)

Note

When using dynamic CPU assignment with SimpleJobRunners, it may also be worth considering using the max_slots parameter when running the pipeline.

Dealing with stdout and stderr from tasks

The stdout and stderr from tasks which run external commands can be accessed via the stdout and stderr properties respectively of the task instance once it has completed.

Where multiple jobs were run by the task, the stdout from all jobs are concatenated and returned via this property (if stderr was not redirected to stdout then this will also be concatenated across all jobs).

The stdout for each job is topped and tailed with a standard set of comment lines output from the wrapper scripts, of the form:

#### COMMAND Echo text
#### HOSTNAME popov
#### USER pjb
#### START Thu Aug 17 08:38:14 BST 2017
...
...Job-specific output...
...
#### END Thu Aug 17 08:38:14 BST 2017
#### EXIT_CODE 0

When parsing the stdout it is recommended to check for these lines using e.g. line.startswith("#### ").

Note

Currently the stderr for each job is not enclosed by standard comment lines.

Handling failed tasks in pipelines

If a task in a pipeline fails (that is, completes with a non-zero exit code) then the pipeline is considered to have failed. In this case the pipeline can use one of a number of strategies to handle execution of the remaining tasks:

  • Pipeline execution halts immediately and all running tasks are terminated (‘immediate’ mode, the default)

  • Pipeline execution continues but all tasks which depend on the failed tasks are removed and not executed (‘deferred’ mode)

The strategy can be set explicitly at runtime by setting the exit_on_failure argument of the pipeline run method to one of the values defined in the PipelineFailure class.

For example:

from pipeliner import PipelineFailure
...
# Define pipeline
...
# Run pipeline in 'deferred' mode
ppl.run(exit_on_failure=PipelineFailure.DEFERRED)

Note that regardless of how the failures are handled the pipeline will always return exit code 1 when one or more tasks fail.

Executing pipeline commands in batches

By default when the pipeline executes the commands generated by a task, each command is sent to the scheduler as a single job.

It is also possible to request that the pipeline executes commands in batches, by specifying either a non-zero size for the batch_size option of the run method, or by specifying a non-zero batch_limit.

If batch_size is set then commands are grouped together into batches of this size, and each batch is sent to the scheduler as a single job; if batch_limit is set then the batch size is set automatically so that the number of batches don’t exceed the specified limit.

Within a batch the commands are executed sequentially, and if one command fails then all subsequent commands in the batch won’t run.

Batch mode can also be requested on a per-task basis, by explicitly specifying batch_size as keyword when adding the task to the pipeline. For example:

ppl = Pipeline()
...
ppl.add_task(my_task,batch_size=5)

This will override any batch size set globally when the pipeline is run.

Setting pipeline parameters at execution time

When building pipelines, it is sometimes necessary or desirable to supply a parameter to a task where the value of that parameter isn’t known until execution time (via the run method).

For example, a task in the pipeline might need to know the number of cores or the location of a temporary directory to be used, which can only be set at execution time.

To handle these situations, it possible to define arbitrary parameters within the Pipeline class at build time which are passed to tasks as placeholders, and then set the values of these parameters at execution time.

The add_param method is used to define a parameter, for example:

ppl = Pipeline()
ppl.add_param('ncores',value=1,type=int)
ppl.add_param('tmpdir')

This creates a new PipelineParam instance which is associated with the supplied name.

The parameters can be accessed via the pipeline’s params property, and passed as input into tasks, for example:

task = ExampleTask("This is an example",
                   ncores=ppl.params.ncores,
                   tmpdir=ppl.params.tmpdir)
ppl.add_task(task)

The runtime values of parameters are then passed via the params argument of the pipeline’s run invocation:

temporary_dir = tempfile.mkdtemp()
ppl.run(params={ 'ncores': 8,
                 'tmpdir': temporary_dir, })

Built-in parameters

In addition to the custom parameters defined using the add_param method and outlined in the previous section, a number of ‘built-in’ parameters are also available as properties of the Pipeline instance, for use when building a pipeline.

Specifically these are:

  • WORKING_DIR: the working directory used by the pipeline

  • BATCH_SIZE: the batch size to be used when running jobs within pipeline tasks

  • BATCH_LIMIT: the maximum number of batches of jobs

  • VERBOSE: whether the pipeline is running in ‘verbose’ mode

These can be used in the same way as the custom parameters when setting up tasks, for example:

task = ExampleTask("This is an example",
                   ncores=ppl.params.ncores,
                   tmpdir=ppl.params.WORKING_DIR)

The values will be set when the pipeline’s run method is invoked.

Defining execution environment for a task: runners, modules & conda

It is possible to define the execution environment on a per-task basis within a pipeline, by defining job runners, environment modules and conda dependencies.

Runners and environments can be declared in a parameterised fashion when a pipline is created, using the add_runner and add_envmodules methods respectively of the Pipeline class.

For example:

ppl = Pipeline()
ppl.add_runner('4_cpus')
ppl.add_envmodules('myenv')

This defines a runner called 4_cpus and an environment called myenv.

The runners and environments are accessed via the runners and envmodules properties of the Pipeline instance, and can be associated with tasks within the pipeline when they are added via the add_task method, using the runner and envmodules keywords respectively).

For example:

ppl.add_task(my_task,runner=ppl.runners['4_cpus'],...)

and

ppl.add_task(my_task,envmodules=ppl.envmodules['myenv'],...)

Actual runners and environments can be assigned when the pipeline is executed, via the runners and envmodules options of the run method of the Pipeline instance - these are mappings of the names defined previously to JobRunner instances, and to lists of environment modules.

For example:

ppl.run(runners={ '4_cpus': GEJobRunner('-pe smp.pe 4'), },
        envmodules={ 'myenv': 'apps/trimmomatic/0.38', },...)

If a runner is not explicitly set for a task then the pipeline’s default runner is used for that task; this defaults to a SimpleJobRunner instance but can be set explicitly via the default_runner argument of the Pipeline instance’s run method.

Execution environments can also be defined with conda packages. The packages and versions required by a task are declared in a task’s init method with calls to the conda method, for example:

class RunFastqc(PipelineTask):
    def init(self,fastq,out_dir):
        self.conda("fastqc=0.11.3")
        ...

If conda dependency resolution is enabled when the pipeline is executed then these declarations will be used to generate conda environments that are activated when the tasks run (otherwise they are ignored) (see the section “Enabling conda to create task environments automatically” for details).

Note

By default the PYTHONNOUSERSITE environment variable is set in the execution environment, and any directories matching the path ${HOME}/.local/lib/python* are removed from the PYTHONPATH.

Together this means that Python programs that are run within that environment will ignore any packages installed in the user site-packages directory (which can otherwise cause issues with conda dependency resolution).

Defining outputs from a pipeline

It is possible to define outputs for a Pipeline instance in the same way that outputs can be defined for individual tasks.

The add_output method of the Pipeline class allows an arbitrary output to be defined, for example:

ppl = Pipeline()
...
ppl.add_output('final_result',result)
ppl.run()

This can be accessed via the pipeline’s output property:

::

print(“The result is ‘%s’” % ppl.output.result)

It is possible that pipeline outputs are defined as PipelineParam instances (for example, if a pipeline output is taken from an output from one of its constituent tasks). By default, on pipeline completion the outputs are “finalized” by substituting the PipelineParam``s for their actual values. To prevent this behaviour, set the ``finalize_outputs argument of the pipeline’s run method to False. For example:

ppl = Pipeline()
ppl.add_output('final_result',PipelineParam())
...
ppl.run(finalize_outputs=False)

It is recommended that outputs are defined as PipelineParam instances, to take advantage of the implicit task requirement gathering mechanism.

Enabling conda to create task environments automatically

The conda package manager can be used within Pipeline``s to automatically create run-time environments for any tasks which declare ``conda dependencies in their init methods.

To enable the use of conda when a pipeline is executed, specify the following arguments when invoking the pipeline’s run method:

  1. enable_conda must be set to True, and

  2. The path to an existing conda installation must be supplied via conda argument.

For example:

ppl = Pipeline()
...
ppl.run(enable_conda=True,conda='/usr/local/miniconda3/bin/conda)

(By default new conda environments are created in a subdirectory of the working directory, but it is possible to explicitly specify a different location via the conda_env_dir of the run method.)

A note on PipelineCommand and PipelineCommandWrapper

In the first version of the pipeliner code, commands within the setup method of PipelineTasks had to be generated using subclasses of the PipelineCommand class.

A simple example to run Fastqc:

class Fastqc(PipelineCommand):
    def init(self,fastq,out_dir):
        self._fastq = fastq
        self._out_dir = out_dir
    def cmd(self):
        return Command("fastqc",
                       "-o",self._out_dir,
                       self._fastq)

which can then be used within a task, for example:

class RunFastqc(PipelineTask):
    ...
    def setup(self):
        ...
        for fq in self.args.fastqs:
            self.add_cmd(Fastqc(fq,self.args.out_dir))

Subsequently a “shortcut” class called PipelineCommandWrapper was introduced, which allowed the generation of commands to be performed within the task class without a PipelineCommand subclass.

In this case the example RunFastqc task becomes:

class RunFastqc(PipelineTask):
    ...
    def setup(self):
        ...
        for fq in self.args.fastqs:
            self.add_cmd(
                PipeLineCommandWrapper(
                    "Run FastQC",
                    "fastqc",
                     "-o",self._out_dir,
                     self._fastq))

This has since been superseded by updates to the add_cmd method which allows a title and Command instance (or a string containing a script) to be supplied instead. In this case the example would become:

class RunFastqc(PipelineTask):
    ...
    def setup(self):
        ...
        for fq in self.args.fastqs:
            self.add_cmd("Run FastQC",
                         Command("fastqc",
                                 "-o",self._out_dir,
                                 self._fastq))

or (if using a script):

class RunFastqc(PipelineTask):
    ...
    def setup(self):
        ...
        for fq in self.args.fastqs:
            self.add_cmd("Run FastQC",
                         '''
                         fastqc -o {out_dir} {fastq}
                         '''.format(out_dir=self._out_dir,
                                    fastq=self._fastq))

All approaches are supported, however the last two are likely to result in cleaner code that is easier to read. PipelineCommand is probably better suited to situations where the same command will be used across multiple distinct tasks. In cases where the command is only used in one task, the last two approaches are recommended.

Advanced pipeline construction: combining pipelines

It possible to build larger pipelines out of smaller ones by using the add_pipeline method of the Pipeline class, which pulls tasks from one pipeline into another along with their dependency relationships.

Optionally dependencies on tasks from the “master” pipeline can be added to the imported pipeline tasks.

Parameters defined in the imported pipeline are also imported and exposed with the same names; but it is also possible to override them with with parameters defined in the master pipeline, or parameters that are task outputs.

There are two specialised methods (append_pipeline and merge_pipeline) which wrap the add_pipeline method:

  • append_pipeline takes all the tasks from one pipeline and adds them to the end of another, so that the appended tasks only run after the original tasks have completed;

  • merge_pipeline takes all the tasks from one pipeline and adds them into another, without requiring that they wait until the original tasks have finished.

Appending can be used for building a pipeline out of distinct ‘sections’ of sub-pipelines; merging can be useful for running multiple pipelines in parallel.

class auto_process_ngs.pipeliner.BaseParam

Provide base class for PipelineParam-type classes

Implements core functionality that should be shared across all parameter-like classes, including assigning a UUID and enabling a task ID to be associated with the parameter.

Provides the following attributes:

  • uuid

  • associated_task_id

and the following methods:

  • associate_task

associate_task(task)

Associate a task with the parameter

Parameters:

task (PipelineTask) – a task object to associate with the parameter

property associated_task_id

Return the task ID of the associated task (or None)

property uuid

Return the unique identifier (UUID) of the parameter

class auto_process_ngs.pipeliner.Capturing

Capture stdout and stderr from a function call

Based on code from http://stackoverflow.com/a/16571630/579925 modified to handle both stdout and stderr (original version only handled stdout)

Usage:

>>> with Capturing() as output:
...     print("Hello!")
>>> for line in output.stdout:
...     print("Line: %s" % line)
>>> for line in output.stderr:
...     print("Err: %s" % line)
class auto_process_ngs.pipeliner.Dispatcher(working_dir=None, cleanup=True)

Class to invoke Python functions in external processes

Enables a Python function to be run in a separate process and collect the results.

Example usage:

>>> d = Dispatcher()
>>> cmd = d.dispatch_function_cmd(bcftbx.utils.list_dirs,os.get_cwd())
>>> cmd.run_subprocess()
>>> result = d.get_result()

The Dispatcher works by pickling the function, arguments and keywords to files, and then creating a command which can be run as an external process; this unpickles the function etc, executes it, and makes the result available to the dispatcher on successful completion.

If the invoked function raises an exception then the result will be returned as None, and the exception will be available from the get_exception method.

For example:

>>> if d.get_exception():
>>>     print("Success: %s" % d.get_result())
>>> else:
>>>     print("Failure: %s" % d.get_exception())

Currently uses cloudpickle as the pickling module: https://pypi.org/project/cloudpickle/

Parameters:
  • working_dir (str) – optional, explicitly specify the directory where pickled files and other intermediates will be stored

  • cleanup (bool) – if True then remove the working directory on exit

dispatch_function_cmd(func, *args, **kwds)

Generate a command to execute the function externally

Parameters:
  • func (function) – function to be executed

  • args (list) – arguments to invoke the function with

  • kwds (mapping) – keyword arguments to invoke the function with

Returns:

a Command instance that can be used to

execute the function.

Return type:

Command

execute(pkl_func_file, pkl_args_file, pkl_kwds_file, pkl_result_file=None)

Internal: execute the function

Parameters:
  • pkl_func_file (str) – path to the file with the pickled version of the function to be invoked

  • pkl_args_file (str) – path to the file with the pickled version of the function arguments

  • pkl_args_file – path to the file with the pickled version of the keyword arguments

  • pkl_result_file (str) – optional path to the file where the pickled version of the function result will be written

get_exception()

Return the exception from the function invocation

Returns:

the exception returned by the function, or

None if exception was raised.

Return type:

Exception

get_result()

Return the result from the function invocation

Returns:

the object returned by the function, or None

if no result was found.

Return type:

Object

property working_dir

Return the working directory path

class auto_process_ngs.pipeliner.FileCollector(dirn, pattern)

Class to return set of files based on glob pattern

Parameters:
  • dirn (str) – directory to search

  • pattern (str) – glob pattern to match files against

Yields:

String – path of matching file.

class auto_process_ngs.pipeliner.FunctionParam(f, *args, **kws)

Class for deferred function evaluation as pipeline parameter

This class wraps a function with a set of parameters; the function evaluation is deferred until the ‘value’ property is invoked.

Any parameters which are PipelineParam-like instances will be replaced with their values before being passed to the function.

property value

Return value from evaluated function

class auto_process_ngs.pipeliner.ListParam(iterable=None)

Implement list-like behaviour as pipeline parameter

This class implements the pipeline parameter equivalent to the Python ‘list’ class. It supports append and extend methods, and the len function will return the number of elements in the list.

The value property returns a Python list, with any pipeline parameter-like objects in the original list replaced with their values.

It is recommended that ListParam instances should be used in pipelines when passing lists of parameters between tasks.

class auto_process_ngs.pipeliner.PathExistsParam(p)

Class for checking file/directory existance as pipeline parameter

This class implements the pipeline parameter equivalent of the os.path.exists function, taking a path on instantiation (which can be a string or PipelineParam-like object) and returning a boolean value indicating whether the path is an existing file system object via the value property.

Example usage:

>>> exists = PathExistsParam("/path/to/file.txt")
>>> exists.value
True
>>> f = PipelineParam(value="/path/to/file.txt")
>>> exists = PathExistsParam(f)
>>> exists.value
True
>>> f.set("/path/to/missing.txt")
>>> exists.value
False

Note that this class doesn’t implement a set method (unlike the standard PipelineParam class) so the path elements cannot be changed after initialisation.

class auto_process_ngs.pipeliner.PathJoinParam(*p)

Class for joining file paths as pipeline parameter

This class implements the pipeline parameter equivalent of the os.path.join function, taking a set of path elements on instantiation (which can be strings or PipelineParam-like objects) and returning the joined path elements on evaluation via the value property.

Example usage:

>>> pth = PathJoinParam("/path","to","file.txt")
>>> pth.value
"/path/to/file.txt"
>>> base_dir = PipelineParam(value="/path/to/base")
>>> pth = PathJoinParam(base_dir,"file.txt")
>>> pth.value
"/path/to/base/file.txt"
>>> base_dir.set("/path/to/new/base")
>>> pth.value
"/path/to/new/base/file.txt"

Note that this class doesn’t implement a set method (unlike the standard PipelineParam class) so the path elements cannot be changed after initialisation.

class auto_process_ngs.pipeliner.Pipeline(name='PIPELINE')

Class to define and run a ‘pipeline’ of ‘tasks’

A pipeline consists of a set of tasks (defined by instantiating subclasses of PipelineTask) with simple dependency relationships (i.e. a task will depend on none, one or more other tasks in the pipeline to complete before it can be executed).

Example usage:

>> p = Pipeline() >> t1 = p.add_task(Task1()) >> t2 = p.add_task(Task2(),requires=(t1,)) >> … >> p.run()

Tasks will only run when all requirements have completed (or will run immediately if they don’t have any requirements).

Parameters:

name (str) – optional name for the pipeline

add_envmodules(name)

Define a new environment defined by modules

Creates a new PipelineParam instance associated with the supplied environment name.

Runner instances can be accessed and set via the envmodules property of the pipeline, for example:

To access:

>>> env_modules = ppl.envmodules['my_env'].value

To set:

>>> ppl.envmodules['my_env'].set("")
Parameters:

name (str) – name for the new runner

add_output(name, value)

Add an output to the pipeline

Parameters:
  • name (str) – name for the output

  • value (object) – associated object

add_param(name, value=None, type=None)

Define a new pipeline parameter

Creates a new PipelineParam instance associated with the supplied name.

Parameters can be accessed and set via the params property of the pipeline.

Parameters:
  • name (str) – name for the new parameter

  • value (object) – optional, initial value to assign to the instance

  • type (function) – optional, function used to convert the stored value when fetched via the value property

add_pipeline(pipeline, params=None, requires=None)

Import tasks from another pipeline

Adds the tasks from the supplied pipeline instance into this pipeline.

Dependency relationships defined between tasks in the imported pipeline are preserved on import. The ‘requires’ keyword can be used to define new dependencies between the initial tasks in the imported pipeline and those in the destination pipeline.

By default parameters defined in the added pipeline will be imported and exposed with the same names; the ‘params’ keyword can be used to replace them with arbitrary parameters (e.g. parameters defined in the master pipeline, outputs from tasks etc).

Parameters:
  • pipeline (Pipeline) – pipeline instance with tasks to be added to this pipeline

  • params (mapping) – a dictionary or mapping where keys are parameter names in the imported pipeline and values are PipelineParam instances that they will be replaced by

  • requires (list) – optional list of tasks that the added pipeline will depend on

add_runner(name)

Define a new runner within the pipeline

Creates a new PipelineParam instance associated with the supplied runner name.

Runner instances can be accessed and set via the runners property of the pipeline, for example:

To access:

>>> runner = ppl.runners['my_runner'].value

To set:

>>> ppl.runners['my_runner'].set(SimpleJobRunner())
Parameters:

name (str) – name for the new runner

add_task(task, requires=(), **kws)

Add a task to the pipeline

Parameters:
  • task (PipelineTask) – task instance to add to the pipeline

  • requires (List) – list or tuple of task instances which need to complete before this task will start

  • kws (Dictionary) – a dictionary of keyword-value pairs which will be passed to the task at run time (see the run method of PipelineTask for valid options)

append_pipeline(pipeline)

Append tasks from another pipeline

Adds the tasks from the supplied pipeline instance into this pipeline, ensuring that the appended tasks depend on the original tasks completing.

Dependencies which were already defined in the pipeline being appended are preserved when they are added to the first.

Parameters:

pipeline (Pipeline) – pipeline instance with tasks to be appended

property envmodules

Access the modules environments defined for the pipeline

Returns the dictionary mapping modules environment names to PipelineParam instances that store the module lists; so to get the list of modules associated with an environment name do e.g.

>>> modules = ppl.envmodules['my_env'].value
property final_tasks

Fetch a list of ‘final tasks’ for the pipeline

Returns:

list of task instances from the pipeline

which don’t have any dependent tasks (i.e. the tasks that will run last)

Return type:

List

get_dependent_tasks(task_id)

Return task ids that depend on supplied task id

Returns:

list of IDs for dependent tasks.

Return type:

List

get_task(task_id)

Return information on a task

Parameters:

task_id (str) – unique identifier for a task

Returns:

tuple of (task,requirements,kws)

where ‘task’ is a PipelineTask instance, requirements is a list of PipelineTask instances that the task depends on, and kws is a keyword mapping

Return type:

Tuple

property initial_tasks

Fetch a list of ‘initial tasks’ for the pipeline

Returns:

list of task instances from the pipeline

which don’t depend on any other tasks (i.e. the tasks that will run first)

Return type:

List

merge_pipeline(pipeline)

Add tasks from another pipeline

Adds the tasks from the supplied pipeline instance into this pipeline, without requiring that the appended tasks depend on the original tasks.

Dependencies which were already defined in the pipeline being appended are preserved when they are added to the first.

Parameters:

pipeline (Pipeline) – pipeline instance with tasks to be added

property name

Return the name of the pipeline

property output

Return the output object

property params

Access the parameters defined for the pipeline

rank_tasks()

Rank the tasks into order

Returns:

list of ‘ranks’, with each rank

being a list of task ids.

Return type:

List

report(s)

Internal: report messages from the pipeline

Parameters:

s (str) – message to report

run(working_dir=None, tasks_work_dir=None, log_dir=None, scripts_dir=None, log_file=None, sched=None, default_runner=None, max_jobs=1, max_slots=None, poll_interval=5, params=None, runners=None, envmodules=None, batch_size=None, batch_limit=None, verbose=False, enable_conda=False, conda=None, conda_env_dir=None, use_locking=True, isolate_tasks=True, exit_on_failure=0, finalize_outputs=True)

Run the tasks in the pipeline

Parameters:
  • working_dir (str) – optional path to top-level working directory (defaults to the current directory)

  • log_dir (str) – path of directory where log files will be written to

  • scripts_dir (str) – path of directory where script files will be written to

  • tasks_work_dir (str) – path to directory where tasks will be run

  • log_file (str) – path to file to write pipeline log messages to (in addition to stdout)

  • sched (SimpleScheduler) – a scheduler to use for running commands generated by each task

  • default_runner (JobRunner) – optional default job runner to use

  • max_jobs (int) – optional maximum number of concurrent jobs in scheduler (defaults to 1; ignored if a scheduler is provided via ‘sched’ argument)

  • max_slots (int) – optional maximum number of ‘slots’ (i.e. concurrent threads or maximum number of CPUs) available to the scheduler (defaults to no limit)

  • poll_interval (float) – optional polling interval (seconds) to set in scheduler (if scheduler not provided via the ‘sched’ argument), and to use for checking if tasks have completed (defaults to 5s)

  • params (mapping) – a dictionary or mapping which associates parameter names with values

  • runners (mapping) – a dictionary or mapping which associates runner names with job runners

  • envmodules (mapping) – a dictionary or mapping which associates envmodule names with list of environment module names

  • enable_conda (bool) – if set then use conda to resolve dependencies for tasks which declare them (conda is disabled by default)

  • conda (str) – path to conda executable

  • conda_env_dir (str) – path to directory to create conda environments in

  • batch_size (int) – if set then run commands in each task in batches, with each batch running this many commands at a time (default is to run one command per job)

  • batch_limit (int) – if set then run commands in batches, with the batch size automatically set so that the number of batches doesn’t exceed this limit; ignored if batch_size is explicitly set (default is not to limit the number of batches)

  • verbose (bool) – if True then report additional information for diagnostics

  • use_locking (book) – if True then lock invocations of task methods

  • isolate_tasks (bool) – if True then automatically create dedicated working directories for each task

  • exit_on_failure (int) – either IMMEDIATE (any task failures cause immediate termination of of the pipeline; this is the default) or DEFERRED (the pipeline execution continues and only raises an error when all tasks have finished running)

  • finalize_outputs (bool) – if True then convert any pipeline outputs from PipelineParams to an actual value (this is the default)

Returns:

0 for successful completion, 1 if there

was an error.

Return type:

Integer

property runners

Access the runners defined for the pipeline

Returns the dictionary mapping runner names to PipelineParam instances that store the runner instances; so to get the runner associated with a name do e.g.

>>> runner = ppl.runners['my_runner'].value
start_scheduler(runner=None, max_concurrent=1, max_slots=None, poll_interval=5)

Internal: instantiate and start local scheduler

stop_scheduler()

Internal: stop the pipeline scheduler

task_list()

Return a list of task ids

Returns:

list of task IDs in the pipeline.

Return type:

List

terminate()

Internal: terminate a running pipeline

class auto_process_ngs.pipeliner.PipelineCommand(*args, **kws)

Base class for constructing program command lines

This class should be subclassed to implement the ‘init’ and ‘cmd’ methods.

The ‘init’ method should do any preprocessing and caching of arguments to be used in the ‘cmd’ method; the ‘cmd’ method should use these to construct and return a ‘Command’ instance.

cmd()

Build the command

Must be implemented by the subclass and return a Command instance

init()

Initialise and store parameters

Must be implemented by the subclass

make_wrapper_script(scripts_dir=None, shell='/bin/bash', envmodules=None, conda=None, conda_env=None, working_dir=None, batch_number=None, script_uuid=None)

Generate a uniquely-named wrapper script to run the command

Parameters:
  • scripts_dir (str) – path of directory to write the wrapper scripts to

  • shell (str) – shell to use (defaults to ‘/bin/bash’)

  • envmodules (str) – list of environment modules to load

  • conda (str) – path to conda executable

  • conda_env (str) – name or path for conda environment to activate in the script

  • working_dir (str) – explicitly specify the directory the script should be executed in

  • batch_number (int) – for batched commands, the number of the batch that this script corresponds to (optional)

  • uuid (str) – optional ID string; if not supplied then a UUID string will be generated

Returns:

name of the wrapper script.

Return type:

String

name()

Return a “sanitized” version of the class name

quote_spaces()

Indicate whether spaces should be quoted in wrapper script

class auto_process_ngs.pipeliner.PipelineCommandWrapper(name, *args)

Class for constructing program command lines

This class is based on the PipelineCommand class but can be used directly (rather than needing to be subclassed).

For example, to wrap the ‘ls’ command directly:

>>> ls_command = PipelineCommandWrapper("List directory",'ls',dirn)

It is also possible to extend the command line using the ‘add_args’ method, for example:

>>> ls_command = PipelineCommandWrapper("List directory",'ls')
>>> ls.command.add_args(dirn)
add_args(*args)

Add additional arguments to extend the command being built

Parameters:

args (List) – one or more arguments to append to the command

cmd()

Internal: implement the ‘cmd’ method

init(*args)

Internal: dummy init which does nothing

exception auto_process_ngs.pipeliner.PipelineError

Base class for pipeline-specific exceptions

class auto_process_ngs.pipeliner.PipelineFunctionTask(_name, *args, **kws)

Class enabling a Python function to be run as a task

A ‘function task’ enables one or more instances of a Python function or class method to be run concurrently as external programs, and for the return values of the to recovered on task completion.

This class should be subclassed to implement the ‘init’, ‘setup’, ‘finish’ (optionally) and ‘output’ methods.

The ‘add_call’ method can be used within ‘setup’ to add one or more function calls to be invoked.

add_call(name, f, *args, **kwds)

Add a function call to the task

Parameters:
  • name (str) – a user friendly name for the call

  • f (function) – the function or instance method to be invoked in the task

  • args (list) – arguments to be passed to the function when invoked

  • kwds (mapping) – keyword=value mapping to be passed to the function when invoked

finish_task()

Internal: perform actions to finish the task

result()

Return the results of the function invocations

class auto_process_ngs.pipeliner.PipelineParam(value=None, type=None, default=None, name=None)

Class for passing arbitrary values between tasks

The PipelineParam class offers a way to dynamically assign values for types which would otherwise be immutable (e.g. strings).

A new PipelineParam is created using e.g.:

>>> p = PipelineParam()

In this form there will be no initial value, however one can be assigned using the value argument, e.g.:

>>> p = PipelineParam(value="first")

The associated value can be returned using the value property:

>>> p.value
"first"

and updated using the set method, e.g.:

>>> p.set("last")
>>> p.value
"last"

The return type can be explicitly specified on object instantiation using the type argument, for example to force that a string is always returned:

>>> p = PipelineParam(type=str)
>>> p.set(123)
>>> p.value
"123"

If the default function is supplied then this will be used to generate a value if the stored value is None:

>>> p = PipelineParam(default=lambda: "default")
>>> p.value
"default"
>>> p.set("assigned")
>>> p.value
"assigned"

If a name is supplied then this will be stored and can be recovered via the name property:

>>> p = PipelineParam(name="user_name")
>>> p.name
"user_name"

The value of a parameter can be set to another parameter, in which case the value of the first parameter will be taken from the second:

>>> first_param = PipelineParam(value="first")
>>> first_param.value
"first"
>>> second_param = PipelineParam(value="second")
>>> first_param.set(second_param)
>>> first_param.value
"second"

A parameter can also be “replaced” with another parameter using its replace_with method:

>>> p = PipelineParam(value="old")
>>> p.value
"old"
>>> pp = PipelineParam(value="new")
>>> p.replace_with(pp)
>>> p.value
"new"

This feature allows parameters defined in one context to be matched with parameters in another (for example when tasks from one pipeline are imported into another).

Parameters:
  • value (object) – optional, initial value to assign to the instance

  • type (function) – optional, function used to convert the stored value when fetched via the value property

  • default (function) – optional, function which will return a default value if no explicit value is set (i.e. value is None)

  • name (str) – optional, name to associate with the instance

property name

Return the name of the parameter (if supplied)

replace_with(p)

Set a parameter to replace this one with

If a replacement parameter is set then the value will be taken from that parameter (ignoring all settings from this one)

Parameters:

p (PipelineParam) – parameter to be used as a replacement on evaluation

set(newvalue)

Update the value assigned to the instance

Parameters:

newvalue (object) – new value to assign

property value

Return the assigned value

If a default function was specified on instance creation then this will be used to generate the value to return if the stored value is None.

If a type function was also specified on instance creation then this will be used to convert the assigned value before it is returned.

class auto_process_ngs.pipeliner.PipelineScriptWrapper(name, *scripts)

Class for constructing script command lines

This class is based on the PipelineCommand class but can be used directly (rather than needing to be subclassed).

For example, to wrap a bash script directly:

>>> ls_script = PipelineScriptWrapper("List directory",
...                                   "ls {d}".format(d=dirn))

It is also possible to compose a script from multiple blocks, for example:

>>> properties = PipelineScriptWrapper("File properties",
...                                    "export FILEN={f}".format(
...                                         f="Example.fq"),
...                                    "du -h $FILEN",
...                                    "file $FILEN")

in which case the generated script will look like:

{
    export FILEN=Example.fq
} && {
    du -h $FILEN
} && {
    file $FILEN
}
add_block(script)

Append a script block

Parameters:

scripts (str) – script block to append

cmd()

Internal: implement the ‘cmd’ method

init(*args)

Internal: dummy init which does nothing

class auto_process_ngs.pipeliner.PipelineTask(_name, *args, **kws)

Base class defining a ‘task’ to run as part of a pipeline

A ‘task’ wraps one or more external programs which can be run concurrently, and which produces a set of outputs. Individual programs should be wrapped in instances of the ‘PipelineCommand’ class.

This class should be subclassed to implement the ‘init’, ‘setup’, ‘finish’ (optionally) and ‘output’ methods.

The ‘add_cmd’ method can be used within ‘setup’ to add one or more ‘PipelineCommand’ instances.

Parameters:
  • _name (str) – an arbitrary user-friendly name for the task instance

  • args (List) – list of arguments to be supplied to the subclass (must match those defined in the ‘init’ method)

  • kws (Dictionary) – dictionary of keyword-value pairs to be supplied to the subclass (must match those defined in the ‘init’ method)

add_cmd(*args)

Add a PipelineCommand to the task

The arguments can be one of:

  • Single argument: PipelineCommand instance

  • Two arguments: title string and Command instance

  • Two arguments: title string and script (as string)

Parameters:

Variable (see above) –

add_output(name, value)

Add an output to the task

Parameters:
  • name (str) – name for the output

  • value (object) – associated object

property args

Fetch parameters supplied to the instance

Returns:

list of arguments (with any pipeline

parameter instances resolved to their current values).

Return type:

List

property completed

Check if the task has completed

Returns:

True if task has completed, False

if not.

Return type:

Boolean

conda(*reqs)

Specify one or more conda packages for the task

Package requirements can be either unversioned (e.g. ‘multiqc’) or versioned (e.g. ‘multiqc=1.8’).

Parameters:

reqs (list) – one or more Conda package specifiers.

property conda_dependencies

Return a list of conda packages required by the task

conda_dependency_resolution_completed(name, jobs, sched)

Internal: callback method

Invoked when a conda dependency resolution job completes

Parameters:
  • name (str) – name for the callback

  • jobs (list) – list of SchedulerJob instances

  • sched (SimpleScheduler) – scheduler instance

property conda_env_name

Return a name for conda environment based on packages

property exit_code

Get the exit code for completed task

Returns:

exit code, or ‘None’ if task hasn’t completed

Return type:

Integer

fail(exit_code=1, message=None)

Register the task as failing

Intended to be invoked from the subclassed ‘setup’ or ‘finish’ methods, to terminate the task and indicate that it has failed.

NB when using the ‘fail’ method it is recommended that the method that it was invoked from should return immediately afterwards, to avoid any unexpected side effects

Parameters:
  • exit_code (int) – optional, specifies the exit code to return (defaults to 1)

  • message (str) – optional, error message to report to the pipeline user

finish()

Perform actions on task completion

Performs any actions that are required on completion of the task, such as moving or copying data, and setting the values of any output parameters.

Must be implemented by the subclass

finish_task()

Internal: perform actions to finish the task

id()

Get the name of the task within the pipeline

Returns:

a name consisting of a ‘sanitized’ version

of the supplied name appended with a unique id code

Return type:

String

init(*args, **kws)

Initialise the task

Defines the arguments and keywords required by the task and sets up output parameters and any internal variables required by setup and finish.

Must be implemented by the subclass

invoke(f, args=None, kws=None)

Internal: invoke arbitrary method on the task

If the working directory is defined then calling ‘invoke’ changes to this directory before the specified function is called.

Parameters:
  • f (function) – method to invoke (e.g. ‘self.init’)

  • args (list) – arguments to invoke function with

  • kws (dictionary) – keyworded parameters to invoke function with

name()

Get the name of the task

Returns:

the name supplied when the task was created

Return type:

String

njobs()

Get number of jobs associated with the task

Returns the total number of jobs and the number of completed jobs associated with a running task, as a tuple: (njobs,ncompleted)

property output

Return the output object

report(s)

Internal: report messages from the task

Parameters:

s (str) – message to report.

report_diagnostics(s, reportf=None, verbose=True)

Internal: report additional diagnostic information

Reports current and working directories, current directory contents, scripts and script outputs; to be invoked on task failure.

Parameters:
  • s (str) – string describing the reason for the diagnostics being reported

  • reportf (func) – function to use for reporting (defaults to the object’s ‘report’ method)

report_failure(reportf=None, verbose=False)

Internal: report information on task failure

Parameters:

reportf (func) – function to use for reporting (defaults to the object’s ‘report’ method)

required_by(*tasks)

Add this task as a requirement of others

Each specified task will wait for this task to complete before they can run.

Parameters:

tasks (List) – list of PipelineTask objects that will have this task added as a requirement

property required_task_ids

Return the task IDs for tasks required by this task

requires(*tasks)

Add tasks as requirements

Each specified task will be added to the list of tasks that need to complete before this task can run.

Parameters:

tasks (List) – list of PipelineTask objects to be added to this task as requirements

requires_id(task_id)

Add task ID to list of requirements

Parameters:

task_id (str) – UUID of the task to be added as a requirement

resolve_dependencies(enable_conda=False, conda=None, conda_env_dir=None, sched=None, scripts_dir=None, working_dir=None, log_dir=None, timeout=600, verbose=False)

Peform dependency resolution for the task

Parameters:
  • enable_conda (bool) – if True then enable conda dependency resolution

  • conda (str) – path to conda executable

  • conda_env_dir (str) – path to directory holding conda environments

  • sched (SimpleScheduler) –

  • scripts_dir (str) – path to write scripts to

  • log_dir (str) – path to directory to write log files to

  • timeout (int) – number of seconds to wait to acquire lock on environments directory before giving up (default: 600)

  • verbose (bool) – if True then report additional information for diagnostics

property resolving_dependencies

Check if task is currently resolving dependencies

Returns True if dependency resolution is in progress, False otherwise

run(sched, runner=None, envmodules=None, enable_conda=False, conda=None, conda_env_dir=None, working_dir=None, log_dir=None, scripts_dir=None, log_file=None, wait_for=(), asynchronous=True, poll_interval=5, batch_size=None, batch_limit=None, lock_manager=None, verbose=False, make_task_working_dir=False)

Run the task

This method is not normally invoked directly; instead it’s called by the pipeline that the task has been added to.

Parameters:
  • sched (SimpleScheduler) – scheduler to submit jobs to

  • runner (JobRunner) – job runner to use when running jobs via the scheduler

  • envmodules (list) – list of environment modules to load when running jobs in the task

  • enable_conda (bool) – whether to enable conda dependency resolution (default: False)

  • conda (str) – path to conda

  • conda_env_dir (str) – path to top-level directory to create or find conda environments

  • working_dir (str) – path to the working directory to use (defaults to the current working directory)

  • log_dir (str) – path to the directory to write logs to (defaults to the working directory)

  • scripts_dir (str) – path to the directory to write scripts to (defaults to the working directory)

  • log_file (str) – path to file to write task log messages to (in addition to stdout)

  • wait_for (list) – deprecated: list of scheduler jobs to wait for before running jobs from this task

  • asynchronous (bool) – if False then block until the task has completed

  • poll_interval (float) – interval between checks on task completion (in seconds) for non-asynchronous tasks (defaults to 5 seconds)

  • batch_size (int) – if set then run commands in each task in batches, with each batch running this many commands at a time (default is to run one command per job)

  • batch_limit (int) – if set then run commands in batches, with the batch size automatically set so that the number of batches doesn’t exceed this limit; ignored if batch_size is explicitly set (default is not to limit the number of batches)

  • lock_manager (ResourceLock) – inter-task lock manager

  • verbose (bool) – if True then report additional information for diagnostics

  • make_task_working_dir (bool) – if True then create a new task-specific working directory under the top-level working directory

property runner_nslots

Get number of slots (i.e. available CPUs) set by runner

This returns the number of CPUs available to the command as set in the job runner which will be used to run the job.

Returns:

environment variable to get slots from.

Return type:

String

setup()

Set up commands to be performed by the task

Must be implemented by the subclass

property stderr

Get the standard error from the task

Returns:

standard error from the task.

Return type:

String

property stdout

Get the standard output from the task

Returns:

standard output from the task.

Return type:

String

task_completed(name, jobs, sched)

Internal: callback method

This is a callback method which is invoked when scheduled jobs in the task finish

Parameters:
  • name (str) – name for the callback

  • jobs (list) – list of SchedulerJob instances

  • sched (SimpleScheduler) – scheduler instance

terminate()

Internal: terminate the task

property updated

Check if the task has been updated since the last check

Returns:

True if task has been updated, False

if not.

Return type:

Boolean

auto_process_ngs.pipeliner.check_conda_env(conda, env_name, env_dir=None, timeout=600)

Check a conda environment

Verifies whether the named environment exists and can be activated.

Parameters:
  • conda (str) – path to conda executable env_name (str): name for the environment to create package_list (list): list of packages to install

  • env_dir (str) – path to conda environments directory (defaults to environments directory belonging to the supplied conda installation)

  • timeout (int) – number of seconds to wait to acquire lock on environments directory before giving up (default: 600)

Returns:

path to Conda environment (or None if the

environment failed the checks).

Return type:

String

auto_process_ngs.pipeliner.collect_files(dirn, pattern)

Return names of files in a directory which match a glob pattern

Parameters:
  • dirn (str) – path to a directory containing the files

  • pattern (str) – a glob pattern to match

Returns:

list of matching files

Return type:

List

auto_process_ngs.pipeliner.make_conda_env(conda, env_name, package_list, env_dir=None, channels=None, timeout=600)

Create a conda environment

If the named environment doesn’t already exist in the environments directory then a new environment will be created with the specified packages.

Parameters:
  • conda (str) – path to conda executable env_name (str): name for the environment to create package_list (list): list of packages to install

  • env_dir (str) – path to conda environments directory (defaults to environments directory belonging to the supplied conda installation)

  • channels (list) – optional list of channel names to use with conda operations (overrides defaults)

  • timeout (int) – number of seconds to wait to acquire lock on environments directory before giving up (default: 600)

Returns:

path to Conda environment (or None if the

environment couldn’t be created).

Return type:

String

auto_process_ngs.pipeliner.report_text(s, head=None, tail=None, prefix=None, reportf=None)

Output text with optional topping and tailing

Outputs the supplied string of text line-by-line via the specified reporting function (defaults to the built-in ‘print’ function) with options to limit the number of leading and/or trailing lines.

Where lines are omitted, an additional line is output reporting the number of lines that were skipped.

Parameters:
  • s (str) – text to report

  • head (int) – number of leading lines to report

  • tail (int) – number of trailing lines to report

  • prefix (str) – optional string to prefix to each reported line

  • reportf (func) – function to call to report each line of text (defaults to ‘print’)

auto_process_ngs.pipeliner.resolve_parameter(p)

Resolve the value of an arbitrary parameter

The supplied “parameter” can be any object; if it has a ‘value’ property then the resolved value will whatever this returns, otherwise the supplied object is returned as-is.

Parameters:

p (object) – parameter to resolve

Returns:

resolved parameter value.

Return type:

Object

auto_process_ngs.pipeliner.sanitize_name(s)

Convert string to lowercase and replace special characters

Parameters:

s (str) – string to sanitize

Returns:

sanitized string.

Return type:

String