The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
use strict;
use warnings;

############################################################################
# BEWARE! complex stuff!
# to aid with debugging problems:
# - switch on debug mode
# - to see whats going on in direction dir part p:
#   grep for '[dir][p]'
# - to see whats transfering out of direction dir part p into next part/up:
#   grep for '[dir][p>'
#
# basic design
# - we do not have lots of member variables, instead we put everything into
#   new_analyzer as normal variables and declare various $sub = sub ... which
#   use these variables. Thus the variables get bound once to the sub and we
#   don't need to access it with $self->{field}... or so all the time
# - subs and data structures are described in new_analyzer, the most important
#   are
#   - $global_in - this is the sub data method
#   - $part_in   - called from global_in, itself and callbacks to put data
#     into a specific part. Feeds the data in the associated analyzer
#   - $imp_callback - callback for the analyzer of a specific part

############################################################################

package Net::IMP::Cascade;
use base 'Net::IMP::Base';
use fields (
    'parts',    # analyzer objects
    # we do everything with closures inside new_analyzer here, so that the
    # object has only fields for accessing some closures from subs
    'dataf',    # called from sub data
    'closef',   # called from DESTROY
);

use Net::IMP; # constants
use Carp 'croak';
use Scalar::Util 'weaken';
use Hash::Util qw(lock_ref_keys);
use Net::IMP::Debug;
use Data::Dumper;

my %rtypes_implemented_myself = map { $_ => 1 } (
    IMP_PASS,
    IMP_PREPASS,
    IMP_REPLACE,
    IMP_REPLACE_LATER,
    IMP_DENY,
    IMP_DROP,
    #IMP_TOSENDER, # not supported yet
    IMP_LOG,
    IMP_ACCTFIELD,
    IMP_FATAL,
);

sub get_interface {
    my Net::IMP::Cascade $factory = shift;
    my $parts = $factory->{factory_args}{parts};

    # collect interfaces by part
    my @if4part;
    for my $p ( @$parts ) {
	my @if;
	for my $if ( $p->get_interface(@_)) {
	    # $if should require only return types I support
	    push @if,$if
		if ! grep { ! $rtypes_implemented_myself{$_} } @{ $if->[1] };
	}
	@if or return; # nothing in common
	push @if4part,\@if
    }

    # find interfaces which are supported by all parts
    my @common;
    for( my $i=0;$i<@if4part;$i++ ) {
	for my $if_i ( @{ $if4part[$i] } ) {
	    my ($in_i,$out_i) = @$if_i;
	    # check if $if_i matches at least on interface description in
	    # all other parts, e.g. $if_i is same or included in $if_k
	    # - data type/proto: $in_k should be undef or same as $in_i
	    # - return types: $out_i should include $out_k
	    for( my $k=0;$k<@if4part;$k++ ) {
		next if $i == $k; # same
		for my $if_k ( @{  $if4part[$k] } ) {
		    my ($in_k,$out_k) = @$if_k;
		    # should be same data type or $in_k undef
		    next if $in_k and ( ! $in_i or $in_k != $in_i );
		    # $out_i should include all of $out_k
		    my %out_k = map { $_ => 1 } @$out_k;
		    delete @out_k{ @$out_i };
		    next if %out_k; # some in k are not in i

		    # junction if i and k
		    push @common,[ $in_k,$out_i ];
		}
	    }
	}
    }

    # remove duplicates from match
    my (@uniq,%m);
    for( @common ) {
	my $key = ( $_->[0] // '<undef>' )."\0".join("\0",sort @{$_->[1]});
	push @uniq,$_ if ! $m{$key}++;
    }
    return @uniq;
}

sub set_interface {
    my Net::IMP::Cascade $factory = shift;
    my @if = @_;
    my $parts = $factory->{factory_args}{parts};

    my @new_parts;
    for(my $i=0;$i<@$parts;$i++) {
	my $np = $parts->[$i]->set_interface(@if)
	    or return; # cannot use interface
	$np == $parts->[$i] and next; # no change of part
	$new_parts[$i] = $np; # got new factory for part
    }

    return $factory if ! @new_parts; # interface supported by original factory

    # some parts changed, create new factory for this cascade
    for(my $i=0;$i<@$parts;$i++) {
	$new_parts[$i] ||= $parts->[$i]; # copy parts which did not change
    }

    return ref($factory)->new_factory( parts => \@new_parts );
}

sub new_analyzer {
    my ($factory,%args) = @_;

    my $p     = $factory->{factory_args}{parts};
    my $self  = $factory->SUPER::new_analyzer(%args);
    my @imp = map { $_->new_analyzer(%args) } @$p;

    # $parts[$dir][$pi] is the part for direction $dir, analyzer $pi
    # if part is optimized away due to IMP_PASS with IMP_MAXOFFSET
    # $parts[$dir][$pi] contains instead an integer for adjustments
    # from this part
    my @parts;

    # pause/continue handling
    # maintains pause status per part
    my @pause;

    # to make sure we don't leak due to cross-references
    weaken( my $wself = $self );

    my $new_buf = sub {
	lock_ref_keys( my $buf = {
	    start   => 0,  # start of buf relativ to part
	    end     => 0,  # end of buf relativ to part
	    data    => '', # data or undef for replace_later
	    dtype   => 0,  # data type
	    rtype   => IMP_PASS,  # IMP_PASS < IMP_PREPASS < IMP_REPLACE
	    gap     => 0,  # size of gap before buf?
	    gstart  => 0,  # start of buf relativ to cascade
	    gend    => 0,  # end of buf relativ to cascade
	    eof     => 0   # flag if last buf in this direction
	});
	%$buf = ( %$buf, @_ ) if @_;
	return $buf;
    };

    my $new_part = sub {
	lock_ref_keys( my $p = {
	    ibuf => [ &$new_buf ], # buffers, at least one
	    pass          => 0,    # can pass up to ..
	    prepass       => 0,    # can prepass up to ..
	    replace_later => 0,    # will replace_later up to ..
	    adjust        => 0,    # total local adjustments from forwarded bufs
	});
	return $p;
    };

    # initialize @parts
    for( my $i=0;$i<@imp;$i++ ) {
	$parts[0][$i] = $new_part->();       # client -> server, flow 0>1>2>..
	$parts[1][$#imp-$i] = $new_part->(); # server -> client, flow 9>8>7>..
    }

    my $dump_bufs = sub {
	my $bufs = shift;
	my @out;
	for my $i (@_ ? @_: 0..$#$bufs) {
	    my $buf = $bufs->[$i];
	    my $str = ! defined( $buf->{data} ) ? '<undef>' : do {
		local $_ = $buf->{data};
		$_ = substr($_,0,27).'...' if length($_)>30;
		s{([\\\n\r\t[^:print:]])}{ sprintf("\\%03o",ord($1)) }esg;
		$_
	    };
	    push @out, sprintf("#%02d %d..%d%s%s%s %s %s [%d,%d] '%s'",
		$i,
		$buf->{start},$buf->{end}, $buf->{eof} ? '$':'',
		$buf->{gap} ? " +$buf->{gap}":"",
		defined($buf->{data}) ? '':' RL',
		$buf->{dtype},$buf->{rtype},
		$buf->{gstart},$buf->{gend},
		$str
	    );
	}
	return join("\n",@out);
    };
    my $dump_parts = sub {
	my $dir = shift;
	my $out = '';
	for my $pi (@_ ? @_ : 0..$#imp) {
	    my $part = $parts[$dir][$pi];
	    if ( ! $part ) {
		$out .= "part[$dir][$pi] - skip\n";
		next;
	    }
	    $out .= sprintf("part[%d][%d] p|pp|rl=%d|%d|%d ibuf:\n",
		$dir,$pi,$part->{pass},$part->{prepass},$part->{replace_later});
	    my $ib = $part->{ibuf};
	    $out .= $dump_bufs->( $part->{ibuf});
	}
	return $out;
    };

    my $split_buf = sub {
	my ($ibuf,$i,$fwd) = @_;
	my $buf = $ibuf->[$i];
	die "no split for packet types" if $buf->{dtype}>0;

	my $buf_before = $new_buf->(
	    %$buf,
	    eof => 0,
	    end => $buf->{start} + $fwd,  # adjust end
	    defined($buf->{data})
		? ( data => substr($buf->{data},0,$fwd,'') )  # real data
		: (),  # replacement promise
	);
	# gap in buf_before
	$buf->{gap} = 0;
	$buf->{start} = $buf_before->{end};

	# if buf was not changed gend..gstart should reflect the
	# original length of the data
	if ( $buf->{rtype} != IMP_REPLACE ) {
	    $buf_before->{gend} = ( $buf->{gstart} += $fwd );
	} else {
	    # split gstart..gend into full|0 per convention
	    $buf->{gstart} = $buf->{gend};
	}

	# put buf_before before buf in ibuf
	splice(@$ibuf,$i,0,$buf_before);
    };

    my $fwd_collect; # collect bufs which can be forwarded
    my $fwd_up;      # collect what can be passed up
    my $exec_fwd;    # do the collected forwarding to next part or up

    my $global_in;  # function where data gets fed into from outside (sub data)
    my $part_in;    # internal feed into each part

    my $imp_callback;   # synchronization wrapper around callback for analyzers
    my $_imp_callback;  # real callback for the analyzers

    # pass passable bufs in part starting with ibuf[i]
    # returns all bufs which can be passed and strips them from part.ibuf
    $fwd_collect = sub {
	my ($dir,$pi,$i,$r_passed) = @_;
	my $part = $parts[$dir][$pi];
	my $ibuf = $part->{ibuf};
	$DEBUG && debug(
	    "fwd_collect[$dir][$pi]: p=$part->{pass} pp=$part->{prepass} "
	    .$dump_bufs->($ibuf));
	my @fwd;
	for my $pp (qw(pass prepass)) {
	    my $pass = $part->{$pp} or next;
	    for( ;$i<@$ibuf;$i++ ) {
		my $buf = $ibuf->[$i];
		last if ! $buf->{dtype}; # dummy buf
		if ( $pass != IMP_MAXOFFSET and $buf->{start} >= $pass ) {
		    $DEBUG && debug(
			"fwd_collect[$dir][$pi]: reset $pp due to start[$i]($buf->{start})>=$pp($pass)");
		    $part->{$pp} = 0;
		    last;
		}
		die "cannot pass bufs with replace_later"
		    if ! defined $buf->{data};
		if ( $pass == IMP_MAXOFFSET or $buf->{end} <= $pass ) {
		    # whole buffer can be passed
		    $DEBUG && debug(
			"fwd_collect[$dir][$pi]: pass whole buffer[$i] $buf->{start}..$buf->{end}");
		    $buf->{rtype} = IMP_PREPASS if $pp eq 'prepass'
			and $buf->{rtype} == IMP_PASS;
		    push @fwd,[ $pi,$dir,$buf ];

		    # r_passed is set from part_in to track position if data
		    # are passed. In case of prepass we don't pass data but
		    # only put them into fwd
		    next if $r_passed && $pp eq 'prepass';

		    # track what got passed for part_in
		    $$r_passed = $buf->{end} if $r_passed;

		    # remove passed data from ibuf, if ! r_passed also prepassed
		    # data (called from imp_callback)
		    shift(@$ibuf);
		    $i--;

		    if ( ! @$ibuf ) {
			if ( $part->{pass} == IMP_MAXOFFSET || $buf->{eof} ) {
			    # part done, skip it in the future
			    push @fwd,[$pi,$dir,undef]; # buf = undef is special
			}
			# insert dummy
			@$ibuf = $new_buf->(
			    start  => $buf->{end},
			    end    => $buf->{end},
			    gstart => $buf->{gend},
			    gend   => $buf->{gend},
			    # keep type for streaming data
			    $buf->{dtype} < 0 ? ( dtype => $buf->{dtype} ):(),
			);
			last;
		    }

		} else {
		    # only part of buffer can be passed
		    # split buffer and re-enter loop, this will foreward the
		    # first part and keep the later part
		    $DEBUG && debug(
			"fwd_collect[$dir][$pi]: need to split buffer[$i]: $buf->{start}..$pass..$buf->{end}");
		    $split_buf->($ibuf,$i,$pass - $buf->{start});
		    redo; # don't increase $i!
		}
	    }
	}
	return @fwd;
    };

    $fwd_up = sub {
	my ($dir,$buf) = @_;
	if ( $buf->{gstart} == $buf->{gend} && ! $buf->{gap}
	    && $buf->{rtype} ~~ [ IMP_PASS, IMP_PREPASS ]) {
	    # don't repeat last (pre)pass because of empty buffer
	    return;
	}

	return [
	    $buf->{rtype},
	    $dir,
	    $buf->{gend},
	    ($buf->{rtype} == IMP_REPLACE) ? ( $buf->{data} ):()
	];
    };

    $exec_fwd = sub {
	my @fwd = @_;
	if (@fwd>1) {
	    $DEBUG && debug("trying to merge\n".join("\n", map {
		! defined $_->[0]
		    ? "<cb>"
		    : "fwd[$_->[1]][$_->[0]] " .
			( $_->[2] ? $dump_bufs->([$_->[2]]) : '<pass infinite>')
	    } @fwd));
	    # try to compress
	    my ($lpi,$ldir,$lbuf);
	    for( my $i=0;$i<@fwd;$i++ ) {
		if ( ! defined $fwd[$i][0] || ! defined $fwd[$i][2]) {
		    $lpi = undef;
		    next;
		}
		if ( ! defined $lpi
		    or $lpi  != $fwd[$i][0]
		    or $ldir != $fwd[$i][1] ) {
		    ($lpi,$ldir,$lbuf) = @{$fwd[$i]};
		    next;
		}

		my $buf = $fwd[$i][2];

		if ( not $buf->{gap}
		    and $buf->{dtype} < 0
		    and $buf->{start} == $lbuf->{end}
		    and $buf->{rtype} == $lbuf->{rtype}
		    and $buf->{dtype} == $lbuf->{dtype}
		) {
		    if ( $buf->{rtype} == IMP_REPLACE ) {
			if ( $lbuf->{gend} == $buf->{gend} ) {
			    # same global end, merge data
			    $lbuf->{data} .= $buf->{data};
			} elsif ( $buf->{data} ne '' or $lbuf->{data} ne '' ) {
			    # either one not empty, no merge
			    next;
			}
		    } else {
			# unchanged, append
			$lbuf->{data} .= $buf->{data};
		    }
		    $DEBUG && debug("merge bufs ".$dump_bufs->([$lbuf,$buf]));
		    $lbuf->{gend} = $buf->{gend};
		    $lbuf->{end}  = $buf->{end};
		    $lbuf->{eof}  = $buf->{eof};
		    splice(@fwd,$i,1,());
		    $i--;
		    next;

		} else {
		    ($lpi,$ldir,$lbuf) = @{$fwd[$i]};
		    next;
		}
	    }
	}
	while ( my $fwd = shift(@fwd)) {
	    my $npi = my $pi = shift(@$fwd);
	    if ( ! defined $npi ) {
		# propagate prepared IMP callback
		$wself->run_callback($fwd);
		next;
	    }

	    my ($dir,$buf) = @$fwd;

	    if ( $buf ) {
		my $np;
		my $adjust = 0;
		while (1) {
		    $npi += $dir?-1:+1;
		    last if $npi<0 or $npi>=@imp;
		    last if ref( $np = $parts[$dir][$npi] );
		    $adjust += $np;
		    $DEBUG && debug("skipping pi=$npi");
		}

		if ( $buf->{eof} ) {
		    # add pass infinite to fwd to propagate eof
		    push @fwd,[ $pi,$dir,undef ];
		}
		if ( $np ) {
		    # feed into next part
		    my $nib = $np->{ibuf};
		    # adjust start,end based on end of npi and gap
		    $buf->{start} = $nib->[-1]{end} + $buf->{gap} + $adjust;
		    $buf->{end} = $buf->{start} + length($buf->{data});
		    $DEBUG && debug(
			"fwd_next[$dir][$pi>$npi] ".$dump_bufs->([$buf]));
		    $part_in->($npi,$dir,$buf);
		} else {
		    # output from cascade
		    my $cb = $fwd_up->($dir,$buf) or next;
		    $DEBUG && debug(
			"fwd_up[$dir][$pi>>] ".$dump_bufs->([$buf]));
		    $wself->run_callback($cb);
		}

	    # special - part is done with IMP_PASS IMP_MAXOFFSET
	    } else {
		# skip if we had a pass infinite already
		next if ! ref $parts[$dir][$pi];

		$parts[$dir][$pi] = $parts[$dir][$pi]->{adjust};
		if ( grep { ref($_) } @{ $parts[$dir] } ) {
		    # we have other unfinished parts, skip only this part
		    $DEBUG && debug(
			"part[$dir][$pi>$npi] will be skipped in future, adjust=$parts[$dir][$pi]");
		} else {
		    # everything can be skipped
		    $DEBUG && debug(
			"part[$dir][$pi>>] all parts will be skipped in future");
		    # pass rest
		    $wself->run_callback([ IMP_PASS,$dir,IMP_MAXOFFSET ]);
		}
	    }
	}
    };

    # the data function
    # called from sub data on new data and from $process when data are finished
    # in on part and should be transferred into the next part
    #  $pi    - index into parts
    #  $dir   - direction (e.g. target part is $parts[$dir][$pi])
    #  $buf   - new buffer from $new_buf->() which might be merged with existing
    $part_in = sub {
	my ($pi,$dir,$buf) = @_;
	$DEBUG && debug( "part_in[$dir][$pi]: ".$dump_bufs->([$buf]));

	my $part = $parts[$dir][$pi];
	my $ibuf = $part->{ibuf};
	my $lbuf = $ibuf->[-1];
	my $lend = $lbuf->{end};

	# some sanity checks
	if(1) {
	    die "data after eof [$dir][$pi] ".$dump_bufs->([$lbuf,$buf])
		if $lbuf->{eof};
	    if ( $buf->{start} != $lend ) {
		if ( $buf->{start} < $lend ) {
		    die "overlapping data off($buf->{start})<last.end($lend) in part[$dir][$pi]";
		} elsif ( ! $buf->{gap} ) {
		    die "gap should be set because off($buf->{start})>last.end($lend) in part[$dir][$pi]"
		}
	    } elsif ( $buf->{gap} ) {
		die "gap specified even if off($buf->{start}) == last.end"
	    }
	    $part->{pass} == IMP_MAXOFFSET and die
		"pass infinite should have been optimized by removing part[$dir][$pi]";
	}

	# add data to buf
	if ( $lbuf->{data} eq '' and $lbuf->{rtype} == IMP_PASS ) {
	    # empty dummy buffer
	    $DEBUG && debug("part_in[$dir][$pi]: replace dummy buffer");
	    @$ibuf == 1 or die "empty dummy buffer should only be at beginning";
	    @$ibuf = $buf;

	} elsif ( ! $buf->{gap}
	    and $buf->{data} eq ''
	    and $buf->{rtype} == $lbuf->{rtype}
	    and $buf->{dtype} == $lbuf->{dtype}
	    and $buf->{dtype} < 0
	    and ! $buf->{eof}
	    ) {
	    # just update eof,[g]end of lbuf
	    $DEBUG && debug(
		"part_in[$dir][$pi]: set lbuf end=$buf->{end} gend=$buf->{gend}");
	    $lbuf->{end}  = $buf->{end};
	    $lbuf->{gend} = $buf->{gend};
	    # nothing to do with these empty data
	    $DEBUG && debug("part_in[$dir][$pi] nothing to do on empty buffer");
	    return;

	} else {
	    # add new buf
	    $DEBUG && debug("part_in[$dir][$pi]: add new buffer");
	    push @$ibuf,$buf;
	}

	# determine what can be forwarded immediatly
	my @fwd = $fwd_collect->($dir,$pi,$#$ibuf,\$lend);

	if ( $buf->{eof} ? $lend <= $buf->{end} : $lend < $buf->{end} ) {
	    # send new data to the analyzer
	    my $rl = $part->{replace_later};
	    for(@$ibuf) {
		next if $_->{start} < $lend;
		die "last_end should be on buffer boundary"
		    if $_->{start} > $lend;
		$lend = $_->{end};
		$DEBUG && debug(
		   "analyzer[$dir][$pi] << %d bytes %s \@%d%s -> last_end=%d",
		    $_->{end} - $_->{start},
		    $_->{dtype},
		    $_->{start},$_->{gap} ? "(+$_->{gap})":'',
		    $lend
		);
		$imp[$pi]->data($dir,
		    $_->{data},
		    $_->{gap} ? $_->{start}:0,
		    $_->{dtype}
		);
		$imp[$pi]->data($dir,'',0, $_->{dtype})
		    if $buf->{eof} && $_->{data} ne '';
		$rl or next;
		if ( $rl == IMP_MAXOFFSET or $rl>= $lend ) {
		    $buf->{data} = undef;
		} else {
		    $rl = $part->{replace_later} = 0; # reset
		}
	    }
	} else {
	    $DEBUG && debug(
		"nothing to analyze[$dir][$pi]: last_end($lend) < end($buf->{end})");
	}

	# forward data which can be (pre)passed
	$exec_fwd->(@fwd) if @fwd;
    };

    $_imp_callback = sub {
	my $pi = shift;

	my @fwd;
	for my $rv (@_) {
	    my $rtype = shift(@$rv);

	    if ( $rtype ~~ [ IMP_FATAL, IMP_DENY, IMP_DROP, IMP_ACCTFIELD ]) {
		$DEBUG && debug("callback[.][$pi] $rtype @$rv");
		$wself->run_callback([ $rtype, @$rv ]);

	    } elsif ( $rtype == IMP_LOG ) {
		my ($dir,$offset,$len,$level,$msg) = @$rv;
		$DEBUG && debug(
		    "callback[$dir][$pi] $rtype '$msg' off=$offset len=$len lvl=$level");
		# approximate offset to real position
		my $newoff = 0;
		my $part = $parts[$dir][$pi];
		for ( @{$part->{ibuf}} ) {
		    if ( $_->{start} <= $offset ) {
			$offset = ( $_->{rtype} == IMP_REPLACE )
			    ? $_->{gstart}
			    : $_->{gstart} - $_->{start} + $offset;
		    } else {
			last
		    }
		}
		$wself->run_callback([ IMP_LOG,$dir,$offset,$len,$level,$msg ]);

	    } elsif ( $rtype == IMP_PAUSE ) {
		my $dir = shift;
		$DEBUG && debug("callback[$dir][$pi] $rtype");
		next if $pause[$pi];
		$pause[$dir][$pi] = 1;
		$wself->run_callback([ IMP_PAUSE ]) if grep { $_ } @pause > 1;

	    } elsif ( $rtype == IMP_CONTINUE ) {
		my $dir = shift;
		$DEBUG && debug("callback[$dir][$pi] $rtype");
		delete $pause[$dir][$pi];
		$wself->run_callback([ IMP_CONTINUE ])
		    if not grep { $_ } @{$pause[$dir]};

	    } elsif ( $rtype ~~ [ IMP_PASS, IMP_PREPASS ] ) {
		my ($dir,$offset) = @$rv;
		$DEBUG && debug("callback[$dir][$pi] $rtype $offset");
		ref(my $part = $parts[$dir][$pi]) or next; # part skippable?
		if ( $rtype == IMP_PASS ) {
		    next if $part->{pass} == IMP_MAXOFFSET; # no change
		    if ( $offset == IMP_MAXOFFSET ) {
			$part->{pass} = IMP_MAXOFFSET;
			$part->{prepass} = 0; # pass >= prepass
		    } elsif ( $offset > $part->{pass} ) {
			$part->{pass} = $offset;
			if ( $part->{prepass} != IMP_MAXOFFSET
			    and $part->{prepass} <= $offset ) {
			    $part->{prepass} = 0; # pass >= prepass
			}
		    } else {
			next; # no change
		    }
		} else {  # IMP_PREPASS
		    next if $part->{prepass} == IMP_MAXOFFSET; # no change
		    if ( $offset == IMP_MAXOFFSET ) {
			$part->{prepass} = IMP_MAXOFFSET;
		    } elsif ( $offset > $part->{prepass} ) {
			$part->{prepass} = $offset;
		    } else {
			next; # no change
		    }
		}

		# pass/prepass got updated, so we might pass some more data
		push @fwd, $fwd_collect->($dir,$pi,0);

	    } elsif ( $rtype == IMP_REPLACE ) {
		my ($dir,$offset,$newdata) = @$rv;
		$DEBUG && debug(
		    "callback[$dir][$pi] $rtype $dir $offset len=%d",
		    length($newdata));
		ref(my $part = $parts[$dir][$pi])
		    or die "called replace for passed data";
		my $ibuf = $part->{ibuf};

		# sanity checks
		die "called replace although pass=IMP_MAXOFFSET" if ! $part;
		die "no replace with IMP_MAXOFFSET" if $offset == IMP_MAXOFFSET;
		die "called replace for already passed data"
		    if $ibuf->[0]{start} > $offset;

		while (@$ibuf) {
		    my $buf = $ibuf->[0];
		    if ( $offset >= $buf->{end} ) {
			# replace complete buffer
			$DEBUG && debug(
			    "replace complete buf $buf->{start}..$buf->{end}");
			if ( ! defined($buf->{data})
			    or $buf->{data} ne $newdata ) {
			    $buf->{rtype} = IMP_REPLACE;
			    $buf->{data} = $newdata;
			    $part->{adjust} +=
				length($newdata) - $buf->{end} + $buf->{start};
			    $newdata = ''; # in the next buffers replace with ''
			}
			push @fwd,[ $pi,$dir,$buf ];
			shift(@$ibuf);
			if ( ! @$ibuf ) {
			    # all bufs eaten
			    die "called replace for future data"
				if $buf->{end}<$offset;
			    @$ibuf = $new_buf->( %$buf,
				data   => '',
				start  => $buf->{end},
				end    => $buf->{end},
				gstart => $buf->{gend},
				# packet types cannot get partial replacement
				# at end
				$buf->{dtype} > 0 ? ( dtype => 0 ):()
			    );
			    # remove eof from buf in @fwd because we added
			    # new one
			    $fwd[-1][2]{eof} = 0;
			    last;
			}
			last if $buf->{end} == $offset;
		    } else {
			# split buffer and replace first part
			$DEBUG && debug(
			    "replace - split buf $buf->{start}..$offset..$buf->{end}");
			$split_buf->($ibuf,0,$offset-$buf->{start});
			redo;
		    }
		}

	    } elsif ( $rtype == IMP_REPLACE_LATER ) {
		my ($dir,$offset) = @$rv;
		$DEBUG && debug("callback[$dir][$pi] $rtype $offset");
		ref(my $part = $parts[$dir][$pi])
		    or die "called replace for passed data";
		my $ibuf = $part->{ibuf};
		$_->{replace_later} == IMP_MAXOFFSET and next; # no change

		# sanity checks
		die "called replace_later although pass=IMP_MAX_OFFSET"
		    if ! $part;
		die "called replace for already passed data" if
		    $offset != IMP_MAXOFFSET and
		    $ibuf->[0]{start} > $offset;

		if ( $offset == IMP_MAXOFFSET ) {
		    $_->{replace_later} = IMP_MAXOFFSET;
		    # change all to replace_later
		    $_->{data} = undef for(@$ibuf);
		    next;
		} elsif ( $offset <= $part->{replace_later} ) {
		    # no change
		} else {
		    $part->{replace_later} = $offset;
		    for(@$ibuf) {
			defined($_->{data}) or next; # already replace_later
			my $len = length($_->{data}) or last; # dummy buffer
			if ( $_->{start} + $len <= $offset ) {
			    $_->{data} = undef;
			} else {
			    $part->{replace_later} = 0;
			    last;
			}
		    }
		}
	    } else {
		$DEBUG && debug("callback[.][$pi] $rtype @$rv");
		die "don't know how to handle rtype $rtype";
	    }
	}

	# pass to next part/output
	$exec_fwd->(@fwd) if @fwd;
    };

    # While we are in $part_in function we will only spool callbacks and process
    # them at the end. Otherwise $dataf might cause call of callback which then
    # causes call of dataf etc - which makes debugging a nightmare.

    my $collect_callbacks;
    $global_in = sub {
	my ($dir,$data,$offset,$dtype) = @_;

	my %buf = (
	    data  => $data,
	    dtype => $dtype // IMP_DATA_STREAM,
	    rtype => IMP_PASS,
	    eof   => $data eq '',
	);

	my $adjust = 0;
	my $pi = $dir ? $#imp:0; # enter into first or last part
	my $np;
	while (1) {
	    ref( $np = $parts[$dir][$pi] ) and last;
	    $adjust += $np;
	    $pi += $dir?-1:1;
	    if ( $pi<0 or $pi>$#imp ) {
		$DEBUG && debug("all skipped");
		if ( my $cb = $fwd_up->($dir,$new_buf->(%buf))) {
		    $self->run_callback($cb);
		}
		return;
	    }
	}

	return if ! ref $np; # got IMP_PASS IMP_MAXOFFSET for all

	my $ibuf_end = $np->{ibuf}[-1]{gend};
	if ( ! $offset ) {
	    # no gap between data
	    $buf{gstart} = $ibuf_end;
	} elsif ( $offset < $ibuf_end ) {
	    die "overlapping data";
	} elsif ( $offset > $ibuf_end ) {
	    # gap between data
	    $buf{gstart} = $offset;
	    $buf{gap} = $offset - $ibuf_end;
	} else {
	    # there was no need for giving offset
	    $buf{gstart} = $ibuf_end;
	}
	$buf{gend}  = $buf{gstart} + length($data);
	$buf{start} = $buf{gstart} + $adjust;
	$buf{end}   = $buf{gend} + $adjust;

	$collect_callbacks ||= [];
	$part_in->( $pi,$dir, $new_buf->(%buf));

	while ( my $cb = shift(@$collect_callbacks)) {
	    $_imp_callback->(@$cb);
	}
	$collect_callbacks = undef
    };

    # wrapper which spools callbacks if within dataf
    $imp_callback = sub {
	if ( $collect_callbacks ) {
	    # only spool and execute later
	    push @$collect_callbacks, [ @_ ];
	    return;
	}
	return $_imp_callback->(@_)
    };

    # setup callbacks
    $imp[$_]->set_callback( $imp_callback,$_ ) for (0..$#imp);

    # make some closures available within methods
    $self->{dataf} = $global_in;
    $self->{closef} = sub {
	$global_in = $part_in = $imp_callback = $_imp_callback = undef;
	@parts = ();
    };
    return $self;
}

sub data {
    my $self = shift;
    $self->{dataf}(@_);
}

sub DESTROY {
    my $closef = shift->{closef};
    $closef->() if $closef;
}


1;

__END__

=head1 NAME

Net::IMP::Cascade - manages cascade of IMP filters

=head1 SYNOPSIS

    use Net::IMP::Cascade;
    use Net::IMP::Pattern;
    use Net::IMP::SessionLog;
    ...
    my $imp = Net::IMP::Cascade->new_factory( parts => [
	Net::IMP::Pattern->new_factory..,
	Net::IMP::SessionLog->new_factory..,
    ]);

=head1 DESCRIPTION

C<Net::IMP::Cascade> puts multiple IMP analyzers into a cascade.
Data get analyzed starting with part#0, then part#1... etc for direction 0
(client to server), while for direction 1 (server to client) the data get
analyzed the opposite way, ending in part#0.

The only argument special to C<new_factory> is C<parts>, which is an array
reference of L<Net::IMP> factory objects.
When C<new_analyzer> gets called on the L<Net::IMP::Cascade>> factory,
C<new_analyzer> will be called on the factory objects of the parts too, keeping
all arguments, except C<parts>.

=head1 TODO

Currently IMP_TOSENDER is not supported

=head1 BUGS

The code is way more complex than I originally hoped, even after a nearly
complete rewrite of the innards. So probably the problem itself is complex.
For debugging help see comments on top of code.

=head1 AUTHOR

Steffen Ullrich <sullr@cpan.org>

=head1 COPYRIGHT

Copyright by Steffen Ullrich.

This module is free software; you can redistribute it and/or
modify it under the same terms as Perl itself.