Piper - Flexible, iterable pipeline engine with automatic batching
    use Piper;

    my $pipeline = Piper->new(
        first_process => sub {
            my ($instance, $batch) = @_;
            $instance->emit( map { ... } @$batch );
        },
        second_processes => Piper->new(...),
        final_process => sub { ... },
    )->init;

    $pipeline->enqueue(@data);

    while ($pipeline->isnt_exhausted) {
        my $item = $pipeline->dequeue;
        ...
    }
The software engineering concept known as a pipeline is a chain of processing segments, arranged such that the output of each segment is the input of the next.
Piper is a pipeline builder. It composes arbitrary processing segments into a single pipeline instance with the following features:
Pipeline instances are iterators, only processing data as needed.
Data is automatically processed in batches for each segment (with configurable batch sizes).
Built-in support exists for non-linear and/or recursive pipelines.
Processing segments are pluggable and reusable.
new(@segments)
Create a container pipeline segment (parent) from the provided child @segments.
Additionally, a single hashref of attributes for the container/parent segment may be included as an argument to the constructor (anywhere in the argument list). See the "SEGMENT ATTRIBUTES" section for a description of attributes available for both parent and child segments.
Accepted segment types are as follows:
A Piper object
Creates a sub-container of pipeline segments. There is no (explicit) limit to the number of nested containers a pipeline may contain.
A Piper::Process object
See the "PROCESS HANDLER" section for a description of Piper::Process objects.
A coderef, which is used as the segment's process handler.
A hashref that can be coerced into a Piper::Process object
In order to be considered a candidate for coercion, the hashref must contain (at a minimum) the 'handler' key.
A Piper::Instance object
In this case, the associated Piper or Piper::Process object is extracted from the Piper::Instance object for use in the new pipeline segment.
See "INITIALIZATION" for a description of Piper::Instance objects.
$label => $segment pairs
For such pairs, the $segment can be any of the above segment types, and $label is a simple scalar which will be used as $segment's label.
If the $segment already has a label, $label will override it.
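The override rule can be seen in a small runnable sketch, assuming Piper is installed from CPAN. The Piper::Process object is created with the label inner_name but is supplied as the value of an outer_name pair. The handler records the label its instance ends up with, reading it through the label accessor. The labels and variable names are illustrative only.

```perl
use strict;
use warnings;
use Piper;
use Piper::Process;

my @seen_labels;

# The process object is created with its own label...
my $proc = Piper::Process->new({
    label   => 'inner_name',
    handler => sub {
        my ($instance, $batch) = @_;
        # Record the label this segment actually ended up with
        push @seen_labels, $instance->label;
        $instance->emit(@$batch);
    },
});

# ...but the $label => $segment pair overrides it
my $pipeline = Piper->new(outer_name => $proc)->init;

$pipeline->enqueue(1, 2, 3);

my @out;
push @out, $pipeline->dequeue while $pipeline->isnt_exhausted;
```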
    my $pipe = Piper->new(
        \%main_opts,
        subpipe_label => Piper->new(
            first_handler  => Piper::Process->new(sub { ... }),
            second_handler => sub { ... },
            third_handler  => {
                handler => sub { ... },
            },
            another_subpipe => Piper->new(...),
            \%subpipe_opts,
        ),
        Piper::Process->new({
            label   => 'another_handler',
            handler => sub { ... },
        }),
        sub {
            # An un-labeled handler
            ...
        },
        {
            label   => 'final_handler',
            handler => sub { ... },
        },
    );
Piper segments were designed to be easily reusable. Prior to initialization, Piper and Piper::Process objects do not process data; they simply contain the blueprint for creating the pipeline. As such, blueprints for commonly-used pipeline segments can be stored in package libraries and imported wherever needed.
To create a functioning pipeline from one such blueprint, simply call the init method on the outermost segment. The init method returns a Piper::Instance object of the outermost segment, which is the realization of the pipeline design, and which contains Piper::Instance objects created from all its contained segments.
init
Initialization fuses the pipeline segments together, establishes the relationships between the segments, and initializes the dataflow infrastructure.
The init method may be chained from the constructor if the blueprint object is not needed:
    my $instance = Piper->new(...)->init;
Any arguments passed to the init method will be cached and made available to each handler in the pipeline (see the "PROCESS HANDLER" section for a full description of handlers). This is a great way to share a resource (such as a database handle) among process handlers.
    my $pipe = Piper->new(
        query => sub {
            my ($instance, $batch, $dbh) = @_;
            $instance->emit( $dbh->do_query(@$batch) );
        },
        ...
    );

    my $instance = $pipe->init($dbh);
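Here is a self-contained variant of the same idea. A plain number stands in for the database handle, so it runs without a database. The segment label scale and the factor 10 are illustrative. The handler picks up the init argument as the third element of @_, after $instance and $batch.

```perl
use strict;
use warnings;
use Piper;

my $pipe = Piper->new(
    scale => sub {
        # @_ is ($instance, $batch, @args); @args come from init
        my ($instance, $batch, $factor) = @_;
        $instance->emit( map { $_ * $factor } @$batch );
    },
);

# The factor 10 is cached by init and handed to every handler call
my $instance = $pipe->init(10);

$instance->enqueue(1, 2, 3);

my @results;
push @results, $instance->dequeue while $instance->isnt_exhausted;
```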
Instances are ready to accept data for processing:
    $instance->enqueue(@data);

    while ($instance->isnt_exhausted) {
        my $result = $instance->dequeue;
    }
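The sketch below puts construction, initialization and the dequeue loop together in a complete two-segment pipeline. It assumes Piper is installed, and the segment labels are illustrative. The first segment doubles each item and the second increments it.

```perl
use strict;
use warnings;
use Piper;

my $pipeline = Piper->new(
    double => sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { $_ * 2 } @$batch );
    },
    add_one => sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { $_ + 1 } @$batch );
    },
)->init;

$pipeline->enqueue(1 .. 5);

my @results;
while ($pipeline->isnt_exhausted) {
    # Instances are iterators: data is processed as dequeue needs it
    push @results, $pipeline->dequeue;
}
```

After the loop, @results holds 3, 5, 7, 9 and 11.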
Piper::Process objects have the same "SEGMENT ATTRIBUTES" as Piper objects, but also have an additional required attribute: the handler.
handler
A process handler is the data-processing subroutine for the segment.
In its simplest form, the process handler takes input from the previous pipeline segment, processes it, and passes it on to the next segment; but handlers also have built-in support for non-linear and recursive dataflow (see "FLOW CONTROL").
The arguments provided to the handler subroutine are:
$instance
The instance (a Piper::Instance object) corresponding to the segment.
$batch
An arrayref of data items to process.
@args
Any arguments provided to the init method during the "INITIALIZATION" of the pipeline.
After processing a batch of data, the handler may pass the results to the next segment using the emit method called from the handler's $instance.
emit
    sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { ... } @$batch );
    }
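Because emit takes a list, a handler is not limited to one output per input item: it can emit several items, or none. In the sketch below (segment labels are illustrative), the first segment splits each line into words. The second segment then keeps only the words longer than three characters.

```perl
use strict;
use warnings;
use Piper;

my $pipeline = Piper->new(
    # Emits zero or more words per input line
    split_words => sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { split ' ', $_ } @$batch );
    },
    # Emits at most one item per input word
    long_only => sub {
        my ($instance, $batch) = @_;
        $instance->emit( grep { length($_) > 3 } @$batch );
    },
)->init;

$pipeline->enqueue('the quick brown fox', 'jumps over');

my @words;
push @words, $pipeline->dequeue while $pipeline->isnt_exhausted;
```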
Since Piper has built-in support for non-linear and/or recursive pipelines, a "PROCESS HANDLER" may send data to any other segment in the pipeline, including itself.
The following methods may be called from the $instance object passed as the first argument to a handler:
emit(@data)
Send @data to the next segment in the pipeline. If the instance is the last in the pipeline, emits to the drain, making the @data ready for dequeue.
recycle(@data)
Re-queue @data to the top of the current segment in an order such that dequeue(1) would subsequently return $data[0] and so forth.
injectAt($location, @data)
injectAfter($location, @data)
Send @data to the segment at (injectAt) or after (injectAfter) the specified $location.
For injectAt and injectAfter, $location must be the label of a segment in the pipeline or a path-like representation of a hierarchy of labels.
For example, in the following pipeline, a few possible $location values include a, subpipe/b, or main/subpipe/c.
    my $pipe = Piper->new(
        { label => 'main' },
        subpipe => Piper->new(
            a => sub { ... },
            b => sub { ... },
            c => sub { ... },
        ),
    );
If a label is unique within the pipeline, only the label is required. For non-unique labels, searches are performed in a nearest-neighbor, depth-first manner.
For example, in the following pipeline, searching for processA from the handler of processB would find main/pipeA/processA, not main/processA. So to reach main/processA from processB, the handler would need to search for main/processA.
    my $pipe = Piper->new(
        { label => 'main' },
        pipeA => Piper->new(
            processA => sub { ... },
            processB => sub { ... },
        ),
        processA => sub { ... },
    );
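The following sketch shows location-based routing. The route segment uses injectAt to send negative numbers straight to the wrap segment, so they bypass double. It assumes the argument order injectAt($location, @data). The labels are illustrative and unique, so the bare label is enough to locate each segment.

```perl
use strict;
use warnings;
use Piper;

my $pipeline = Piper->new(
    { label => 'main' },
    route => sub {
        my ($instance, $batch) = @_;
        my @negative = grep { $_ < 0 } @$batch;
        my @rest     = grep { $_ >= 0 } @$batch;
        # Negative numbers jump directly to 'wrap', skipping 'double'
        $instance->injectAt('wrap', @negative) if @negative;
        $instance->emit(@rest) if @rest;
    },
    double => sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { $_ * 2 } @$batch );
    },
    wrap => sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { "<$_>" } @$batch );
    },
)->init;

$pipeline->enqueue(3, -1, 5);

my @out;
push @out, $pipeline->dequeue while $pipeline->isnt_exhausted;
```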
inject(@data)
If the segment has a parent, enqueues @data to its parent. Otherwise, enqueues @data to itself.
eject(@data)
If the segment has a parent, sends @data to the drain of its parent. Otherwise, enqueues @data to the segment's drain.
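The sketch below combines two of these flow-control methods in one handler. It assumes the method names recycle and eject for the re-queue and drain operations described in this section.

- Even numbers are halved and recycled into the same segment until they become odd.
- Values above 1000 are ejected straight to the output, skipping the tag segment.
- Everything else is emitted normally.

The labels and the threshold are illustrative.

```perl
use strict;
use warnings;
use Piper;

my $pipeline = Piper->new(
    { label => 'main' },
    reduce => sub {
        my ($instance, $batch) = @_;
        for my $n (@$batch) {
            if ($n > 1000) {
                # The parent of 'reduce' is main, so this sends $n to
                # main's drain, bypassing 'tag'
                $instance->eject($n);
            }
            elsif ($n % 2 == 0) {
                # Re-queue at the top of this same segment
                $instance->recycle($n / 2);
            }
            else {
                $instance->emit($n);
            }
        }
    },
    tag => sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { "odd:$_" } @$batch );
    },
)->init;

$pipeline->enqueue(12, 7, 5000);

my @out;
push @out, $pipeline->dequeue while $pipeline->isnt_exhausted;
```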
All of the following attributes are available for both container (Piper) and processor (Piper::Process) segment types.
Each attribute is equipped with an accessor of the same name.
A star (*) indicates that the attribute is writable, and can be modified at runtime by passing a value as an argument to the method of the same name.
All attributes (except label) have an associated predicate method called has_$attribute which returns a boolean indicating whether the attribute has been set for the segment.
All writable attributes (indicated by *) can be cleared by passing an explicit undef to the writer method or by calling the appropriate clearer method called clear_$attribute.
All accessors, writers, predicates, and clearers are available for each segment before and after "INITIALIZATION".
allow
A coderef used to select the subset of items that the segment is allowed to process.
The coderef executes on each item attempting to queue to the segment. If it returns true, the item is queued. Otherwise, the item skips the segment and proceeds to the next adjacent segment.
Each item is localized to $_, and is also passed in as the first argument.
The following example allow subroutines are equivalent:
    # This segment only accepts digit inputs
    allow => sub { /^\d+$/ }
    allow => sub { $_ =~ /^\d+$/ }
    allow => sub { $_[0] =~ /^\d+$/ }
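Here is a runnable sketch of allow on a hashref-defined segment (labels are illustrative). Only digit strings are queued to double. The non-numeric item skips that segment and, since double is the last segment, is expected to reach the output unchanged.

```perl
use strict;
use warnings;
use Piper;

my $pipeline = Piper->new(
    double => {
        # Items for which this returns false skip the segment
        allow   => sub { /^\d+$/ },
        handler => sub {
            my ($instance, $batch) = @_;
            $instance->emit( map { $_ * 2 } @$batch );
        },
    },
)->init;

$pipeline->enqueue(2, 'x', 5);

my @out;
push @out, $pipeline->dequeue while $pipeline->isnt_exhausted;
```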
batch_size
The number of items to process at a time for the segment.
Once initialized (see "INITIALIZATION"), a segment inherits the batch_size of any existing parent(s) if not provided. If the segment has no parents, or if none of the parents have a batch_size defined, the default batch_size will be used. The default batch_size is 200, but can be configured in the import statement (see the "GLOBAL CONFIGURATION" section).
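The sketch below shows batch_size inheritance in action. It sets batch_size => 2 on the container only. The child segment (label illustrative) records the size of every batch it receives, and none of those batches should exceed 2.

```perl
use strict;
use warnings;
use Piper;

my @batch_sizes;

my $pipeline = Piper->new(
    # Attribute hashref for the container; the child inherits batch_size
    { batch_size => 2 },
    record => sub {
        my ($instance, $batch) = @_;
        push @batch_sizes, scalar @$batch;
        $instance->emit(@$batch);
    },
)->init;

$pipeline->enqueue(1 .. 5);

my @out;
push @out, $pipeline->dequeue while $pipeline->isnt_exhausted;
```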
debug
The debug level for the segment.
Once initialized (see "INITIALIZATION"), a segment inherits the debug level of any existing parent(s) if not specified. The default level is 0, but can be globally overridden by the environment variable PIPER_DEBUG.
See the "LOGGING AND DEBUGGING" section for specifics about debug and verbosity levels.
enabled
A boolean indicating that the segment is enabled and can accept items for processing.
Once initialized (see "INITIALIZATION"), a segment inherits this attribute from any existing parent(s). The default is true.
If a segment is disabled (enabled = 0), all items attempting to queue to the segment are forwarded to the next adjacent segment.
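Here is a sketch of a disabled segment (labels are illustrative). Because double is constructed with enabled => 0, items are forwarded past it to add_one, so each item is only incremented.

```perl
use strict;
use warnings;
use Piper;

my $pipeline = Piper->new(
    double => {
        enabled => 0,   # disabled: items skip this segment
        handler => sub {
            my ($instance, $batch) = @_;
            $instance->emit( map { $_ * 2 } @$batch );
        },
    },
    add_one => sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { $_ + 1 } @$batch );
    },
)->init;

$pipeline->enqueue(1, 2, 3);

my @out;
push @out, $pipeline->dequeue while $pipeline->isnt_exhausted;
```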
label
A label for the segment. If no label is provided, a globally unique ID will be used.
Labels are necessary for certain types of "FLOW CONTROL" (for example, injectAt or injectAfter). For pipelines that do not utilize "FLOW CONTROL" features, labels are primarily useful for "LOGGING AND DEBUGGING".
verbose
The verbosity level for the segment.
Once initialized (see "INITIALIZATION"), a segment inherits the verbosity level of any existing parent(s) if not specified. The default level is 0, but can be globally overridden by the environment variable PIPER_VERBOSE.
The following attributes have read-only accessors (of the same name).
children
For container instances (made from Piper objects, not Piper::Process objects), holds an arrayref of the contained instance objects.
main
For any instance in the pipeline, this attribute holds a reference to the outermost container instance.
parent
For all instances in the pipeline except the outermost container (main), this attribute holds a reference to the instance's immediate container segment.
path
The full path to the instance, built by joining the labels of all its parents and the instance's own label with /. Instances stringify to this attribute.
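The sketch below records, from inside the handler, the path of a nested process instance and the path of its parent (labels are illustrative). Each path is concatenated to a plain string before being stored.

```perl
use strict;
use warnings;
use Piper;

my (@paths, @parents);

my $pipeline = Piper->new(
    { label => 'main' },
    inner => Piper->new(
        leaf => sub {
            my ($instance, $batch) = @_;
            # Force plain strings for later comparison
            push @paths,   '' . $instance->path;
            push @parents, '' . $instance->parent->path;
            $instance->emit(@$batch);
        },
    ),
)->init;

$pipeline->enqueue('a');

my @out;
push @out, $pipeline->dequeue while $pipeline->isnt_exhausted;
```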
Methods marked with a (*) should only be called from the outermost instance.
dequeue($num)
Remove at most $num (default 1) processed items from the end of the pipeline.
enqueue(@data)
Queue @data for processing by the pipeline.
find_segment($location)
Find and return the segment instance according to $location, which can be a label or a path-like hierarchy of labels. See injectAfter for a detailed description of $location.
flush
Process batches until there are no more items pending.
has_children
A boolean indicating whether the instance has any children.
has_parent
A boolean indicating whether the instance has a parent.
has_pending
Returns a boolean indicating whether there are any items that are queued at some level of the segment but have not completed processing.
is_exhausted
Returns a boolean indicating whether the instance is exhausted, that is, true when there are no items left to process or dequeue.
isnt_exhausted
Returns the opposite of is_exhausted.
next_segment
Returns the next adjacent segment from the calling segment. Returns undef for the outermost container.
pending
Returns the number of items that are queued at some level of the pipeline segment but have not completed processing.
prepare($num)
Process batches while data is still pending until at least $num (default 1) items are ready for dequeue.
ready
Returns the number of items that have finished processing and are ready for dequeue from the pipeline segment.
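The following sketch drives processing explicitly instead of through the dequeue loop. It assumes the method names flush, pending and ready for the operations described above. After flush, nothing should be pending and all ten doubled items should be ready. A single dequeue with an explicit count then removes them.

```perl
use strict;
use warnings;
use Piper;

my $pipeline = Piper->new(
    double => sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { $_ * 2 } @$batch );
    },
)->init;

$pipeline->enqueue(1 .. 10);

# Process every pending batch without dequeuing anything
$pipeline->flush;

my $pending_after = $pipeline->pending;  # items still in flight
my $ready_after   = $pipeline->ready;    # items waiting to be dequeued

# Remove everything that is ready in one call
my @out = $pipeline->dequeue($ready_after);
```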
The following global attributes are configurable from the Piper import statement.
Ex:
    # Change the default batch_size to 50
    use Piper batch_size => 50;
batch_size
The default batch size used by pipeline segments which do not have a locally defined batch_size and do not have a parent segment with a defined batch_size.
The batch_size attribute must be a positive integer.
The default batch_size is 200.
Logging and debugging facilities are available upon "INITIALIZATION" of a pipeline.
Warnings and errors are issued regardless of debug and verbosity levels via carp and croak from the Carp module, and are therefore configurable with any of Carp's global options or environment variables.
Debugging and/or informational messages are printed to STDERR if debug and/or verbosity levels have been set. Piper uses three levels for each of debug and verbose: 0, 1, or 2. The default is 0 (off).
Levels can be set by any of the following mechanisms: at construction of the Piper/Piper::Process objects, dynamically via the debug and verbose methods of segments, or with the environment variables PIPER_DEBUG and PIPER_VERBOSE.
Levels can be set local to specific segments. The default levels of a sub-segment are inherited from its parent.
Ex:
    # main                 verbose => 0 (default)
    # main/subpipe         verbose => 1
    # main/subpipe/normal  verbose => 1 (inherited)
    # main/subpipe/loud    verbose => 2
    # main/subpipe/quiet   verbose => 0

    my $pipe = Piper->new(
        { label => 'main' },
        subpipe => Piper->new(
            { verbose => 1 },
            normal => sub {...},
            loud   => {
                verbose => 2,
                handler => sub {...},
            },
            quiet  => {
                verbose => 0,
                handler => sub {...},
            },
        ),
    );
Levels set via the environment variables PIPER_DEBUG and PIPER_VERBOSE are global. If set, these environment variables override any and all settings defined in the source code.
All messages include information about the segment which called the logger.
Built-in informational (verbose or debug > 0) messages describe data processing steps, such as noting when items are queued or being processed by specific segments. Increasing the level(s) above 1 simply adds more detail to the printed messages.
Built-in debug messages describe the decisions made by the pipeline engine itself. Examples include logging its search steps when locating a named segment or explaining how it chooses which batch to process. Increasing the debug level above 1 simply adds more detail to the printed messages.
User-defined errors, warnings, and debug or informational messages can use the same logging system as Piper itself.
The first argument passed to a "PROCESS HANDLER" is the Piper::Instance object associated with that segment, which has the below-described methods available for logging, debugging, warning, or throwing errors.
In each of the below methods, the @items are optional and only printed if the verbosity level for the segment is > 1. They can be used to pass additional context or detail about the data being processed or which caused the message to print (for conditional messages).
The built-in messaging only uses debug/verbosity levels 1 and 2, but no maximum level is enforced. Custom messages can therefore be made to require a higher level (for example, verbose > 2) than any built-in message.
ERROR($message, @items)
Throws an error with $message via croak.
WARN($message, @items)
Issues a warning with $message via carp.
INFO($message, @items)
Prints an informational $message to STDERR if either the debug or verbosity level for the segment is > 0.
DEBUG($message, @items)
Prints a debug $message to STDERR if the debug level for the segment is > 0.
    my $pipe = Piper->new(
        messenger => sub {
            my ($instance, $batch) = @_;
            for my $data (@$batch) {
                if ($data->is_bad) {
                    $instance->ERROR("Data <$data> is bad!");
                }
            }
            # User-heightened verbosity level
            $instance->INFO('Data all good!', @$batch)
                if $instance->verbose > 2;
            ...
        },
        ...
    );
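WARN reports through carp, so its messages pass through Perl's warn mechanism and can be captured with a __WARN__ handler. This is handy in tests. The sketch below assumes the method name WARN for the warning method described above. The segment label and message text are illustrative.

```perl
use strict;
use warnings;
use Piper;

my @warnings;
# carp goes through warn, so this handler collects WARN messages
local $SIG{__WARN__} = sub { push @warnings, $_[0] };

my $pipeline = Piper->new(
    check => sub {
        my ($instance, $batch) = @_;
        for my $n (@$batch) {
            $instance->WARN("negative value $n") if $n < 0;
        }
        # Only non-negative values continue down the pipeline
        $instance->emit( grep { $_ >= 0 } @$batch );
    },
)->init;

$pipeline->enqueue(4, -2, 9);

my @out;
push @out, $pipeline->dequeue while $pipeline->isnt_exhausted;
```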
Much of the concept and API for this project was inspired by the work of Nathaniel Pierce.
Special thanks to Tim Heaney for his encouragement and mentorship.
version 0.04
Mary Ehlers <ehlers@cpan.org>
Tim Heaney <oylenshpeegul@gmail.com>
This software is Copyright (c) 2017 by Mary Ehlers.
This is free software, licensed under:
The Apache License, Version 2.0, January 2004