The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
use strict;
use warnings;
package Net::IMP::Cascade;
use base 'Net::IMP::Base';
use fields (
    'parts',    # analyzer objects
    # we do everything with closures inside new_analyzer here, so that the
    # object has only fields for accessing some closures from subs
    'dataf',    # called from sub data
    'closef',   # called from DESTROY
);

use Net::IMP; # constants
use Carp 'croak';
use Scalar::Util 'weaken';
use Hash::Util 'lock_keys';
use Net::IMP::Debug;
use Data::Dumper;

my %rtypes_implemented_myself = map { $_ => 1 } (
    IMP_PASS,
    IMP_PREPASS,
    IMP_REPLACE,
    IMP_DENY,
    IMP_DROP,
    #IMP_TOSENDER, # not supported yet
    IMP_LOG,
    IMP_ACCTFIELD,
);

sub interface {
    my Net::IMP::Cascade $factory = shift;
    my $parts = $factory->{factory_args}{parts};

    # collect interfaces by part
    my @if4part;
    for my $p ( @$parts ) {
	my @if;
	for my $if ( $p->interface(@_)) {
	    # $if should require only return types I support
	    push @if,$if 
		if ! grep { ! $rtypes_implemented_myself{$_} } @{ $if->[1] };
	}
	@if or return; # nothing in common
	push @if4part,\@if
    }

    # find interfaces which are supported by all parts
    my @common;
    for( my $i=0;$i<@if4part;$i++ ) {
	for my $if_i ( @{  $if4part[$i] } ) {
	    my ($in_i,$out_i) = @$if_i;
	    # check if $if_i matches at least on interface description in 
	    # all other parts, e.g. $if_i is same or included in $if_k
	    # - data type/proto: $in_k should be undef or same as $in_i
	    # - return types: $out_i should include $out_k
	    for( my $k=0;$k<@if4part;$k++ ) {
		next if $i == $k; # same
		for my $if_k ( @{  $if4part[$k] } ) {
		    my ($in_k,$out_k) = @$if_k;
		    # should be same data type or $in_k undef
		    next if $in_k and ( ! $in_i or $in_k != $in_i ); 
		    # $out_i should include all of $out_k
		    my %out_k = map { $_ => 1 } @$out_k;
		    delete @out_k{ @$out_i };
		    next if %out_k; # some in k are not in i

		    # junction if i and k
		    push @common,[ $in_k,$out_i ];
		}
	    }
	}
    }

    # remove duplicates from match
    my (@uniq,%m);
    for( @common ) {
	my $key = ( $_->[0] // '<undef>' )."\0".join("\0",sort @{$_->[1]});
	push @uniq,$key if ! $m{$key}++;
    }
    return @uniq;
}

sub new_analyzer {
    my ($factory,%args) = @_;

    my $p     = $factory->{factory_args}{parts};
    my $self  = $factory->SUPER::new_analyzer(%args);
    my @imp = map { $_->new_analyzer(%args) } @$p;
    $self->{parts} = \@imp;

    # $parts[$dir][$pi] is the part for direction $dir, analyzer $pi
    my @parts;

    # each entry in parts consists of
    # bufs - list of data buffers together with their state of processing
    #   each entry in bufs consists of a hash with
    #   - data: the data
    #   - dtype: the type of data, e.g. IMP_DATA_STREAM, IMP_DATA_PACKET...
    #   - endpos: position of end of data, relativ to input stream of part
    #   - rtype: result type, which caused this buffer, eg IMP_PASS, 
    #     IMP_REPLACE... initially 0 (e.g. no type)
    #   - eof: true if last buf in stream
    #   because replacements might add/delete bytes we need to track these
    #   adjustments for a chunk thru all parts. Unfortunatly this is fairly
    #   complex. Example: if we replace 10 bytes with 3 bytes the local
    #   adjustment will be -7. This adjustment will affect the adjustments
    #   of all following data and will effect the final adjustment of this
    #   data chunk in the cascade. But adjustments from later data should
    #   not effect the adjustments for the current data..
    #   - badjust: adjustment done in this [b]uffer, e.g. only caused by
    #     replacements of data in this buffer in the current part
    #   - gbadjust: like badjust, but accumulated over all parts ([g]lobal])
    #   - padjust: accumulated adjustments relativ to input into this [p]art,
    #     e.g. sum of badjust of this and all previous buf in this part
    #   - gpadjust: like padjust, but over all parts ([g]lobal)
    # fwapos - up to which position related to input stream data got
    #   forwarded to analyzer (or passed w/o needing analyzer)
    #   meaning: [pos]ition up to which it got [f]or[w]arded to [a]nalyzer
    # gap    - flag, set if we recently skipped data due to IMP_PASS,
    #   reset, when we send data to the analyzer.
    #   If set the data will be forwarded to the analyzer with offset given
    # lppos  - offset from [l]ast [p]ass|pre[p]ass reply (related to input
    #   stream)
    # lptype - type of reply which updated lppos (pass|prepass)
    # partial_pass - flag if we got a (pre)pass for a part of the last
    #   buffer, in case we have a non-streaming data type.
    #   partial pass will be ignored if followed by another (pre)pass, but
    #   it cannot be followed by a replacement (only whole packets can be
    #   replaced)

    # initialize @parts
    for( my $i=0;$i<@imp;$i++ ) {

	# data from client
	$parts[0][$i] = my $h = {
	    bufs   => [ Net::IMP::Cascade::_Buf->new ],
	    fwapos => 0,
	    gap    => 0,
	    lppos  => 0,
	    lptype => IMP_PASS,
	    partial_pass => 0,
	};
	lock_keys(%$h);

	# data from server get processed in the other direction
	#  [CLIENT] --> 00 -> 11 -> .. [SERVER] .. --> 11 -> 00 ->
	$parts[1][$i] = $h = {
	    bufs   => [ Net::IMP::Cascade::_Buf->new ],
	    fwapos => 0,
	    gap    => 0,
	    lppos  => 0,
	    lptype => IMP_PASS,
	    partial_pass => 0,
	};
	lock_keys(%$h);
    }

    # global lastpass
    # if all analyzers in cascade issue a pass/prepass into the future
    # we can propagate the minimum offset into future early
    # $global_lastpass[$dir] -> [$lppos,$lptype]
    my @global_lastpass = (
	{ pos => 0, type => 0 },
	{ pos => 0, type => 0 }
    );

    # to make sure we don't leak due to cross-references
    weaken( my $wself = $self );

    # returns dump of parts for direction incl. bufs, only used for debugging
    my $_dump_part = sub {
	my $dir = shift;
	my $t = '';
	for my $i ( @_ ? @_ : (0..$#imp)) {
	    my $p = $parts[$dir][$i];
	    my $bufs = $p->{bufs};
	    $t.= sprintf("[%d] $imp[$i] fwa(%d) lp=%s(%d) gap(%d)\n",
		$i,$p->{fwapos},$p->{lptype},$p->{lppos},$p->{gap});
	    for my $buf (@$bufs) {
		$t .= sprintf(" - %s %s(%s) Badj(%d) Padj(%d) Gadj(%d) %s'%s'\n",
		    $buf->{dtype} // '<undef>',
		    $buf->{rtype},
		    length($buf->{data})
			? ($buf->{endpos}-length($buf->{data})+1)
			    ."..$buf->{endpos}"
			: $buf->{endpos},
		    $buf->{badjust},
		    $buf->{padjust},
		    $buf->{gpadjust},
		    $buf->{eof} ? '<eof> ' :'',
		    $buf->{data}
		);
	    }
	}
	return $t;
    };

    # the data function
    # called from sub data on new data and from $process when data are finished
    # in on part and should be transferred into the next part
    #  $pi    - index into parts
    #  $dir   - direction (e.g. target part is $parts[$dir][$pi])
    #  $data  - the data
    #  $pos   - offset of $data relativ to input into current part $pi
    #  $dtype - data type of data (stream, packet...)
    #  $rtype - if called from previous part we get the result type of the data,
    #    e.g. if they are result of replacement, pass....
    #    this needs to be propagated thru the parts and will be used in the
    #    final result type
    #  $eof   - are $data the last data in this direction?
    #  $gbadjust, $gpadjust - fields from buf which was done in previous part
    #    and got transferred into this part. Needed to accumulate adjustments
    #    over all parts.
    my $process;
    my $_dataf = sub {
	my ($pi,$dir,$data,$pos,$dtype,$rtype,$eof,$gbadjust,$gpadjust) = @_;
	$pi = @imp+$pi if $pi<0;

	$DEBUG && debug("dataf[$dir][$pi] len=".length($data)." "
	    .($pos ? "pos=$pos":'<nooff>' )." "
	    .( $rtype||'' )
	    ." $dtype adj=$gbadjust/$gpadjust eof=$eof\n"
	    .$_dump_part->($dir,$pi)
	);
	my $p = $parts[$dir][$pi];
	my $bufs = $p->{bufs};
	my $endpos = $bufs->[-1]{endpos};

	# add data to buf:
	# if there is no gap and rtype of buf matches and no adjustments are 
	# used and dtype is stream type, then we can add data to an 
	# existing buf, otherwise we need to create a new one
	# data from buffers with adjustments can never be merged, because
	# adjustments are considered beeing at the end of the buf, not
	# somewhere in the middle
	if ( $pos and $endpos > $pos ) {
	    die "overlapping data ($pos,$endpos)"

	} elsif ( 
	    ( ! $pos or $endpos == $pos ) # no gap
	    # caused by same result type
	    and ( ! $bufs->[-1]{rtype} or ($rtype||0) == $bufs->[-1]{rtype} ) 
	    and ! $gbadjust and ! $bufs->[-1]{gbadjust} # no adjustments 
	    and ( # same streaming data type
		! defined $bufs->[-1]{dtype} or 
		$dtype <0 and $dtype == $bufs->[-1]{dtype} )
	    ) {
	    # append
	    $endpos += length($data);
	    $bufs->[-1]{data} .= $data;
	    $bufs->[-1]{endpos} = $endpos;
	    $bufs->[-1]{eof} = $eof;
	    $bufs->[-1]{rtype} ||= $rtype||0;
	    $bufs->[-1]{dtype} = $dtype;

	} else {
	    # gap, different type or adjustments involved
	    $endpos = $pos + length($data);
	    push @$bufs, Net::IMP::Cascade::_Buf->new(
		data      => $data,
		endpos    => $endpos,
		dtype     => $dtype||0,
		rtype     => $rtype||0,
		eof       => $eof,
		badjust   => 0,
		gbadjust  => $gbadjust,
		# padjust of previous buf is accumulated badjust of all
		# previous bufs together
		padjust   => $bufs->[-1]{padjust},
		gpadjust  => $gpadjust,
	    );
	}

	# if a new buffer was created we can now process the buffers
	$process->($pi,$dir);

    };

    # This is the central part. It processes buffers for $dir in part $pi.
    # It gets called from $_dataf or $_imp_cb whenever:
    # - new data arrived thru $_dataf from outside or previous part
    #   -> send to analyzer if necessary
    # - xor lppos increased due to IMP_(PRE)PASS callback from analyzer
    #   -> check if buffered data can be forwarded to next part
    # - xor replacements got added due to callback from analyzer, only in this
    #   case @fwd will be given and contains buffers, which should be
    #   propagated to next part
    #   -> forward replaced buffers
    $process = sub {
	my ($pi,$dir,@fwd) = @_;

	my $p    = $parts[$dir][$pi];
	my $bufs = $p->{bufs};

	# @fwd contains buffers to forward into next part
	# this is only already set if called from callback on replacement,
	# otherwise we will try to fill it based on bufs in current part

	if ( ! @fwd ) {
	    # new data added or lppos changed
	    # check if we have data in bufs which might be added to @fwd

	    my $endpos = $bufs->[-1]{endpos};
	    die "endpos mismatch" if $endpos < $p->{fwapos};

	    #debug( "bufs[$dir][$pi] fwapos=$p->{fwapos}, endpos=$endpos,".
	    #   " lppos=$p->{lppos} -- " .Dumper($bufs));

	    # up to offset=lppos we can forward w/o sending it to the analyzer
	    # (on IMP_PASS) or send it to the analyzer after forwarding it
	    # (IMP_PREPASS)
	    while ( my $buf = shift(@$bufs) ) {
		my $keep = ( $p->{lppos} == IMP_MAXOFFSET) 
		    ? -1 : $buf->{endpos} - $p->{lppos};
		if ( $keep <=0 ) {
		    $DEBUG && debug("fwd complete buf lppos=%d endpos=%d",
			$p->{lppos}, $buf->{endpos});
		    # we don't need to keep anything in the buf, e.g. fwd
		    # complete buf
		    # we might need to adjust the types, e.g. if lptype is
		    # IMP_PASS but buf.type is IMP_PREPASS this will result in
		    # IMP_PREPASS. The types are sorted in Net::IMP by
		    # importance so we can just use the largest
		    $buf->{rtype} = $p->{lptype} if $p->{lptype}>$buf->{rtype};
		    $p->{partial_pass} = 0;

		} elsif ( $keep < length($buf->{data}) ) {
		    # we can forward parts of buf only
		    # split $buf, $keep bytes will be kept in $bufs
		    $DEBUG && debug("fwd part(buf) lppos=%d endpos=%d keep=%d",
			$p->{lppos}, $buf->{endpos}, $keep);

		    if ( $buf->{dtype} >0 ) {
			# only streaming data might be split into arbitrary
			# chunks, others can only be handled as whole packets
			# thus just ignore this (pre)pass and hope we will get
			# one with extends to the whole packet
			$DEBUG && debug( sprintf(
			    "ignoring partial (pre)pass for %s lppos=%d endpos=%d keep=%d",
			    $buf->{dtype},$p->{lppos}, $buf->{endpos}, $keep));

			# note the partial pass to croak if followed by replacement
			$p->{partial_pass} = 1;
			next;
		    }

		    my $data = substr($buf->{data},-$keep,$keep,'');

		    # put buf with rest of data back into @$bufs
		    # adjustments will stay in the rest, because they are
		    # considered beeing at the end of the buf
		    unshift @$bufs, Net::IMP::Cascade::_Buf->new(
			data     => $data,
			endpos   => $buf->{endpos},
			badjust  => $buf->{badjust},
			gbadjust => $buf->{gbadjust},
			padjust  => $buf->{padjust},
			gpadjust => $buf->{gpadjust},
			eof      => $buf->{eof},
			dtype    => $buf->{dtype},
			rtype    => $buf->{rtype},
		    );

		    # adjust endpos of buf we forward
		    $buf->{endpos} -= $keep;
		    # adjust type: the more important wins
		    $buf->{rtype} = $p->{lptype} if $p->{lptype}>$buf->{rtype};

		    # adjustments and eof will stay with rest of data in @$bufs
		    # padjust and gpadjust needs to be fixed to not contain any
		    # adjustments which stay in the rest
		    $buf->{padjust}  -= $buf->{badjust};
		    $buf->{gpadjust} -= $buf->{gbadjust};
		    $buf->{badjust}  = 0;
		    $buf->{gbadjust} = 0;
		    $buf->{eof}      = 0;

		} else {
		    # there is nothing we can forward, but buf back into @$bufs
		    # and stop processing @$bufs
		    unshift @$bufs,$buf;
		    last;
		}


		# we have some $buf to forward because lppos allowed us so
		# if lptype is IMP_PREPASS we need to send it to analyzer too,
		# otherwise (IMP_PASS) we can skip the analyzer
		# update part.fwapos and part.gap
		if ( $buf->{endpos} > $p->{fwapos} ) {
		    if ( $p->{lptype} == IMP_PREPASS ) {
			# pass immediately, but also send to analyzer
			my $data = $buf->{data};
			my $keep = $buf->{endpos}-$p->{fwapos};
			if ( $keep >= length($data)) {
			    # forward all in buf
			} else {
			    # we have already send parts of buf to the analyzer
			    # forward only the rest
			    substr($data,-$keep,$keep,'');
			}
			# call analyzer for current part
			# if there was a gap before we need to send the current
			# offset
			$imp[$pi]->data(
			    $dir,
			    $data,
			    $p->{gap} ? $buf->{endpos}:0,
			    $buf->{dtype},
			);
		    } else {
			# pass w/o analyzer
			# this causes a gap in the stream to the analyzer
			$p->{gap} = 1;
		    }

		    # update part.fwapos with end of the forwarded buf
		    $p->{fwapos} = $buf->{endpos};
		}

		# propagate eof to analyzer
		# for lptype of IMP_PASS this needs to be done only if lppos !=
		# IMP_MAXOFFSET, otherwise the analyzer was not interested in
		# the end of data at all
		if ( $buf->{eof} ) {
		    if ( $p->{lptype} != IMP_PASS or $p->{lppos} != IMP_MAXOFFSET ) {
			$imp[$pi]->data(
			    $dir,
			    '', 
			    $p->{gap} ? $p->{fwapos}:0,
			    $buf->{dtype},
			)
		    }
		}

		# now add to @fwd so it gets transferred to next part
		push @fwd,$buf;
		$DEBUG && debug("process[$dir][$pi] fwd %d bytes with %s,".
		    "badjust=%d",
		    length($buf->{data}),$buf->{rtype},$buf->{badjust});
	    }

	    if ( ! @fwd and @$bufs == 1
		and $bufs->[-1]{data} eq ''
		and $bufs->[-1]{eof} ) {
		# This is the special case, where we did not forward anything
		# but have a single buf in the part which contains no data, but
		# only eof.
		# In this case add this buffer to @fwd so that the eof gets
		# transferred to the next part
		push @fwd, shift(@$bufs)
	    }

	    # if no more bufs are in the part add an empty one so that new data
	    # can get added to it. Does not make much sense if we have eof but
	    # makes code easier
	    if ( ! @$bufs ) {
		push @$bufs, Net::IMP::Cascade::_Buf->new(
		    endpos   => $fwd[-1]{endpos},
		    # cummulated adjustments needs to be copied from the last
		    # buf we have forwarded
		    padjust  => $fwd[-1]{padjust},
		    gpadjust => $fwd[-1]{gpadjust},
		)
	    }
	}

	#debug("$pi ".Dumper([$bufs,'------',\@fwd]));

	# transfer data to next part or propagate to caller
	for my $fw (@fwd) {

	    # skip fw with no useful information, e.g.
	    # no data, no eof or no adjustments (adjustments needs to be
	    # propagated even if we have no data, e.g. it might be that all
	    # data in the chunk got replaced with '')
	    if ( $fw->{data} ne ''   # we have data to transfer
		or $fw->{eof}        # or eof needs to be propagated
		or $fw->{badjust}    # or adjustment must be propagated
	    ) {
		# fine, we have useful information
	    } else {
		$DEBUG && debug("ignoring ".Dumper($fw));
		next;
	    }

	    # check if $pi is the last part in the cascade
	    # for dir 0 this will be $pi = $#imp, while for the opposite dir
	    # this will be 0
	    if ( $pi == ($dir ? 0:$#imp) ) {
		# propagate result up

		if ( ! $fw->{rtype} ) {
		    # Type 0 should only be in a buffer, which got not analyzed
		    # or did not got passed because of IMP_(PRE)PASS.
		    # In this step in processing we should not have such
		    # type anymore.
		    die "untyped buffer at end of cascade";
		}

		# determine offset for propagation:
		# fw.endpos is the position relativ to input of the last part.
		# For propagating to the upper layer we need the matching
		# position relativ to the the original input.
		# We get this by applying all previously added adjustments
		# back, but ignore adjustments from this part (adjustments are
		# relevant to next part, but here we propagate up instead of
		# transfering to next part).
		my $eob                 # adjusted end of buffer is:
		    = $fw->{endpos}     # endpos relativ to this part
		    - $fw->{gpadjust}   # minus all adjustments so far
		    + $fw->{padjust};   # but ignoring adjustments in this part
		#debug( "up $fw->{rtype} ".
		#   "eob($eob)=$fw->{endpos}-$fw->{gpadjust}".
		#   "+$fw->{padjust}\n".$_dump_part->($dir));

		$DEBUG && debug("process[$dir][$pi] -> cb($fw->{rtype},$dir,".
		    "$eob=$fw->{endpos}-adjust");

		if ( $eob < $global_lastpass[$dir]{pos} 
		    or $global_lastpass[$dir]{pos} == IMP_MAXOFFSET ) {
		    # we already issued an IMP_(PRE)PASS for this offset
		    # no need to propagate
		} elsif ( $fw->{rtype} ~~ [ IMP_PASS, IMP_PREPASS ]) {
		    # propagate IMP_(PRE)PASS
		    $wself->run_callback([$fw->{rtype},$dir,$eob]);
		} elsif ( $fw->{rtype} == IMP_REPLACE ) {
		    # propagate IMP_REPLACE
		    $wself->run_callback([$fw->{rtype},$dir,$eob,$fw->{data}]);
		} else {
		    # should not happen if this code is correct
		    die "cannot handle type $fw->{rtype}"
		}

	    } else {
		# we are not at the last part of the cascade
		# transfer data into next part of cascade

		# to determine the start position in the input stream for the
		# next part we need adjust the end position in the buf by any
		# adjustments so far in this part, then remove the length of
		# the data
		my $start =                # start position is:
		    $fw->{endpos}          # end position
		    + $fw->{padjust}       # skip all data we removed
		    - length($fw->{data}); # minus length of current data

		# call $_dataf with next part
		# index of next part depends on $dir, e.g. if we go up or down
		my $nextpi = $dir ? $pi-1:$pi+1;
		$DEBUG && debug("process[$dir][$pi] -> ".
		    "dataf(%d,pos=%d..",$nextpi,$start);
		$_dataf->(
		    $nextpi,
		    $dir,
		    $fw->{data},
		    $start,
		    $fw->{dtype},
		    $fw->{rtype},
		    $fw->{eof},
		    $fw->{gbadjust},
		    $fw->{gpadjust},
		);
	    }
	}

	# if we have data in this part which were not send to the analyzer,
	# send them now
	# this includes sending eof to the analyzer

	my $endpos = $bufs->[-1]{endpos};
	if ($endpos >= $p->{fwapos} ) {
	    $DEBUG && debug("process[$dir][$pi] -> endpos=$endpos ".
		"p.fwapos=$p->{fwapos}\n".$_dump_part->($dir));
	    for my $buf (@$bufs) {

		# $needa bytes need to be analyzed
		my $needa = $buf->{endpos} - $p->{fwapos};
		if ( $needa>0 ) {
		    # some real data to analyze

		    my $ld = length($buf->{data});
		    if ( $needa>$ld ) {
			# send everything, but we have a gap of size $needa-$ld
			$DEBUG && debug("process[$dir][$pi] ".
			    "-> data(%d,allbuf<%d>,%d,%s)",
			    $dir,$ld,$buf->{endpos},$buf->{dtype});
			$imp[$pi]->data(
			    $dir,
			    $buf->{data},
			    $buf->{endpos}-$ld,
			    $buf->{dtype},
			);
		    } else {
			# the last $needa from buf needs to be analyzed (we
			# skip optimization when $needa == $ld)
			$DEBUG && debug("process[$dir][$pi] ".
			    "-> data(%d,buf<%d,%d>,%s,%s)",
			    $dir,
			    $needa-$ld, $buf->{endpos},
			    $p->{gap} ? $buf->{endpos}-$ld :'',
			    $buf->{dtype},
			);
			$imp[$pi]->data(
			    $dir,
			    substr($buf->{data},-$needa,$needa),
			    $p->{gap} ? $buf->{endpos}-$ld:0,
			    $buf->{dtype},
			);
		    }
		    $p->{fwapos} = $buf->{endpos};
		    $p->{gap} = 0;
		}
	    }

	    # If the last buf contains eof, we need to send this to the
	    # analyzer too, except if we got a free ride with IMP_PASS of
	    # IMP_MAXOFFSET.
	    if ( $bufs->[-1]{eof} ) {
		if ( $p->{lptype} != IMP_PASS or $p->{lppos} != IMP_MAXOFFSET ) {
		    $imp[$pi]->data(
			$dir,
			'', 
			$p->{gap} ? $p->{fwapos}:0,
			$bufs->[-1]{dtype}
		    )
		}
	    }
	}
    };

    # This is the callback for the analyzer, e.g. analyzer $imp[$pi] from part
    # $pi calls $_imp_cb->($pi,@results)
    my $_imp_cb = sub {
	my $pi = shift;

	# track if something changed for dir, so that we know if we need to
	# recompute global_lastpass
	my %dir_changed;

	while ( my $rv = shift(@_)) {
	    my $rtype = shift(@$rv);

	    if ( $rtype ~~ [ IMP_DENY,IMP_DROP,IMP_ACCTFIELD ]) {
		# these gets propagated directly up w/o changes
		$DEBUG && debug("impcb[*][$pi] $rtype @$rv");
		$wself->run_callback([$rtype,@$rv]);

	    } elsif ( $rtype == IMP_LOG ) {
		# these gets also propagated directly up
		$DEBUG && debug("impcb[*][$pi] $rtype @$rv");

		# but we need to adjust the offset before so that it reflects
		# the offset in the original input stream
		my ($dir,$offset,$len,$level,$msg) = @$rv;
		my $buf = $parts[$dir][$pi]{bufs}[-1];
		$offset +=                    # adjust by
		    $buf->{gpadjust}          # all adjustments so far
		    - $buf->{padjust};        # but not from this part
		$wself->run_callback([$rtype,$dir,$offset,$len,$level,$msg]);

	    } elsif ( $rtype ~~ [ IMP_PASS,IMP_PREPASS ]) {
		my ($dir,$offset) = @$rv;
		#debug("impcb[$dir][$pi] $rtype off=$offset\n"
		#    .$_dump_part->($dir));

		my $p = $parts[$dir][$pi];
		my $startpos = $p->{bufs}[0]{endpos}
		    - length($p->{bufs}[0]{data});

		if ( $p->{lppos} and (
		    $p->{lppos} == IMP_MAXOFFSET or
		    ($offset != IMP_MAXOFFSET and $offset < $p->{lppos})
		    )) {
		    # ignore because we got an IMP_(PRE)PASS with a higher
		    # offset before 
		    $DEBUG && debug("impcb[$dir] $rtype ignoring, ".
			"offset($offset)<lppos($p->{lppos})");

		} elsif ( $offset <= $startpos and $offset != IMP_MAXOFFSET ) {
		    # ignore because we got an IMP_(PRE)PASS for data we
		    # already processed 
		    $DEBUG && debug("impcb[$dir][$pi] $rtype ignoring, ".
			"offset($offset)<pos($startpos)");

		} else {
		    # set lppos and lptype from result and call process, to
		    # see, if we could forward some more data
		    $DEBUG && debug("impcb[$dir][$pi] $rtype off=$offset, ".
			"lppos: $p->{lppos} -> $offset");

		    $p->{lppos}  = $offset;
		    $p->{lptype} = $rtype;
		    $dir_changed{$dir}++;

		    $process->($pi,$dir);
		}

	    } elsif ( $rtype == IMP_REPLACE ) {
		my ($dir,$offset,$newdata) = @$rv;
		$DEBUG && debug("impcb[%d][%d] %s off=%d '%s'\n%s",
		    $dir,$pi,$rtype,$offset,$newdata,$_dump_part->($dir));
		my $p = $parts[$dir][$pi];
		my $startpos = $p->{bufs}[0]{endpos}
		    - length($p->{bufs}[0]{data});

		if ( $offset == IMP_MAXOFFSET
		    or $offset > $p->{bufs}[-1]{endpos} ) {
		    # we cannot replace future data
		    die "cannot replace future data, endpos=".
			$p->{bufs}[-1]{endpos}." offset=$offset";

		} elsif ( $p->{lppos} and 
		    ( $p->{lppos} == IMP_MAXOFFSET or $offset < $p->{lppos} )) {
		    # We got a replacement for data which earlier received an
		    # IMP_(PRE)PASS. This should never happen (but can, if the
		    # analyzer is bogus).
		    die "cannot replace \@$offset because of ".
			"$p->{lptype} \@$p->{lppos}";

		} elsif ( $offset <= $startpos ) {
		    # We got a replacement for data, we already handled.
		    # This should never happen (but can, if the analyzer is
		    # bogus).
		    die "cannot replace already processed data";

		} elsif ( $p->{partial_pass} ) {
		    die "cannot replace part of packet (after partially passed data)";

		} else {
		    # The replacement consists of two pieces:
		    # - remove all data which should be replaced from bufs
		    # - call process with the data which should be used instead

		    # first remove everything in bufs up to offset with newdata

		    # badjust is the adjustment done in the buffer, e.g. how
		    # much bytes got added (or removed if badjust<0)
		    my $badjust = length($newdata) - ($offset - $startpos);
		    $DEBUG && debug("impcb[%d][%d] %s %d '%s' badjust=%d ",
			$dir,$pi,$rtype,$offset,$newdata,$badjust);

		    # gbadjust is the sum of all existing gbadjust from the
		    # buffers we remove
		    my $gbadjust = 0;

		    my $bufs = $p->{bufs};
		    my $dtype;
		    while (1) {
			my $buf = $bufs->[0];
			$dtype = $buf->{dtype};
			if ( $buf->{endpos} <= $offset ) {
			    $DEBUG && debug("remove whole buffer (%s/%d)",
				$buf->{rtype},$buf->{endpos});

			    # remove whole buffer
			    shift(@$bufs);

			    # add any gbadjust we would remove to the new val
			    # we don't need to do the same with badjust because
			    # badjust is 0 for all unprocessed buffers in a
			    # part
			    $gbadjust += $buf->{gbadjust};

			    # add empty buf and exit loop in case all bufs are
			    # eaten
			    if ( !@$bufs ) {
				push @$bufs, Net::IMP::Cascade::_Buf->new(
				    endpos   => $offset,
				    padjust  => $buf->{padjust},
				    gpadjust => $buf->{gpadjust},
				);
				last;
			    }

			} elsif ( $buf->{endpos} - $offset
			    < length($buf->{data})) {
			    $DEBUG && debug("remove part of buffer (%s/%d)",
				$buf->{rtype},$buf->{endpos});

			    # Remove only first part of buffer up to $offset.
			    # Partial replace is only available for stream data
			    if ( $buf->{dtype} >0 ) {
				croak( sprintf("cannot replace part of buffer %s (%s/%d)",
				    $buf->{dtype},$buf->{rtype},$buf->{endpos}));
			    }

			    # No need to add to gbadjust because adjustments
			    # are considered at the end of the buffer and we
			    # keep the end.

			    # keep last ($buf->{endpos}- $offset) bytes in buf
			    $buf->{data} = substr($buf->{data},
				$offset-$buf->{endpos});
			    last;

			} else {
			    # nothing to remove
			    last;
			}
		    }

		    if ( $badjust ) {
			$gbadjust += $badjust;

			# Each byte can only be in one buffer, because
			# processed data will be immediately removed from the
			# current part and forwarded to the next part.
			# We need to propagate the adjustment to all data
			# which were not processed yet, e.g. data in @$bufs, in
			# previous parts and future data (this is done by using
			# gpadjust in  _dataf).
			for (@$bufs) {
			    $_->{padjust}  += $badjust;
			    $_->{gpadjust} += $badjust;
			}
			for my $i ( $dir ? ( $pi+1..$#imp ) : ( 0..$pi-1 )) {
			    for (@{$parts[$dir][$i]{bufs}}) {
				$_->{gpadjust} += $badjust;
			    }
			}
		    }

		    # process the new data, forward the replacements
		    $process->($pi,$dir, Net::IMP::Cascade::_Buf->new(
			data      => $newdata,
			endpos    => $offset,
			dtype     => $dtype,
			rtype     => IMP_REPLACE,
			badjust   => $badjust,
			gbadjust  => $gbadjust,
			padjust   => $bufs->[0]{padjust},
			gpadjust  => $bufs->[0]{gpadjust},
		    ));
		    $dir_changed{$dir}++;
		}

	    } else {
		die "unsupported type $rtype";
	    }
	}

	# if something changed, check if we could update global_lastpass
	for my $dir (keys %dir_changed) {

	    next if $global_lastpass[$dir]{pos} == IMP_MAXOFFSET;

	    # We traverse all parts for $dir and check the amount of data we
	    # could forward in each part (e.g. check lppos vs. endpos of last
	    # buf in part). The minimum of all these values (lpdiff) is the
	    # number of bytes we could (pre)pass over all parts.
	    # Additionally we need to find the lptype with the most importance
	    # which should be used for this data.

	    my $lpdiff;
	    my $lptype = 0;
	    my $px = $parts[$dir];
	    for my $p (@$px) {
		if ( $p->{lppos} == IMP_MAXOFFSET ) {
		    $lpdiff = IMP_MAXOFFSET;
		    $lptype = $p->{lptype} if $p->{lptype} > $lptype;

		} else {
		    my $over = $p->{lppos} - $p->{bufs}[-1]{endpos};
		    if ( $over <= 0 ) {
			# nothing can be forwarded
			$lpdiff = 0;
			last
		    } elsif ( ! $lpdiff || $lpdiff>$over ) {
			# lpdiff can be forwarded with type lptype
			$lpdiff = $over;
			$lptype = $p->{lptype} if $p->{lptype} > $lptype;
		    }
		}
	    }
	    $lptype or next;
	    my $pos;
	    if ( $lpdiff == IMP_MAXOFFSET ) {
		$pos = IMP_MAXOFFSET;
	    } elsif ( $lpdiff>0 ) {
		$pos = $px->[-1]{bufs}[-1]{endpos} + $lpdiff;
		next if $pos <= $global_lastpass[$dir]{pos};
	    } else {
		next;
	    }

	    # we got a higher value for global_lastpass
	    # update and propagate it up
	    $global_lastpass[$dir] = {
		pos  => $pos,
		type => $lptype
	    };
	    $wself->run_callback([$lptype,$dir,$pos]);
	}
    };

    # While we are in $dataf function we will only spool callbacks and process
    # them at the end. Otherwise $dataf might cause call of callback which then
    # causes call of dataf etc - which makes debugging a nightmare.

    my $collect_callbacks;
    my $dataf = sub {
	$collect_callbacks ||= [];
	$_dataf->(@_);
	while ( my $cb = shift(@$collect_callbacks)) {
	    $_imp_cb->(@$cb);
	}
	$collect_callbacks = undef
    };

    # wrapper which spools callbacks if within dataf
    my $imp_cb = sub {
	if ( $collect_callbacks ) {
	    # only spool and execute later
	    push @$collect_callbacks, [ @_ ];
	    return;
	}
	return $_imp_cb->(@_)
    };

    # setup callbacks
    $imp[$_]->set_callback( $imp_cb,$_ ) for (0..$#imp);

    # make some closures available within methods
    $self->{dataf} = $dataf;
    $self->{closef} = sub {
	$dataf = $process = $imp_cb = undef;
	@parts = ();
    };
    return $self;
}

sub data {
    my ($self,$dir,$data,$offset,$dtype) = @_;
    $self->{dataf}(
	$dir ? -1:0, # input part
	$dir,
	$data,
	$offset,     # start of $data in input stream
	$dtype // IMP_DATA_STREAM,  # data type
	0,           # result type for this data from previous entry in cascade
	$data eq '', # eof
	0,           # gbadjust
	0,           # gpadjust
    );
}

sub DESTROY {
    my $closef = shift->{closef};
    $closef->() if $closef;
}

# This package just wraps each buffer in parts[dir][pi]{bufs}.
# The fields got described at the beginning of new_analyzer.

package Net::IMP::Cascade::_Buf;
use fields qw(data endpos dtype rtype badjust padjust gpadjust gbadjust eof);
sub new {
    my ($class,%args) = @_;
    my $self = fields::new($class);
    %$self = %args;
    $self->{data} //= '';
    $self->{$_} //= 0 for
	(qw(endpos rtype badjust padjust gpadjust gbadjust eof));
    return $self;
}

1;

__END__

=head1 NAME

Net::IMP::Cascade - manages cascade of IMP filters

=head1 SYNOPSIS

    use Net::IMP::Cascade;
    use Net::IMP::Pattern;
    use Net::IMP::SessionLog;
    ...
    my $imp = Net::IMP::Cascade->new_factory( parts => [
	Net::IMP::Pattern->new_factory..,
	Net::IMP::SessionLog->new_factory..,
    ]);

=head1 DESCRIPTION

C<Net::IMP::Cascade> puts multiple IMP analyzers into a cascade.
Data get analyzed starting with part#0, then part#1... etc for direction 0
(client to server), while for direction 1 (server to client) the data get
analyzed the opposite way, ending in part#0.

The only argument special to C<new_factory> is C<parts>, which is an array
reference of L<Net::IMP> factory objects.
When C<new_analyzer> gets called on the L<Net::IMP::Cascade>> factory,
C<new_analyzer> will be called on the factory objects of the parts too, keeping
all arguments, except C<parts>.

=head1 TODO

Currently IMP_TOSENDER is not supported

=head1 BUGS

Don't know of any, but the feature and thus the code is way more complex than I
originally hoped :(

=head1 AUTHOR

Steffen Ullrich <sullr@cpan.org>

=head1 COPYRIGHT

Copyright by Steffen Ullrich.

This module is free software; you can redistribute it and/or
modify it under the same terms as Perl itself.