caput.pipeline

Data Analysis and Simulation Pipeline.

A data analysis pipeline is completely specified by a YAML file that defines both the tasks to be run and the parameters passed to those tasks. This package includes base classes that simplify the construction of data analysis tasks, as well as the pipeline manager which executes them.

Pipelines are most easily executed using the script in caput_pipeline.py, which ships with caput.

Flow control classes

Task base classes

Examples

Basic Tasks

A pipeline task is a subclass of TaskBase intended to perform some small, modular piece of analysis. The developer of the task must specify what input parameters the task expects, as well as the code that performs the actual processing for the task.

Input parameters are specified by adding class attributes whose values are instances of config.Property. For instance, a task definition might begin with:

>>> class SpamTask(TaskBase):
...     eggs = config.Property(proptype=str)

This defines a new task named SpamTask with a parameter named eggs, whose type is a string. The class attribute SpamTask.eggs will be replaced with an instance attribute when an instance of the task is initialized, with its value read from the pipeline configuration YAML file (see next section).
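The way a class-level property is shadowed by an instance attribute can be pictured with the minimal sketch below. The `Property` and `Reader` classes here are simplified stand-ins written for illustration, not caput's own classes, and the dictionary plays the role of a parsed section of the YAML file.

```python
class Property:
    """Hypothetical stand-in for config.Property: holds a type and a default."""

    def __init__(self, proptype=None, default=None):
        self.proptype = proptype if proptype is not None else (lambda x: x)
        self.default = default


class Reader:
    """Hypothetical stand-in for a configuration-reading base class."""

    def read_config(self, section):
        # Shadow every Property found on the class with an instance
        # attribute holding the converted configuration value.
        cls = type(self)
        for name in dir(cls):
            prop = getattr(cls, name, None)
            if isinstance(prop, Property):
                raw = section.get(name, prop.default)
                setattr(self, name, prop.proptype(raw))


class SpamTask(Reader):
    eggs = Property(proptype=str)


task = SpamTask()
task.read_config({"eggs": "green"})  # stands in for a parsed YAML section
print(task.eggs)  # green
```

After `read_config()` the class attribute is still a `Property`, while the instance carries the configured value.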

The actual work for the task is specified by overriding any of the setup(), next() or finish() methods (__init__() may also be implemented). These are executed in order, with next() possibly being executed many times. Iteration of next() is halted by raising a PipelineStopIteration. Here is an example of a somewhat trivial but fully implemented task:

>>> class PrintEggs(TaskBase):
...
...     eggs = config.Property(proptype=list)
...
...     def __init__(self):
...         super().__init__()
...         self.i = 0
...
...     def setup(self):
...         print("Setting up PrintEggs.")
...
...     def next(self):
...         if self.i >= len(self.eggs):
...             raise PipelineStopIteration()
...         print("Spam and %s eggs." % self.eggs[self.i])
...         self.i += 1
...
...     def finish(self):
...         print("Finished PrintEggs.")

Any return value of these three pipeline methods can be handled by the pipeline and provided to subsequent tasks. The methods setup() and next() may accept (positional only) arguments, which will be received as the outputs of earlier tasks in a pipeline chain. The following is an example of a pair of tasks that are designed to operate in this manner.

>>> class GetEggs(TaskBase):
...
...     eggs = config.Property(proptype=list)
...
...     def __init__(self):
...         super().__init__()
...         self.i = 0
...
...     def setup(self):
...         print("Setting up GetEggs.")
...
...     def next(self):
...         if self.i >= len(self.eggs):
...             raise PipelineStopIteration()
...         egg = self.eggs[self.i]
...         self.i += 1
...         return egg
...
...     def finish(self):
...         print("Finished GetEggs.")
>>> class CookEggs(TaskBase):
...
...     style = config.Property(proptype=str)
...
...     def setup(self):
...         print("Setting up CookEggs.")
...
...     def next(self, egg):
...         print("Cooking %s %s eggs." % (self.style, egg))
...
...     def finish(self):
...         print("Finished CookEggs.")

Note that CookEggs.next() never raises a PipelineStopIteration. This is because there is no way for the task to internally know how long to iterate. next() will continue to be called as long as there are inputs for next() and will stop iterating when there are none.

Pipeline Configuration

To actually run a task or series of tasks, a YAML pipeline configuration is required. The pipeline configuration has two main functions: to specify the pipeline (which tasks are run, in which order, and how to handle the inputs and outputs of tasks) and to provide parameters to each individual task. Here is an example of a pipeline configuration:

>>> spam_config = '''
... pipeline :
...     tasks:
...         -   type:   PrintEggs
...             params: eggs_params
...
...         -   type:   GetEggs
...             params: eggs_params
...             out:    egg
...
...         -   type:   CookEggs
...             params: cook_params
...             in:     egg
...
... eggs_params:
...     eggs: ['green', 'duck', 'ostrich']
...
... cook_params:
...     style: 'fried'
...
... '''

Here the ‘pipeline’ section contains parameters that pertain to the pipeline as a whole. The most important parameter is tasks, a list of tasks to be executed. Each entry in this list may contain the following keys:

type

(required) The name of the class relative to the global name space. Any required imports will be performed dynamically. Any classes that are not importable (defined interactively) need to be registered in the dictionary pipeline.local_tasks.

params

(required) Key or list of keys referring to sections of the pipeline configuration holding parameters for the task.

out

A ‘pipeline product key’ or list of keys that label any return values from setup(), next() or finish().

requires

A ‘pipeline product key’ or list of keys representing values to be passed as arguments to setup().

in

A ‘pipeline product key’ or list of keys representing values to be passed as arguments to next().

The sections other than ‘pipeline’ in the configuration contain the parameters for the various tasks, as specified by the ‘params’ keys.
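Because every ‘in’ or ‘requires’ key must match a product labelled by some task's ‘out’ key, a task list can be sanity-checked before it is run. The sketch below works on a task list already parsed into Python dictionaries. The helper names `as_list` and `unmatched_inputs` are hypothetical and this is not the check caput itself performs.

```python
def as_list(value):
    """Normalise a single key, a list of keys, or None to a list."""
    if value is None:
        return []
    return [value] if isinstance(value, str) else list(value)


def unmatched_inputs(tasks):
    """Return (task type, key) pairs whose input key no task produces."""
    produced = {key for task in tasks for key in as_list(task.get("out"))}
    missing = []
    for task in tasks:
        for key in as_list(task.get("requires")) + as_list(task.get("in")):
            if key not in produced:
                missing.append((task["type"], key))
    return missing


tasks = [
    {"type": "GetEggs", "params": "eggs_params", "out": "egg"},
    {"type": "CookEggs", "params": "cook_params", "in": "egg"},
    {"type": "DoNothing", "params": "no_params", "in": "non_existent_data_product"},
]
print(unmatched_inputs(tasks))
# [('DoNothing', 'non_existent_data_product')]
```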

Execution Order

When the above pipeline is executed, it produces the following output.

>>> local_tasks.update(globals())  # Required for interactive sessions.
>>> Manager.from_yaml_str(spam_config).run()
Setting up PrintEggs.
Setting up GetEggs.
Setting up CookEggs.
Spam and green eggs.
Cooking fried green eggs.
Spam and duck eggs.
Cooking fried duck eggs.
Spam and ostrich eggs.
Cooking fried ostrich eggs.
Finished PrintEggs.
Finished GetEggs.
Finished CookEggs.

The rules for execution order are as follows:

  1. One of the methods setup(), next() or finish(), as appropriate, will be executed from each task, in order.

  2. If the task method is missing its input, as specified by the ‘requires’ or ‘in’ keys, restart at the beginning of the tasks list.

  3. If the input to next() is missing and the task is at the beginning of the list there will be no opportunity to generate this input. Stop iterating next() and proceed to finish().

  4. Once a task has executed finish(), remove it from the list.

  5. Once a method from the last member of the tasks list is executed, restart at the beginning of the list.

If the above rules seem somewhat opaque, consider the following example, which illustrates them in a pipeline with a slightly less trivial flow.

>>> class DoNothing(TaskBase):
...
...     def setup(self):
...         print("Setting up DoNothing.")
...
...     def next(self, input):
...         print("DoNothing next.")
...
...     def finish(self):
...         print("Finished DoNothing.")
>>> local_tasks.update(globals())  # Required for interactive sessions only.
>>> new_spam_config = '''
... pipeline :
...     tasks:
...         -   type:   GetEggs
...             params: eggs_params
...             out:    egg
...
...         -   type:   CookEggs
...             params: cook_params
...             in:     egg
...
...         -   type:   DoNothing
...             params: no_params
...             in:     non_existent_data_product
...
...         -   type:   PrintEggs
...             params: eggs_params
...
... eggs_params:
...     eggs: ['green', 'duck', 'ostrich']
...
... cook_params:
...     style: 'fried'
...
... no_params: {}
... '''

Running the following would raise an error, because the pipeline configuration is checked for problems such as an ‘in’ key without a corresponding ‘out’:

Manager.from_yaml_str(new_spam_config).run()

But this is what it would produce otherwise:

Setting up GetEggs.
Setting up CookEggs.
Setting up DoNothing.
Setting up PrintEggs.
Cooking fried green eggs.
Cooking fried duck eggs.
Cooking fried ostrich eggs.
Finished GetEggs.
Finished CookEggs.
Finished DoNothing.
Spam and green eggs.
Spam and duck eggs.
Spam and ostrich eggs.
Finished PrintEggs.

Notice that DoNothing.next() is never called, since the pipeline never generates its input, ‘non_existent_data_product’. Once everything before DoNothing has been executed, the pipeline notices that there is no opportunity for ‘non_existent_data_product’ to be generated and forces DoNothing to proceed to finish(). This also unblocks PrintEggs, allowing it to proceed normally.
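The execution-order rules can also be explored with a small self-contained simulation. This is a simplified sketch, not caput's scheduler: `StopTask`, `Source`, `Sink` and `run` are hypothetical names, ‘requires’ is not modelled, setup() and finish() are only logged, and each product key is assumed to have a single consumer.

```python
from collections import deque


class StopTask(Exception):
    """Stand-in for PipelineStopIteration."""


class Source:
    """Emits one item per next() call, like GetEggs."""

    def __init__(self, items):
        self.items = deque(items)

    def next(self):
        if not self.items:
            raise StopTask()
        return self.items.popleft()


class Sink:
    """Records one item per next() call, like CookEggs."""

    def __init__(self):
        self.seen = []

    def next(self, item):
        self.seen.append(item)


def run(entries):
    """Run entries (dicts with 'name', 'task', optional 'in'/'out'); return a log."""
    log = []
    queues = {e["in"]: deque() for e in entries if "in" in e}
    state = {e["name"]: "setup" for e in entries}
    active = list(entries)
    while active:
        for entry in list(active):  # rule 1: visit each task in order
            name, key, out = entry["name"], entry.get("in"), None
            if state[name] == "setup":
                log.append("setup " + name)
                state[name] = "next"
            elif state[name] == "next":
                if key is not None and not queues[key]:
                    if entry is active[0]:
                        state[name] = "finish"  # rule 3: input can never arrive
                    break  # rule 2: restart at the top of the list
                args = (queues[key].popleft(),) if key is not None else ()
                try:
                    out = entry["task"].next(*args)
                except StopTask:
                    state[name] = "finish"
            else:
                log.append("finish " + name)
                active.remove(entry)  # rule 4
            if out is not None and entry.get("out") in queues:
                queues[entry["out"]].append(out)
    return log  # rule 5 is the enclosing while loop


cook, blocked = Sink(), Sink()
log = run([
    {"name": "get", "task": Source(["green", "duck"]), "out": "egg"},
    {"name": "cook", "task": cook, "in": "egg"},
    {"name": "blocked", "task": blocked, "in": "never_produced"},
])
print(log)
print(cook.seen, blocked.seen)  # ['green', 'duck'] []
```

As in the DoNothing example, the blocked task is set up and finished but its next() is never called, and it only reaches finish() once every task ahead of it has been removed from the list.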

Pure Python Pipelines

It is possible to construct and run a pipeline purely within Python, which can be useful for quick prototyping and debugging. This gives direct control over task construction and configuration, and allows injection and inspection of pipeline products.

To add a task to the pipeline you need to: create an instance of it; set any configuration attributes directly (or call read_config() on an appropriate dictionary); and then add it to the pipeline using add_task(), specifying the queues it connects to.

To inject products into the pipeline, use the Input task and supply it with an iterator as an argument. Each item will be fed into the pipeline one by one. To take outputs from the pipeline, use the Output task. By default this saves everything it receives into a list, but it can be given a callback function to apply processing to each argument in turn.

>>> m = Manager()
>>> m.add_task(Input(["platypus", "dinosaur"]), out="key1")
>>> cook = CookEggs()
>>> cook.style = "coddled"
>>> m.add_task(cook, in_="key1")
>>> m.add_task(Output(lambda x: print("I love %s eggs!" % x)), in_="key1")
>>> m.run()
Setting up CookEggs.
Cooking coddled platypus eggs.
I love platypus eggs!
Cooking coddled dinosaur eggs.
I love dinosaur eggs!
Finished CookEggs.

Advanced Tasks

Several subclasses of TaskBase provide advanced functionality for tasks that conform to the most common patterns. This functionality includes: optionally reading inputs from disk, instead of receiving them from the pipeline; optionally writing outputs to disk automatically; and caching the results of a large computation to disk in an intelligent manner (not yet implemented).

Base classes providing this functionality are SingleBase for ‘one shot’ tasks and IterBase for tasks that need to iterate. These are limited to a single input (‘in’ key) and a single output (‘out’ key). The method process() should be overridden instead of next(). Optionally, read_input() and write_output() may be overridden for maximum functionality. setup() and finish() may be overridden as usual.

In addition, SingleH5Base and IterH5Base provide the read_input() and write_output() methods for the most common formats.

See the documentation for these base classes for more details.

class caput.pipeline.BasicContMixin[source]

Bases: object

Provides IO for BasicCont objects in pipeline tasks.

As a mixin, this must be combined (using multiple inheritance) with a subclass of TaskBase, providing the full task API.

Provides the methods read_input, read_output and write_output for BasicCont data which gets written to HDF5 files.

read_input(filename)[source]

Method for reading hdf5 input.

read_output(filename)[source]

Method for reading hdf5 output (from caches).

static write_output(filename, output, file_format=None, **kwargs)[source]

Method for writing output to disk.

Parameters
  • filename (str) – File name.

  • output (memh5.BasicCont) – Data to be written.

  • file_format (fileformats.FileFormat) – File format to use. Default fileformats.HDF5.

class caput.pipeline.H5IOMixin[source]

Bases: object

Provides hdf5/zarr IO for pipeline tasks.

As a mixin, this must be combined (using multiple inheritance) with a subclass of TaskBase, providing the full task API.

Provides the methods read_input, read_output and write_output for hdf5 data.
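The mixin pattern described here can be illustrated without HDF5. In the sketch below, `JsonIOMixin` is a hypothetical mixin that follows the same shape but uses the standard-library json module, and `TaskBase` is a bare stand-in for caput.pipeline.TaskBase. The mixin is listed first in the bases, mirroring the order used by IterH5Base and SingleH5Base.

```python
import json
import os
import tempfile


class TaskBase:
    """Bare stand-in for caput.pipeline.TaskBase."""

    def next(self, input=None):
        raise NotImplementedError


class JsonIOMixin:
    """Hypothetical mixin supplying IO methods, using JSON instead of HDF5."""

    @staticmethod
    def read_input(filename):
        with open(filename) as fh:
            return json.load(fh)

    @staticmethod
    def write_output(filename, output, **kwargs):
        with open(filename, "w") as fh:
            json.dump(output, fh)


class DoubleTask(JsonIOMixin, TaskBase):
    """The mixin provides IO; the task base provides the task API."""

    def next(self, input=None):
        return [2 * x for x in input]


with tempfile.TemporaryDirectory() as tmp:
    path_in = os.path.join(tmp, "in.json")
    path_out = os.path.join(tmp, "out.json")
    JsonIOMixin.write_output(path_in, [1, 2, 3])
    task = DoubleTask()
    task.write_output(path_out, task.next(task.read_input(path_in)))
    result = task.read_input(path_out)

print(result)  # [2, 4, 6]
```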

static read_input(filename)[source]

Method for reading hdf5 input.

static read_output(filename)[source]

Method for reading hdf5 output (from caches).

static write_output(filename, output, file_format=None, **kwargs)[source]

Method for writing hdf5/zarr output.

Parameters
  • filename (str) – File name

  • output (memh5.Group, zarr.Group or h5py.Group) – Output to be written. If this is an h5py.Group (which includes hdf5.File objects), the buffer is flushed if filename points to the same file, and a copy is made otherwise.

  • file_format (fileformats.Zarr, fileformats.HDF5 or None) – File format to use. If this is not specified, the file format is guessed based on the type of output or the filename. If guessing is not successful, HDF5 is used.

class caput.pipeline.Input(inputs=None)[source]

Bases: caput.pipeline.TaskBase

Pass inputs into the pipeline from outside.

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

next()[source]

Pop and return the first element of inputs.

class caput.pipeline.IterBase[source]

Bases: caput.pipeline._OneAndOne

Base class for iterating tasks with at most one input and one output.

Tasks inheriting from this class should override process() and optionally setup(), finish(), read_input(), write_output() and cast_input(). They should not override next().

If the value of input_root is anything other than the string “None” then the input will be read (using read_input()) from the file self.input_root + self.file_middles[i] + self.input_ext. If the input is specified both as a filename and as a product key in the pipeline configuration, an error will be raised upon initialization.

If the value of output_root is anything other than the string “None” then the output will be written (using write_output()) to the file self.output_root + self.file_middles[i] + self.output_ext.
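The path construction described above amounts to simple string concatenation per iteration. The helper below is a hypothetical illustration with made-up example values, not part of caput.

```python
def iter_paths(root, file_middles, ext):
    """Return one path per iteration, or [] when root is the string 'None'."""
    if root == "None":
        return []
    return [root + middle + ext for middle in file_middles]


# Example values are invented for illustration.
paths = iter_paths("/data/run_", ["001", "002"], ".h5")
print(paths)  # ['/data/run_001.h5', '/data/run_002.h5']
print(iter_paths("None", ["001", "002"], ".h5"))  # []
```

Note that no separator is inserted between the parts, so any directory slash or underscore has to be included in the root or the file middles.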

iteration

The current iteration of process/next.

Type

int

file_middles

The unique part of each file path.

Type

list of strings

input_root

Pipeline settable parameter giving the first part of the input path. If set to ‘None’ no input is read. Either it is assumed that no input is required or that input is received from the pipeline.

Type

string

input_ext

Pipeline settable parameter giving the last part of input path. The full input path is self.input_root + self.file_middles[self.iteration] + self.input_ext.

Type

string

output_root

Pipeline settable parameter giving the first part of the output path. If set to ‘None’ no output is written.

Type

string

output_ext

Pipeline settable parameter giving the last part of output path. The full output path is self.output_root + self.file_middles[self.iteration] + self.output_ext.

Type

string

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

next(input=None)[source]

Should not need to override.

class caput.pipeline.IterH5Base[source]

Bases: caput.pipeline.H5IOMixin, caput.pipeline.IterBase

Base class for iterating over hdf5 input and output.

Inherits from H5IOMixin and IterBase.

class caput.pipeline.Manager(psutil_profiling=False)[source]

Bases: caput.config.Reader

Pipeline manager for setting up and running pipeline tasks.

The manager is in charge of initializing all pipeline tasks, setting them up by providing the appropriate parameters, then executing the methods of each task in the appropriate order. It also handles intermediate data products and ensures that the correct products are passed between tasks.

logging

Log levels per module. The key “root” stores the root log level.

Type

Dict(str, str)

multiprocessing

TODO

Type

int

cluster

TODO

Type

dict

tasks

Configuration of pipeline tasks.

Type

list

save_versions

Module names (str). This list together with the version strings from these modules are attached to output metadata. Default: [].

Type

list

save_config

If this is True, the global pipeline configuration is attached to output metadata. Default: True.

Type

bool

psutil_profiling

Use psutil to profile CPU and memory usage. Default False.

Type

bool

add_task(task, requires=None, in_=None, out=None)[source]

Add a task instance to the pipeline.

Parameters
  • task (TaskBase) – A pipeline task instance.

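  • requires (list or string) – Pipeline product key or keys whose values are passed as arguments to the task's setup().

  • in_ (list or string) – Pipeline product key or keys whose values are passed as arguments to the task's next().

  • out (list or string) – Pipeline product key or keys labelling the task's return values.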

Raises

caput.config.CaputConfigError – If there was an error in the task configuration.

classmethod from_yaml_file(file_name, lint=False, psutil_profiling=False)[source]

Initialize the pipeline from a YAML configuration file.

Parameters
  • file_name (string) – Path to YAML pipeline configuration file.

  • lint (bool) – Instantiate Manager only to lint config. Disables debug logging.

  • psutil_profiling (bool) – Use psutil to profile CPU and memory usage

Returns

self

Return type

Pipeline object

classmethod from_yaml_str(yaml_doc, lint=False, psutil_profiling=False)[source]

Initialize the pipeline from a YAML configuration string.

Parameters
  • yaml_doc (string) – Yaml configuration document.

  • lint (bool) – Instantiate Manager only to lint config. Disables debug logging.

  • psutil_profiling (bool) – Use psutil to profile CPU and memory usage.

Returns

self

Return type

Pipeline object

run()[source]

Main driver method for the pipeline.

This function initializes all pipeline tasks and runs the pipeline through to completion.

Raises

PipelineRuntimeError – If a task stage returns the wrong number of outputs.

class caput.pipeline.Output(callback=None)[source]

Bases: caput.pipeline.TaskBase

Take outputs from the pipeline and place them in a list.

To apply some processing to pipeline output (i.e. this task's input), use the callback argument, which will be passed the item. The return value of the callback is placed in the outputs attribute. Note that this need not be the input, so if pipeline output should be deleted to save memory you can simply return None.

Parameters

callback (function, optional) – A function which can apply some processing to the pipeline output.
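The callback behaviour described above can be pictured with a small stand-in class. `OutputSketch` is a hypothetical name and only mirrors the documented behaviour; it is not the caput implementation.

```python
class OutputSketch:
    """Stand-in mirroring the documented behaviour of Output."""

    def __init__(self, callback=None):
        self.callback = callback
        self.outputs = []

    def next(self, in_):
        # Apply the callback, if any, and store whatever it returns.
        if self.callback is not None:
            in_ = self.callback(in_)
        self.outputs.append(in_)


collector = OutputSketch()                          # keep items unchanged
sizes = OutputSketch(callback=len)                  # keep a summary only
discard = OutputSketch(callback=lambda item: None)  # drop the data itself

for item in ["platypus", "dinosaur"]:
    for sink in (collector, sizes, discard):
        sink.next(item)

print(collector.outputs)  # ['platypus', 'dinosaur']
print(sizes.outputs)      # [8, 8]
print(discard.outputs)    # [None, None]
```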

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

next(in_)[source]

Apply the callback, if one was given, to the received item and place the result in outputs.

exception caput.pipeline.PipelineConfigError(message)[source]

Bases: caput.config.CaputConfigError

Deprecated. Raised when there is an error setting up a pipeline.

exception caput.pipeline.PipelineRuntimeError[source]

Bases: Exception

Raised when there is a pipeline related error at runtime.

exception caput.pipeline.PipelineStopIteration[source]

Bases: Exception

This stops the iteration of next() in pipeline tasks.

Pipeline tasks should raise this exception in the next() method to stop the iteration of the task and to proceed to finish().

Note that if next() receives input data as an argument, it is not required to ever raise this exception. The pipeline will proceed to finish() once the input data has run out.

class caput.pipeline.SingleBase[source]

Bases: caput.pipeline._OneAndOne

Base class for non-iterating tasks with at most one input and output.

Inherits from TaskBase.

Tasks inheriting from this class should override process() and optionally setup(), finish(), read_input(), write_output() and cast_input(). They should not override next().

If the value of input_root is anything other than the string “None” then the input will be read (using read_input()) from the file self.input_root + self.input_filename. If the input is specified both as a filename and as a product key in the pipeline configuration, an error will be raised upon initialization.

If the value of output_root is anything other than the string “None” then the output will be written (using write_output()) to the file self.output_root + self.output_filename.
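The documented flow (read the input if input_root is set, call process(), write the output if output_root is set) can be sketched as follows. `SingleSketch` is a hypothetical stand-in rather than caput's SingleBase, a dictionary acts as a fake file system, and cast_input() is left out.

```python
class SingleSketch:
    """Stand-in illustrating the documented flow; not caput's SingleBase."""

    input_root = "None"
    input_filename = ""
    output_root = "None"
    output_filename = ""

    def __init__(self, disk):
        self.disk = disk  # dict acting as a fake file system

    def read_input(self, filename):
        return self.disk[filename]

    def write_output(self, filename, output):
        self.disk[filename] = output

    def process(self, input):
        raise NotImplementedError

    def next(self, input=None):
        # Subclasses override process(), never next().
        if self.input_root != "None":
            input = self.read_input(self.input_root + self.input_filename)
        output = self.process(input)
        if self.output_root != "None":
            self.write_output(self.output_root + self.output_filename, output)
        return output


class Total(SingleSketch):
    input_root = "in/"
    input_filename = "values"
    output_root = "out/"
    output_filename = "total"

    def process(self, input):
        return sum(input)


disk = {"in/values": [1, 2, 3]}
result = Total(disk).next()
print(result, disk["out/total"])  # 6 6
```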

input_root

Pipeline settable parameter giving the first part of the input path. If set to ‘None’ no input is read. Either it is assumed that no input is required or that input is received from the pipeline.

Type

string

input_filename

Pipeline settable parameter giving the last part of input path. The full input path is self.input_root + self.input_filename.

Type

string

output_root

Pipeline settable parameter giving the first part of the output path. If set to ‘None’ no output is written.

Type

string

output_filename

Pipeline settable parameter giving the last part of output path. The full output path is self.output_root + self.output_filename.

Type

string

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

next(input=None)[source]

Should not need to override.

class caput.pipeline.SingleH5Base[source]

Bases: caput.pipeline.H5IOMixin, caput.pipeline.SingleBase

Base class for tasks with hdf5 input and output.

Inherits from H5IOMixin and SingleBase.

class caput.pipeline.TaskBase[source]

Bases: caput.config.Reader

Base class for all pipeline tasks.

All pipeline tasks should inherit from this class, with functionality and analysis added by overriding __init__, setup, next and/or finish.

In addition, input parameters may be specified by adding class attributes which are instances of config.Property. These will then be read from the pipeline yaml file when the pipeline is initialized. The class attributes will be overridden with instance attributes with the same name but with the values specified in the pipeline file.

Initialize pipeline task.

May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.

property cacheable

Override to return True if caching results is implemented.

No caching infrastructure has yet been implemented.

property embarrassingly_parallelizable

Override to return True if next() is trivially parallelizable.

This property tells the pipeline that the problem can be parallelized trivially. This only applies to the next() method, which should not change the state of the task.

If this returns True, then the Pipeline will execute next() many times in parallel and handle all the intermediate data efficiently. Otherwise next() must be parallelized internally if at all. setup() and finish() must always be parallelized internally.

Usage of this has not been implemented.

finish()[source]

Final analysis stage of pipeline task.

May be overridden with no arguments.

Any return values will be treated as pipeline data-products as specified by the out keys in the pipeline setup.

next(input=None)[source]

Iterative analysis stage of pipeline task.

May be overridden with any number of positional only arguments (defaults are allowed). Pipeline data-products will be passed as specified by in keys in the pipeline setup.

This method will be called repeatedly until it either raises a PipelineStopIteration or, if accepting inputs, runs out of input data-products.

Any return values will be treated as pipeline data-products as specified by the out keys in the pipeline setup.

setup(requires=None)[source]

First analysis stage of pipeline task.

May be overridden with any number of positional only arguments (defaults are allowed). Pipeline data-products will be passed as specified by requires keys in the pipeline setup.

Any return values will be treated as pipeline data-products as specified by the out keys in the pipeline setup.

validate()[source]

Validate the task after instantiation.