Data Analysis and Simulation Pipeline.
A data analysis pipeline is completely specified by a YAML file that lists both the tasks to be run and the parameters passed to those tasks. This package includes base classes that simplify the construction of data analysis tasks, as well as the pipeline manager that executes them.
Pipelines are most easily executed using the script in ``caput_pipeline.py``, which ships with caput.
| Manager | Pipeline manager for setting up and running pipeline tasks. |
| PipelineConfigError | Raised when there is an error setting up a pipeline. |
| PipelineRuntimeError | Raised when there is a pipeline related error at runtime. |
| PipelineStopIteration | This stops the iteration of next() in pipeline tasks. |
| TaskBase() | Base class for all pipeline tasks. |
| SingleBase() | Base class for non-iterating tasks with at most one input and output. |
| IterBase() | Base class for iterating tasks with at most one input and one output. |
| H5IOMixin | Provides hdf5 IO for pipeline tasks. |
| BasicContMixin | Provides IO for BasicCont objects in pipeline tasks. |
| SingleH5Base() | Base class for tasks with hdf5 input and output. |
| IterH5Base() | Base class for iterating over hdf5 input and output. |
A pipeline task is a subclass of TaskBase intended to perform some small, modular piece of analysis. The developer of the task must specify what input parameters the task expects as well as code to perform the actual processing for the task.
Input parameters are specified by adding class attributes whose values are instances of config.Property. For instance, a task definition might begin with:
>>> class SpamTask(TaskBase):
...     eggs = config.Property(proptype=str)
This defines a new task named SpamTask with a parameter named eggs, whose type is a string. The class attribute SpamTask.eggs will be replaced with an instance attribute when an instance of the task is initialized, with its value read from the pipeline configuration YAML file (see next section).
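The replacement of a class-level property by an instance attribute can be illustrated with a small plain-Python sketch. The Property class and SpamTaskSketch below are simplified, hypothetical stand-ins written for this illustration; they are not caput's config.Property or TaskBase, and the way the real classes read their values may differ.

```python
class Property:
    """Simplified stand-in for config.Property: stores a converter and a default."""

    def __init__(self, proptype=None, default=None):
        # With no proptype given, values are passed through unchanged.
        self.proptype = proptype if proptype is not None else (lambda value: value)
        self.default = default


class SpamTaskSketch:
    """Hypothetical task whose parameters come from a plain dict, not from YAML."""

    eggs = Property(proptype=str)

    def __init__(self, params):
        # Only attributes defined directly on this class are scanned.
        for name, prop in vars(type(self)).items():
            if not isinstance(prop, Property):
                continue
            if name in params:
                # The instance attribute shadows the class-level Property.
                setattr(self, name, prop.proptype(params[name]))
            else:
                setattr(self, name, prop.default)


task = SpamTaskSketch({"eggs": 12})
print(repr(task.eggs))                # the converted instance value
print(type(SpamTaskSketch.eggs))      # the class attribute is still a Property
```

In this sketch the class attribute stays in place and is merely shadowed on each instance, which is one simple way to get the behaviour described above.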
The actual work for the task is specified by overriding any of the setup(), next() or finish() methods (__init__() may also be implemented). These are executed in order, with next() possibly being executed many times. Iteration of next() is halted by raising a PipelineStopIteration. Here is an example of a somewhat trivial but fully implemented task:
>>> class PrintEggs(TaskBase):
...
...     eggs = config.Property(proptype=list)
...
...     def __init__(self):
...         self.i = 0
...
...     def setup(self):
...         print("Setting up PrintEggs.")
...
...     def next(self):
...         if self.i >= len(self.eggs):
...             raise PipelineStopIteration()
...         print("Spam and %s eggs." % self.eggs[self.i])
...         self.i += 1
...
...     def finish(self):
...         print("Finished PrintEggs.")
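The calling order described above (setup() once, next() repeatedly until PipelineStopIteration is raised, then finish()) can be sketched without caput. Everything in the block below is a hypothetical stand-in: PrintEggsSketch takes its eggs directly rather than from a YAML file, and run_single_task is an illustrative driver, not the Manager's actual code.

```python
class PipelineStopIteration(Exception):
    """Stand-in for the pipeline exception of the same name."""


class PrintEggsSketch:
    """Plain-Python analogue of PrintEggs that records messages in a list."""

    def __init__(self, eggs):
        self.eggs = eggs
        self.i = 0
        self.log = []

    def setup(self):
        self.log.append("Setting up PrintEggs.")

    def next(self):
        if self.i >= len(self.eggs):
            raise PipelineStopIteration()
        self.log.append("Spam and %s eggs." % self.eggs[self.i])
        self.i += 1

    def finish(self):
        self.log.append("Finished PrintEggs.")


def run_single_task(task):
    """Hypothetical driver: setup() once, next() until stopped, then finish()."""
    task.setup()
    while True:
        try:
            task.next()
        except PipelineStopIteration:
            break
    task.finish()
    return task.log


log = run_single_task(PrintEggsSketch(["green", "duck", "ostrich"]))
print("\n".join(log))
```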
Any return value of these three pipeline methods can be handled by the pipeline and provided to subsequent tasks. The methods setup() and next() may accept (positional only) arguments, which will be received as the outputs of earlier tasks in a pipeline chain. The following is an example of a pair of tasks that are designed to operate in this manner.
>>> class GetEggs(TaskBase):
...
...     eggs = config.Property(proptype=list)
...
...     def __init__(self):
...         self.i = 0
...
...     def setup(self):
...         print("Setting up GetEggs.")
...
...     def next(self):
...         if self.i >= len(self.eggs):
...             raise PipelineStopIteration()
...         egg = self.eggs[self.i]
...         self.i += 1
...         return egg
...
...     def finish(self):
...         print("Finished GetEggs.")
>>> class CookEggs(TaskBase):
...
...     style = config.Property(proptype=str)
...
...     def setup(self):
...         print("Setting up CookEggs.")
...
...     def next(self, egg):
...         print("Cooking %s %s eggs." % (self.style, egg))
...
...     def finish(self):
...         print("Finished CookEggs.")
Note that CookEggs.next() never raises a PipelineStopIteration. This is because there is no way for the task to internally know how long to iterate. next() will continue to be called as long as there are inputs for next() and will stop iterating when there are none.
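The following sketch illustrates why the consuming task needs no stop condition of its own: the driver stops calling its next() once the producer is exhausted. The classes and the run_chain function are hypothetical stand-ins for illustration only and do not reproduce the Manager's scheduling.

```python
class PipelineStopIteration(Exception):
    """Stand-in for the pipeline exception of the same name."""


class GetEggsSketch:
    """Produces one egg per call to next(), then signals that it is done."""

    def __init__(self, eggs):
        self.eggs = list(eggs)
        self.i = 0

    def next(self):
        if self.i >= len(self.eggs):
            raise PipelineStopIteration()
        egg = self.eggs[self.i]
        self.i += 1
        return egg


class CookEggsSketch:
    """Consumes eggs; it cannot know on its own how many will arrive."""

    def __init__(self, style):
        self.style = style
        self.cooked = []

    def next(self, egg):
        # No stop condition here: the driver decides when inputs have run out.
        self.cooked.append("Cooking %s %s eggs." % (self.style, egg))


def run_chain(producer, consumer):
    """Hypothetical two-task driver feeding producer output to the consumer."""
    while True:
        try:
            product = producer.next()
        except PipelineStopIteration:
            break  # producer exhausted, so the consumer receives no more input
        consumer.next(product)
    return consumer.cooked


cooked = run_chain(GetEggsSketch(["green", "duck", "ostrich"]), CookEggsSketch("fried"))
print("\n".join(cooked))
```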
To actually run a task or series of tasks, a YAML pipeline configuration is required. The pipeline configuration has two main functions: to specify the pipeline (which tasks are run, in which order and how to handle the inputs and outputs of tasks) and to provide parameters to each individual task. Here is an example of a pipeline configuration:
>>> spam_config = '''
... pipeline :
...     tasks:
...         - type: PrintEggs
...           params: eggs_params
...
...         - type: GetEggs
...           params: eggs_params
...           out: egg
...
...         - type: CookEggs
...           params: cook_params
...           in: egg
...
... eggs_params:
...     eggs: ['green', 'duck', 'ostrich']
...
... cook_params:
...     style: 'fried'
...
... '''
Here the ‘pipeline’ section contains parameters that pertain to the pipeline as a whole. The most important parameter is tasks, a list of tasks to be executed. Each entry in this list may contain the following keys:
The sections other than ‘pipeline’ in the configuration contain the parameters for the various tasks, as specified by the ‘params’ keys.
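To make the link between a task's ‘params’ key and its parameter section concrete, the sketch below writes out by hand the nested mapping that the example configuration is assumed to parse to, then looks up each task's section. The resolve_params helper is hypothetical and written only for illustration; it is not part of caput.

```python
# Assumed result of parsing the example YAML, written by hand so that the
# sketch needs no YAML library.
parsed_config = {
    "pipeline": {
        "tasks": [
            {"type": "PrintEggs", "params": "eggs_params"},
            {"type": "GetEggs", "params": "eggs_params", "out": "egg"},
            {"type": "CookEggs", "params": "cook_params", "in": "egg"},
        ]
    },
    "eggs_params": {"eggs": ["green", "duck", "ostrich"]},
    "cook_params": {"style": "fried"},
}


def resolve_params(config):
    """Hypothetical helper: pair each task type with the section its 'params' key names."""
    resolved = []
    for task_spec in config["pipeline"]["tasks"]:
        section_name = task_spec["params"]
        resolved.append((task_spec["type"], config[section_name]))
    return resolved


for task_type, params in resolve_params(parsed_config):
    print(task_type, params)
```

Note that PrintEggs and GetEggs both point at eggs_params, so a single section can supply parameters to more than one task.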
When the above pipeline is executed, it produces the following output.
>>> local_tasks.update(globals()) # Required for interactive sessions.
>>> Manager.from_yaml_str(spam_config).run()
Setting up PrintEggs.
Setting up GetEggs.
Setting up CookEggs.
Spam and green eggs.
Cooking fried green eggs.
Spam and duck eggs.
Cooking fried duck eggs.
Spam and ostrich eggs.
Cooking fried ostrich eggs.
Finished PrintEggs.
Finished GetEggs.
Finished CookEggs.
The rules for execution order are as follows:
If the above rules seem somewhat opaque, consider the following example, which illustrates these rules in a pipeline with a slightly less trivial flow.
>>> class DoNothing(TaskBase):
...
...     def setup(self):
...         print("Setting up DoNothing.")
...
...     def next(self, input):
...         print("DoNothing next.")
...
...     def finish(self):
...         print("Finished DoNothing.")
>>> local_tasks.update(globals()) # Required for interactive sessions only.
>>> new_spam_config = '''
... pipeline :
...     tasks:
...         - type: GetEggs
...           params: eggs_params
...           out: egg
...
...         - type: CookEggs
...           params: cook_params
...           in: egg
...
...         - type: DoNothing
...           params: no_params
...           in: non_existent_data_product
...
...         - type: PrintEggs
...           params: eggs_params
...
... eggs_params:
...     eggs: ['green', 'duck', 'ostrich']
...
... cook_params:
...     style: 'fried'
...
... no_params: {}
... '''
>>> Manager.from_yaml_str(new_spam_config).run()
Setting up GetEggs.
Setting up CookEggs.
Setting up DoNothing.
Setting up PrintEggs.
Cooking fried green eggs.
Cooking fried duck eggs.
Cooking fried ostrich eggs.
Finished GetEggs.
Finished CookEggs.
Finished DoNothing.
Spam and green eggs.
Spam and duck eggs.
Spam and ostrich eggs.
Finished PrintEggs.
Notice that DoNothing.next() is never called, since the pipeline never generates its input, ‘non_existent_data_product’. Once everything before DoNothing has been executed, the pipeline notices that there is no opportunity for ‘non_existent_data_product’ to be generated and forces DoNothing to proceed to finish(). This also unblocks PrintEggs, allowing it to proceed normally.
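A task list can also be inspected ahead of time for inputs that nothing produces. The helper below is a hypothetical static check written for illustration; it is not part of caput, it assumes each ‘in’ and ‘out’ value is a single string as in the examples here, and it says nothing about how the Manager reaches the same conclusion at runtime.

```python
def unproduced_inputs(task_specs):
    """Return (task type, input name) pairs whose input no task declares as 'out'."""
    produced = {spec["out"] for spec in task_specs if "out" in spec}
    missing = []
    for spec in task_specs:
        name = spec.get("in")
        if name is not None and name not in produced:
            missing.append((spec["type"], name))
    return missing


# The task list from new_spam_config, written out as plain dictionaries.
new_spam_tasks = [
    {"type": "GetEggs", "params": "eggs_params", "out": "egg"},
    {"type": "CookEggs", "params": "cook_params", "in": "egg"},
    {"type": "DoNothing", "params": "no_params", "in": "non_existent_data_product"},
    {"type": "PrintEggs", "params": "eggs_params"},
]

print(unproduced_inputs(new_spam_tasks))
```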
Several subclasses of TaskBase provide advanced functionality for tasks that conform to the most common patterns. This functionality includes: optionally reading inputs from disk, instead of receiving them from the pipeline; optionally writing outputs to disk automatically; and caching the results of a large computation to disk in an intelligent manner (not yet implemented).
Base classes providing this functionality are SingleBase for ‘one shot’ tasks and IterBase for tasks that need to iterate. These are limited to a single input (‘in’ key) and a single output (‘out’ key). The method process() should be overridden instead of next(). Optionally, read_input() and write_output() may be overridden for maximum functionality. setup() and finish() may be overridden as usual.
In addition, SingleH5Base and IterH5Base provide the read_input() and write_output() methods for the most common formats.
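The division of labour between process(), read_input() and write_output() can be sketched as a small template-method pattern. SingleSketchBase below is a hypothetical stand-in, not caput's SingleBase: its constructor arguments input_path and output_path are invented for this example, and JSON files are used in place of HDF5 so that the sketch runs with the standard library alone.

```python
import json
import os
import tempfile


class SingleSketchBase:
    """Hypothetical one-shot base class; subclasses override the three hooks."""

    def __init__(self, input_path=None, output_path=None):
        self.input_path = input_path
        self.output_path = output_path

    def read_input(self, path):
        raise NotImplementedError

    def write_output(self, path, output):
        raise NotImplementedError

    def process(self, data):
        raise NotImplementedError

    def next(self, data=None):
        # Read from disk only when an input path was configured.
        if self.input_path is not None:
            data = self.read_input(self.input_path)
        output = self.process(data)
        # Write to disk only when an output path was configured.
        if self.output_path is not None:
            self.write_output(self.output_path, output)
        return output


class DoubleValues(SingleSketchBase):
    """Example subclass: the analysis lives in process(), the IO in the hooks."""

    def read_input(self, path):
        with open(path) as f:
            return json.load(f)

    def write_output(self, path, output):
        with open(path, "w") as f:
            json.dump(output, f)

    def process(self, data):
        return [2 * value for value in data]


with tempfile.TemporaryDirectory() as workdir:
    out_path = os.path.join(workdir, "doubled.json")
    result = DoubleValues(output_path=out_path).next([1, 2, 3])
    with open(out_path) as f:
        saved = json.load(f)

print(result, saved)
```

Keeping the IO in separate hooks means the same process() body works whether its input arrives from an earlier task or from a file on disk.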
See the documentation for these base classes for more details.