caput.pipeline
Data Analysis and Simulation Pipeline.
A data analysis pipeline is completely described by a YAML file that specifies both which tasks are to be run and the parameters passed to those tasks. Included in this package are base classes for simplifying the construction of data analysis tasks, as well as the pipeline manager which executes them.
Pipelines are most easily executed using the script in caput_pipeline.py,
which ships with caput.
Examples
Basic Tasks
A pipeline task is a subclass of TaskBase intended to perform some small,
modular piece of analysis. The developer of the task must specify what input
parameters the task expects as well as code to perform the actual processing
for the task.
Input parameters are specified by adding class attributes whose values are
instances of config.Property. For instance, a task definition might begin
with
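>>> from caput import config
>>> from caput.pipeline import TaskBase, PipelineStopIteration, Manager, Input, Output, local_tasks
>>> class SpamTask(TaskBase):
...     eggs = config.Property(proptype=str)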
This defines a new task named SpamTask with a parameter named eggs, whose
type is a string. The class attribute SpamTask.eggs will be replaced with an
instance attribute when an instance of the task is initialized, with its value
read from the pipeline configuration YAML file (see next section).
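The replacement of a class attribute by an instance attribute can be pictured with the following self-contained model. `Property`, `Reader` and `read_config` here are hypothetical stand-ins written for illustration, not caput's classes. caput's own config.Property may implement this differently (for example as a descriptor), so treat this only as a sketch of the behavior described above.

```python
class Property:
    """Hypothetical stand-in for config.Property: records the expected type and a default."""

    def __init__(self, proptype=None, default=None):
        self.proptype = proptype
        self.default = default


class Reader:
    """Hypothetical stand-in for a config-reading base class."""

    def read_config(self, config):
        # Visit base classes first so that subclasses can redefine a property.
        for cls in reversed(type(self).__mro__):
            for name, prop in vars(cls).items():
                if not isinstance(prop, Property):
                    continue
                value = config.get(name, prop.default)
                if prop.proptype is not None and value is not None:
                    value = prop.proptype(value)  # coerce to the declared type
                # The instance attribute now shadows the class-level Property.
                setattr(self, name, value)
        return self


class SpamTask(Reader):
    eggs = Property(proptype=str)


task = SpamTask().read_config({"eggs": "green"})
print(task.eggs)                     # green
print(type(SpamTask.eggs).__name__)  # Property
```

The class attribute is left untouched, so every new instance starts from the same declaration and gets its own value from the configuration.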
The actual work for the task is specified by overriding any of the
setup(), next() or
finish() methods (__init__() may also be
implemented). These are executed in order, with next()
possibly being executed many times. Iteration of next() is halted by
raising a PipelineStopIteration. Here is an example of a somewhat
trivial but fully implemented task:
>>> class PrintEggs(TaskBase):
...
...     eggs = config.Property(proptype=list)
...
...     def __init__(self):
...         super().__init__()
...         self.i = 0
...
...     def setup(self):
...         print("Setting up PrintEggs.")
...
...     def next(self):
...         if self.i >= len(self.eggs):
...             raise PipelineStopIteration()
...         print("Spam and %s eggs." % self.eggs[self.i])
...         self.i += 1
...
...     def finish(self):
...         print("Finished PrintEggs.")
Any return value of these three pipeline methods can be handled by the pipeline
and provided to subsequent tasks. The methods setup() and next()
may accept (positional-only) arguments which will be received as the outputs of
earlier tasks in a pipeline chain. The following is an example of a pair of tasks
that are designed to operate in this manner.
>>> class GetEggs(TaskBase):
...
...     eggs = config.Property(proptype=list)
...
...     def __init__(self):
...         super().__init__()
...         self.i = 0
...
...     def setup(self):
...         print("Setting up GetEggs.")
...
...     def next(self):
...         if self.i >= len(self.eggs):
...             raise PipelineStopIteration()
...         egg = self.eggs[self.i]
...         self.i += 1
...         return egg
...
...     def finish(self):
...         print("Finished GetEggs.")
>>> class CookEggs(TaskBase):
...
...     style = config.Property(proptype=str)
...
...     def setup(self):
...         print("Setting up CookEggs.")
...
...     def next(self, egg):
...         print("Cooking %s %s eggs." % (self.style, egg))
...
...     def finish(self):
...         print("Finished CookEggs.")
Note that CookEggs.next() never raises a PipelineStopIteration.
This is because there is no way for the task to internally know how long to
iterate. next() will continue to be called as long as there are inputs
for next() and will stop iterating when there are none.
Pipeline Configuration
To actually run a task or series of tasks, a YAML pipeline configuration is required. The pipeline configuration has two main functions: to specify the pipeline (which tasks are run, in which order and how to handle the inputs and outputs of tasks) and to provide parameters to each individual task. Here is an example of a pipeline configuration:
>>> spam_config = '''
... pipeline :
...     tasks:
...         - type: PrintEggs
...           params: eggs_params
...
...         - type: GetEggs
...           params: eggs_params
...           out: egg
...
...         - type: CookEggs
...           params: cook_params
...           in: egg
...
... eggs_params:
...     eggs: ['green', 'duck', 'ostrich']
...
... cook_params:
...     style: 'fried'
...
... '''
Here the ‘pipeline’ section contains parameters that pertain to the pipeline as a whole. The most important parameter is tasks, a list of tasks to be executed. Each entry in this list may contain the following keys:
- type
(required) The name of the class relative to the global namespace. Any required imports will be performed dynamically. Any classes that are not importable (e.g. defined interactively) need to be registered in the dictionary pipeline.local_tasks.
- params
(required) Key or list of keys referring to sections of the pipeline configuration holding parameters for the task.
- out
A ‘pipeline product key’ or list of keys that label any return values from setup(), next() or finish().
- requires
A ‘pipeline product key’ or list of keys representing values to be passed as arguments to setup().
- in
A ‘pipeline product key’ or list of keys representing values to be passed as arguments to next().
The sections other than ‘pipeline’ in the configuration contain the parameters for the various tasks, as specified by the ‘params’ keys.
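These keys can be checked mechanically once the YAML has been parsed into a dictionary. The sketch below works on a plain `dict`, so it runs without a YAML library. It normalizes each key to a list, then reports any `requires`/`in` key that no task lists under `out` and any `params` section that does not exist. `check_config` and `as_key_list` are hypothetical helpers, not caput's own validation code.

```python
def as_key_list(value):
    """A task key may hold one product key (a string) or a list of them."""
    if value is None:
        return []
    return [value] if isinstance(value, str) else list(value)


def check_config(config):
    """Return a list of human-readable problems found in a parsed pipeline config."""
    tasks = config["pipeline"]["tasks"]
    # Every product key that some task labels with 'out'.
    produced = {key for task in tasks for key in as_key_list(task.get("out"))}
    problems = []
    for task in tasks:
        for section in as_key_list(task.get("params")):
            if section not in config:
                problems.append(f"{task['type']}: params section {section!r} is missing")
        for field in ("requires", "in"):
            for key in as_key_list(task.get(field)):
                if key not in produced:
                    problems.append(f"{task['type']}: {field!r} key {key!r} is never produced")
    return problems


# The example configuration above, already parsed into Python objects.
spam = {
    "pipeline": {
        "tasks": [
            {"type": "PrintEggs", "params": "eggs_params"},
            {"type": "GetEggs", "params": "eggs_params", "out": "egg"},
            {"type": "CookEggs", "params": "cook_params", "in": "egg"},
        ]
    },
    "eggs_params": {"eggs": ["green", "duck", "ostrich"]},
    "cook_params": {"style": "fried"},
}
print(check_config(spam))  # []
```

Appending a task entry whose `in` key is not listed under any task's `out` makes the function report that key.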
Execution Order
When the above pipeline is executed, it produces the following output.
>>> local_tasks.update(globals()) # Required for interactive sessions.
>>> Manager.from_yaml_str(spam_config).run()
Setting up PrintEggs.
Setting up GetEggs.
Setting up CookEggs.
Spam and green eggs.
Cooking fried green eggs.
Spam and duck eggs.
Cooking fried duck eggs.
Spam and ostrich eggs.
Cooking fried ostrich eggs.
Finished PrintEggs.
Finished GetEggs.
Finished CookEggs.
The rules for execution order are as follows:
One of the methods setup(), next() or finish(), as appropriate, will be executed from each task, in order.
If the task method is missing its input, as specified by the ‘requires’ or ‘in’ keys, restart at the beginning of the tasks list.
If the input to next() is missing and the task is at the beginning of the list there will be no opportunity to generate this input. Stop iterating next() and proceed to finish().
Once a task has executed finish(), remove it from the list.
Once a method from the last member of the tasks list is executed, restart at the beginning of the list.
If the above rules seem somewhat opaque, consider the following example, which illustrates these rules in a pipeline with a slightly less trivial flow.
>>> class DoNothing(TaskBase):
...
...     def setup(self):
...         print("Setting up DoNothing.")
...
...     def next(self, input):
...         print("DoNothing next.")
...
...     def finish(self):
...         print("Finished DoNothing.")
>>> local_tasks.update(globals()) # Required for interactive sessions only.
>>> new_spam_config = '''
... pipeline :
...     tasks:
...         - type: GetEggs
...           params: eggs_params
...           out: egg
...
...         - type: CookEggs
...           params: cook_params
...           in: egg
...
...         - type: DoNothing
...           params: no_params
...           in: non_existent_data_product
...
...         - type: PrintEggs
...           params: eggs_params
...
... eggs_params:
...     eggs: ['green', 'duck', 'ostrich']
...
... cook_params:
...     style: 'fried'
...
... no_params: {}
... '''
Running this pipeline would raise an error, because the pipeline configuration is checked for mistakes such as an ‘in’ key without a corresponding ‘out’:
Manager.from_yaml_str(new_spam_config).run()
Without that check, it would produce the following output:
Setting up GetEggs.
Setting up CookEggs.
Setting up DoNothing.
Setting up PrintEggs.
Cooking fried green eggs.
Cooking fried duck eggs.
Cooking fried ostrich eggs.
Finished GetEggs.
Finished CookEggs.
Finished DoNothing.
Spam and green eggs.
Spam and duck eggs.
Spam and ostrich eggs.
Finished PrintEggs.
Notice that DoNothing.next() is never called, since the pipeline never
generates its input, ‘non_existent_data_product’. Once everything before
DoNothing has been executed the pipeline notices that there is no
opportunity for ‘non_existent_data_product’ to be generated and forces
DoNothing to proceed to finish(). This also unblocks PrintEggs
allowing it to proceed normally.
Pure Python Pipelines
It is possible to construct and run a pipeline purely within Python, which can be useful for quick prototyping and debugging. This gives direct control over task construction and configuration, and allows injection and inspection of pipeline products.
To add a task to the pipeline you need to: create an instance of it; set any
configuration attributes directly (or call read_config() on an
appropriate dictionary); and then add it to the pipeline using
add_task(), which also specifies the queues it connects to.
To inject products into the pipeline, use the Input task and supply it an
iterable as an argument. Each item will be fed into the pipeline one by one. To take
outputs from the pipeline, use the Output task. By default this
simply saves everything it receives into a list, but it can be given a callback
function to apply processing to each argument in turn.
>>> m = Manager()
>>> m.add_task(Input(["platypus", "dinosaur"]), out="key1")
>>> cook = CookEggs()
>>> cook.style = "coddled"
>>> m.add_task(cook, in_="key1")
>>> m.add_task(Output(lambda x: print("I love %s eggs!" % x)), in_="key1")
>>> m.run()
Setting up CookEggs.
Cooking coddled platypus eggs.
I love platypus eggs!
Cooking coddled dinosaur eggs.
I love dinosaur eggs!
Finished CookEggs.
Advanced Tasks
Several subclasses of TaskBase provide advanced functionality for tasks that
conform to the most common patterns. This functionality includes: optionally
reading inputs from disk, instead of receiving them from the pipeline;
optionally writing outputs to disk automatically; and caching the results of a
large computation to disk in an intelligent manner (not yet implemented).
Base classes providing this functionality are SingleBase for ‘one
shot’ tasks and IterBase for tasks that need to iterate. These are
limited to a single input (‘in’ key) and a single output (‘out’ key). The
process() method should be overridden instead of next().
Optionally, read_input() and write_output()
may be overridden for maximum functionality. setup() and finish()
may be overridden as usual.
In addition, SingleH5Base and IterH5Base provide the
read_input() and write_output() methods for the most common
formats.
See the documentation for these base classes for more details.
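The split between next() and process() follows the template-method pattern: the base class owns next() and calls hooks that subclasses fill in. The sketch below is a hypothetical, much-reduced model of that idea. `ToyIterBase` and `Scale` are illustrative names, and the hook names simply mirror the ones listed above. The real IterBase additionally handles reading inputs from files and writing outputs to them.

```python
class ToyIterBase:
    """Hypothetical base: next() is fixed, subclasses customize the hooks."""

    def __init__(self):
        self.iteration = 0  # counts completed calls to next()

    def cast_input(self, input):
        return input  # hook: convert the incoming product, if needed

    def process(self, input):
        raise NotImplementedError("subclasses must override process()")

    def next(self, input=None):
        # Subclasses should not override this; it wraps process() with bookkeeping.
        output = self.process(self.cast_input(input))
        self.iteration += 1
        return output


class Scale(ToyIterBase):
    def cast_input(self, input):
        return float(input)

    def process(self, input):
        return 2 * input


task = Scale()
print([task.next(x) for x in ("1", "2.5")], task.iteration)  # [2.0, 5.0] 2
```

Keeping the bookkeeping in next() means that every subclass gets the same iteration counting (and, in the real classes, the same file handling) without reimplementing it.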
- class caput.pipeline.BasicContMixin
Bases: object
Provides IO for BasicCont objects in pipeline tasks.
As a mixin, this must be combined (using multiple inheritance) with a subclass of TaskBase, providing the full task API.
Provides the methods read_input, read_output and write_output for BasicCont data which gets written to HDF5 files.
- class caput.pipeline.H5IOMixin
Bases: object
Provides hdf5/zarr IO for pipeline tasks.
As a mixin, this must be combined (using multiple inheritance) with a subclass of TaskBase, providing the full task API.
Provides the methods read_input, read_output and write_output for hdf5 data.
- static write_output(filename, output, file_format=None, **kwargs)
Method for writing hdf5/zarr output.
- Parameters
filename (str) – File name
output (memh5.Group, zarr.Group or h5py.Group) – output to be written. If this is a h5py.Group (which includes h5py.File objects) the buffer is flushed if filename points to the same file and a copy is made otherwise.
file_format (fileformats.Zarr, fileformats.HDF5 or None) – File format to use. If this is not specified, the file format is guessed based on the type of output or the filename. If guessing is not successful, HDF5 is used.
- class caput.pipeline.Input(inputs=None)
Bases: caput.pipeline.TaskBase
Pass inputs into the pipeline from outside.
- class caput.pipeline.IterBase
Bases: caput.pipeline._OneAndOne
Base class for iterating tasks with at most one input and one output.
Tasks inheriting from this class should override process() and optionally setup(), finish(), read_input(), write_output() and cast_input(). They should not override next().
If the value of input_root is anything other than the string “None” then the input will be read (using read_input()) from the file self.input_root + self.file_middles[i] + self.input_ext. If the input is specified both as a filename and as a product key in the pipeline configuration, an error will be raised upon initialization.
If the value of output_root is anything other than the string “None” then the output will be written (using write_output()) to the file self.output_root + self.file_middles[i] + self.output_ext.
- iteration
The current iteration of process/next.
- Type
int
- file_middles
The unique part of each file path.
- Type
list of strings
- input_root
Pipeline settable parameter giving the first part of the input path. If set to ‘None’ no input is read; it is then assumed either that no input is required or that input is received from the pipeline.
- Type
string
- input_ext
Pipeline settable parameter giving the last part of input path. The full input path is self.input_root + self.file_middles[self.iteration] + self.input_ext.
- Type
string
- output_root
Pipeline settable parameter giving the first part of the output path. If set to ‘None’ no output is written.
- Type
string
- output_ext
Pipeline settable parameter giving the last part of output path. The full output path is self.output_root + self.file_middles[self.iteration] + self.output_ext.
- Type
string
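The paths are built by plain string concatenation, so input_root and output_root normally need to end with a path separator (or with whatever filename prefix is wanted). A hypothetical helper, `iteration_paths` (not part of caput), that mirrors the expressions above:

```python
def iteration_paths(root, file_middles, ext):
    """Return root + middle + ext for each iteration, or [] when root is "None"."""
    if root == "None":  # the string "None" disables file IO
        return []
    return [root + middle + ext for middle in file_middles]


middles = ["run_001", "run_002"]
print(iteration_paths("/data/raw/", middles, ".h5"))
# ['/data/raw/run_001.h5', '/data/raw/run_002.h5']
print(iteration_paths("/data/raw", middles, ".h5"))
# ['/data/rawrun_001.h5', '/data/rawrun_002.h5']  (no separator is inserted)
print(iteration_paths("None", middles, ".h5"))
# []
```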
- class caput.pipeline.IterH5Base
Bases: caput.pipeline.H5IOMixin, caput.pipeline.IterBase
Base class for iterating over hdf5 input and output.
- class caput.pipeline.Manager(psutil_profiling=False)
Bases: caput.config.Reader
Pipeline manager for setting up and running pipeline tasks.
The manager is in charge of initializing all pipeline tasks, setting them up by providing the appropriate parameters, then executing the methods of each task in the appropriate order. It also handles intermediate data products and ensures that the correct products are passed between tasks.
- logging
Log levels per module. The key “root” stores the root log level.
- Type
Dict(str, str)
- multiprocessing
TODO
- Type
int
- cluster
TODO
- Type
dict
- tasks
Configuration of pipeline tasks.
- Type
list
- save_versions
Module names (str). This list, together with the version strings from these modules, is attached to output metadata. Default: [].
- Type
list
- save_config
If this is True, the global pipeline configuration is attached to output metadata. Default: True.
- Type
bool
- psutil_profiling
Use psutil to profile CPU and memory usage. Default: False.
- Type
bool
- add_task(task, requires=None, in_=None, out=None)
Add a task instance to the pipeline.
- Parameters
task (TaskBase) – A pipeline task instance.
requires (list or string) – Pipeline product key(s) whose values are passed as arguments to setup().
in_ (list or string) – Pipeline product key(s) whose values are passed as arguments to next().
out (list or string) – Pipeline product key(s) that label the return values of the task's methods.
- Raises
caput.config.CaputConfigError – If there was an error in the task configuration.
- classmethod from_yaml_file(file_name, lint=False, psutil_profiling=False)
Initialize the pipeline from a YAML configuration file.
- Parameters
file_name (string) – Path to YAML pipeline configuration file.
lint (bool) – Instantiate Manager only to lint config. Disables debug logging.
psutil_profiling (bool) – Use psutil to profile CPU and memory usage.
- Returns
A new pipeline manager.
- Return type
Manager
- classmethod from_yaml_str(yaml_doc, lint=False, psutil_profiling=False)
Initialize the pipeline from a YAML configuration string.
- Parameters
yaml_doc (string) – YAML configuration document.
lint (bool) – Instantiate Manager only to lint config. Disables debug logging.
psutil_profiling (bool) – Use psutil to profile CPU and memory usage.
- Returns
A new pipeline manager.
- Return type
Manager
- run()
Main driver method for the pipeline.
This function initializes all pipeline tasks and runs the pipeline through to completion.
- Raises
PipelineRuntimeError – If a task stage returns the wrong number of outputs.
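When a task lists several `out` keys, its return value has to be split among them, and a wrong number of outputs then becomes an error. The following is a minimal sketch of that routing. It assumes that a stage returns a tuple with one element per key when more than one key is given, and that a single key receives the value unchanged. `route_outputs` is a hypothetical helper, and it raises a plain RuntimeError in place of PipelineRuntimeError.

```python
def route_outputs(result, out_keys):
    """Map a task stage's return value onto its 'out' product keys."""
    if result is None or not out_keys:
        return {}  # nothing to hand on
    if len(out_keys) == 1:
        return {out_keys[0]: result}  # a single key receives the value unchanged
    if not isinstance(result, tuple) or len(result) != len(out_keys):
        raise RuntimeError(
            f"expected {len(out_keys)} outputs for keys {out_keys}, got {result!r}"
        )
    return dict(zip(out_keys, result))


print(route_outputs((1.5, [0, 1]), ["gain", "flags"]))
# {'gain': 1.5, 'flags': [0, 1]}
```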
- class caput.pipeline.Output(callback=None)
Bases: caput.pipeline.TaskBase
Take outputs from the pipeline and place them in a list.
To apply some processing to pipeline output (i.e. this task's input), use the callback argument, which is passed each item. The return value of the callback is placed in the outputs attribute. Note that this need not be the input, so if pipeline output should be deleted to save memory you can simply return None.
- Parameters
callback (function, optional) – A function which can apply some processing to the pipeline output.
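The behavior described for Output can be summarized with a hypothetical stand-in (`ToyOutput`, not caput's class). Each received item is passed through the callback, if there is one, and whatever comes back is appended to `outputs`. A callback that returns None therefore records a placeholder instead of keeping a reference to a possibly large product.

```python
class ToyOutput:
    """Hypothetical stand-in for Output: collects (processed) pipeline products."""

    def __init__(self, callback=None):
        self.callback = callback
        self.outputs = []

    def next(self, item):
        if self.callback is not None:
            item = self.callback(item)  # store the callback's return value instead
        self.outputs.append(item)


keep = ToyOutput()
summarize = ToyOutput(callback=len)           # keep only the size of each product
discard = ToyOutput(callback=lambda x: None)  # drop references to large products
for product in (["a", "b"], ["c"]):
    for sink in (keep, summarize, discard):
        sink.next(product)

print(keep.outputs, summarize.outputs, discard.outputs)
# [['a', 'b'], ['c']] [2, 1] [None, None]
```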
- exception caput.pipeline.PipelineConfigError(message)
Bases: caput.config.CaputConfigError
Deprecated. Raised when there is an error setting up a pipeline.
- exception caput.pipeline.PipelineRuntimeError
Bases: Exception
Raised when there is a pipeline related error at runtime.
- exception caput.pipeline.PipelineStopIteration
Bases: Exception
This stops the iteration of next() in pipeline tasks.
Pipeline tasks should raise this exception in the next() method to stop the iteration of the task and to proceed to finish().
Note that if next() receives input data as an argument, it is not required to ever raise this exception. The pipeline will proceed to finish() once the input data has run out.
- class caput.pipeline.SingleBase
Bases: caput.pipeline._OneAndOne
Base class for non-iterating tasks with at most one input and output.
Inherits from TaskBase.
Tasks inheriting from this class should override process() and optionally setup(), finish(), read_input(), write_output() and cast_input(). They should not override next().
If the value of input_root is anything other than the string “None” then the input will be read (using read_input()) from the file self.input_root + self.input_filename. If the input is specified both as a filename and as a product key in the pipeline configuration, an error will be raised upon initialization.
If the value of output_root is anything other than the string “None” then the output will be written (using write_output()) to the file self.output_root + self.output_filename.
- input_root
Pipeline settable parameter giving the first part of the input path. If set to ‘None’ no input is read; it is then assumed either that no input is required or that input is received from the pipeline.
- Type
string
- input_filename
Pipeline settable parameter giving the last part of input path. The full input path is self.input_root + self.input_filename.
- Type
string
- output_root
Pipeline settable parameter giving the first part of the output path. If set to ‘None’ no output is written.
- Type
string
- output_filename
Pipeline settable parameter giving the last part of output path. The full output path is self.output_root + self.output_filename.
- Type
string
- class caput.pipeline.SingleH5Base
Bases: caput.pipeline.H5IOMixin, caput.pipeline.SingleBase
Base class for tasks with hdf5 input and output.
Inherits from H5IOMixin and SingleBase.
- class caput.pipeline.TaskBase
Bases: caput.config.Reader
Base class for all pipeline tasks.
All pipeline tasks should inherit from this class, with functionality and analysis added by overriding __init__, setup, next and/or finish.
In addition, input parameters may be specified by adding class attributes which are instances of config.Property. These will then be read from the pipeline yaml file when the pipeline is initialized. The class attributes will be overridden with instance attributes with the same name but with the values specified in the pipeline file.
Initialize pipeline task.
May be overridden with no arguments. Will be called after any config.Property attributes are set and after ‘input’ and ‘requires’ keys are set up.
- property cacheable
Override to return True if caching results is implemented.
No caching infrastructure has yet been implemented.
- property embarrassingly_parallelizable
Override to return True if next() is trivially parallelizable.
This property tells the pipeline that the problem can be parallelized trivially. This only applies to the next() method, which should not change the state of the task.
If this returns True, then the Pipeline will execute next() many times in parallel and handle all the intermediate data efficiently. Otherwise next() must be parallelized internally if at all. setup() and finish() must always be parallelized internally.
Usage of this has not been implemented.
- finish()
Final analysis stage of pipeline task.
May be overridden with no arguments.
Any return values will be treated as pipeline data-products as specified by the out keys in the pipeline setup.
- next(input=None)
Iterative analysis stage of pipeline task.
May be overridden with any number of positional-only arguments (defaults are allowed). Pipeline data-products will be passed as specified by in keys in the pipeline setup.
This method will be called repeatedly until it either raises a PipelineStopIteration or, if accepting inputs, runs out of input data-products.
Any return values will be treated as pipeline data-products as specified by the out keys in the pipeline setup.
- setup(requires=None)
First analysis stage of pipeline task.
May be overridden with any number of positional-only arguments (defaults are allowed). Pipeline data-products will be passed as specified by requires keys in the pipeline setup.
Any return values will be treated as pipeline data-products as specified by the out keys in the pipeline setup.