The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#===============================================================================
#
#  DESCRIPTION:  Join flows into one
#
#       AUTHOR:  Aliaksandr P. Zahatski, <zahatski@gmail.com>
#===============================================================================
=head1 NAME

Flow::Join - Assemble multiple flows streams in to one


=head1 SYNOPSIS

    my ( $s, $s1 );
    my $f1 = Flow::create_flow(
        Splice => 200,
        Join   => {
            Data => Flow::create_flow(
                sub {
                    return [ grep { $_ > 10 } @_ ];
                },
                Splice => 10

            ),
            Min => Flow::create_flow(
                sub {
                    return [ grep { $_ == 1 } @_ ];
                },
                Splice => 40,
            )
        },
        ToXML  => \$s,
    );
    $f1->run( 1, 3, 11 );


    Join   => [
            Data => Flow::create_flow(
                sub {
                    return [ grep { $_ > 10 } @_ ];
                },
                Splice => 10

            ),
            Min => Flow::create_flow(
                sub {
                    return [ grep { $_ == 1 } @_ ];
                },
                Splice => 40,
            )
        ],

=head1 DESCRIPTION

Flow::Join - Assemble multiple flows streams in to one

=cut

package Flow::NamedPipesPack;
use strict;
use warnings;
use base 'Flow';
our $VERSION = '0.1';


sub begin {
    return;
}

sub flow {
    my $self = shift;
    $self->put_ctl_flow(
        { type => 'named_pipes', name => $self->{name}, stage => 1 } );
    my $res = $self->SUPER::flow(@_);
    $self->put_ctl_flow(
        { type => 'named_pipes', name => $self->{name}, stage => 2 } );
    return $res;
}

sub ctl_flow {
    my $self = shift;
    $self->put_ctl_flow(
        { type => 'named_pipes', name => $self->{name}, stage => 3 } );
    my $res = $self->SUPER::ctl_flow(@_);
    $self->put_ctl_flow(
        { type => 'named_pipes', name => $self->{name}, stage => 4 } );
    return $res;
}

sub end {
    my $self = shift;
    return;
}
1;

package Flow::Join;
use strict;
use warnings;
use Data::Dumper;
use Flow;
use base 'Flow';
our $VERSION = '0.1';

sub new {
    my $self  = shift->SUPER::new();
    my @order = ();
    # Flow::Join:: { Data=>$dsd, Test=>$sdsd}
    if ($#_ == 0 ) {
        my $args = shift @_;
#        @_ = ref( $args) eq 'ARRAY' ?  @{ $args } : %{ $args} ;
        if (ref( $args) eq 'ARRAY') {
            @_ = @{ $args };
        } else {
            @_ = ();
            foreach my $k ( reverse sort keys %{ $args} ) {
                push @_, $k, $args->{$k}
            }

        }

    }
    while ( my ( $name, $value ) = splice @_, 0, 2 ) {
        push @order,
          {
            name   => $name,
            flow   => $value,
            origin => [
                Flow::split_flow($value),
                new Flow::NamedPipesPack:: name => $name
            ]
          };
    }
    $self->{flows} = \@order;
    $self;
}

sub set_handler {
    my $self = shift;
    $self->SUPER::set_handler(@_);
    my $handler = $self->get_handler;
    foreach ( @{ $self->{flows} } ) {
        my @flows = reverse( @{ $_->{origin} }, $handler );
        my $next = shift @flows;
        while ( my $h = shift @flows ) {
            $h->set_handler($next);
            $next = $h;
        }
        $_->{flow} = $next;
    }
}

sub _get_flows {
    my $self = shift;
    my @res  = ();
    foreach my $rec ( @{ $self->{flows} } ) {
        push @res, $rec->{flow};
    }
    @res;
}

sub flow() {
    my $self = shift;
    foreach my $f ( $self->_get_flows ) {
        $f->parser->flow(@_);
    }
    return;
}

sub ctl_flow() {
    my $self = shift;
    foreach my $f ( $self->_get_flows ) {
        $f->parser->ctl_flow(@_);
    }
    return;
}

sub begin {
    my $self = shift;
    my $res  = $self->SUPER::begin(@_);
    foreach my $f ( $self->_get_flows ) {
        $f->parser->begin(@_);
    }
    return $res;
}

sub end {
    my $self = shift;
    foreach my $f ( $self->_get_flows ) {
        $f->parser->end(@_);
    }
    return $self->SUPER::end(@_);
}

1;
__END__

=head1 SEE ALSO

Flow::Split

=head1 AUTHOR

Zahatski Aliaksandr, <zag@cpan.org>

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2015 by Zahatski Aliaksandr

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.8.8 or,
at your option, any later version of Perl 5 you may have available.

=cut