use strict;
use warnings;
package Net::IMP::Cascade;
use base 'Net::IMP::Base';
use fields (
'parts', # analyzer objects
# we do everything with closures inside new_analyzer here, so that the
# object has only fields for accessing some closures from subs
'dataf', # called from sub data
'closef', # called from DESTROY
);
use Net::IMP; # constants
use Carp 'croak';
use Scalar::Util 'weaken';
use Hash::Util 'lock_keys';
use Net::IMP::Debug;
use Data::Dumper;
my %rtypes_implemented_myself = map { $_ => 1 } (
IMP_PASS,
IMP_PREPASS,
IMP_REPLACE,
IMP_DENY,
IMP_DROP,
#IMP_TOSENDER, # not supported yet
IMP_LOG,
IMP_ACCTFIELD,
);
sub interface {
my Net::IMP::Cascade $factory = shift;
my $parts = $factory->{factory_args}{parts};
# collect interfaces by part
my @if4part;
for my $p ( @$parts ) {
my @if;
for my $if ( $p->interface(@_)) {
# $if should require only return types I support
push @if,$if
if ! grep { ! $rtypes_implemented_myself{$_} } @{ $if->[1] };
}
@if or return; # nothing in common
push @if4part,\@if
}
# find interfaces which are supported by all parts
my @common;
for( my $i=0;$i<@if4part;$i++ ) {
for my $if_i ( @{ $if4part[$i] } ) {
my ($in_i,$out_i) = @$if_i;
# check if $if_i matches at least on interface description in
# all other parts, e.g. $if_i is same or included in $if_k
# - data type/proto: $in_k should be undef or same as $in_i
# - return types: $out_i should include $out_k
for( my $k=0;$k<@if4part;$k++ ) {
next if $i == $k; # same
for my $if_k ( @{ $if4part[$k] } ) {
my ($in_k,$out_k) = @$if_k;
# should be same data type or $in_k undef
next if $in_k and ( ! $in_i or $in_k != $in_i );
# $out_i should include all of $out_k
my %out_k = map { $_ => 1 } @$out_k;
delete @out_k{ @$out_i };
next if %out_k; # some in k are not in i
# junction if i and k
push @common,[ $in_k,$out_i ];
}
}
}
}
# remove duplicates from match
my (@uniq,%m);
for( @common ) {
my $key = ( $_->[0] // '<undef>' )."\0".join("\0",sort @{$_->[1]});
push @uniq,$key if ! $m{$key}++;
}
return @uniq;
}
sub new_analyzer {
my ($factory,%args) = @_;
my $p = $factory->{factory_args}{parts};
my $self = $factory->SUPER::new_analyzer(%args);
my @imp = map { $_->new_analyzer(%args) } @$p;
$self->{parts} = \@imp;
# $parts[$dir][$pi] is the part for direction $dir, analyzer $pi
my @parts;
# each entry in parts consists of
# bufs - list of data buffers together with their state of processing
# each entry in bufs consists of a hash with
# - data: the data
# - dtype: the type of data, e.g. IMP_DATA_STREAM, IMP_DATA_PACKET...
# - endpos: position of end of data, relativ to input stream of part
# - rtype: result type, which caused this buffer, eg IMP_PASS,
# IMP_REPLACE... initially 0 (e.g. no type)
# - eof: true if last buf in stream
# because replacements might add/delete bytes we need to track these
# adjustments for a chunk thru all parts. Unfortunatly this is fairly
# complex. Example: if we replace 10 bytes with 3 bytes the local
# adjustment will be -7. This adjustment will affect the adjustments
# of all following data and will effect the final adjustment of this
# data chunk in the cascade. But adjustments from later data should
# not effect the adjustments for the current data..
# - badjust: adjustment done in this [b]uffer, e.g. only caused by
# replacements of data in this buffer in the current part
# - gbadjust: like badjust, but accumulated over all parts ([g]lobal])
# - padjust: accumulated adjustments relativ to input into this [p]art,
# e.g. sum of badjust of this and all previous buf in this part
# - gpadjust: like padjust, but over all parts ([g]lobal)
# fwapos - up to which position related to input stream data got
# forwarded to analyzer (or passed w/o needing analyzer)
# meaning: [pos]ition up to which it got [f]or[w]arded to [a]nalyzer
# gap - flag, set if we recently skipped data due to IMP_PASS,
# reset, when we send data to the analyzer.
# If set the data will be forwarded to the analyzer with offset given
# lppos - offset from [l]ast [p]ass|pre[p]ass reply (related to input
# stream)
# lptype - type of reply which updated lppos (pass|prepass)
# partial_pass - flag if we got a (pre)pass for a part of the last
# buffer, in case we have a non-streaming data type.
# partial pass will be ignored if followed by another (pre)pass, but
# it cannot be followed by a replacement (only whole packets can be
# replaced)
# initialize @parts
for( my $i=0;$i<@imp;$i++ ) {
# data from client
$parts[0][$i] = my $h = {
bufs => [ Net::IMP::Cascade::_Buf->new ],
fwapos => 0,
gap => 0,
lppos => 0,
lptype => IMP_PASS,
partial_pass => 0,
};
lock_keys(%$h);
# data from server get processed in the other direction
# [CLIENT] --> 00 -> 11 -> .. [SERVER] .. --> 11 -> 00 ->
$parts[1][$i] = $h = {
bufs => [ Net::IMP::Cascade::_Buf->new ],
fwapos => 0,
gap => 0,
lppos => 0,
lptype => IMP_PASS,
partial_pass => 0,
};
lock_keys(%$h);
}
# global lastpass
# if all analyzers in cascade issue a pass/prepass into the future
# we can propagate the minimum offset into future early
# $global_lastpass[$dir] -> [$lppos,$lptype]
my @global_lastpass = (
{ pos => 0, type => 0 },
{ pos => 0, type => 0 }
);
# to make sure we don't leak due to cross-references
weaken( my $wself = $self );
# returns dump of parts for direction incl. bufs, only used for debugging
my $_dump_part = sub {
my $dir = shift;
my $t = '';
for my $i ( @_ ? @_ : (0..$#imp)) {
my $p = $parts[$dir][$i];
my $bufs = $p->{bufs};
$t.= sprintf("[%d] $imp[$i] fwa(%d) lp=%s(%d) gap(%d)\n",
$i,$p->{fwapos},$p->{lptype},$p->{lppos},$p->{gap});
for my $buf (@$bufs) {
$t .= sprintf(" - %s %s(%s) Badj(%d) Padj(%d) Gadj(%d) %s'%s'\n",
$buf->{dtype} // '<undef>',
$buf->{rtype},
length($buf->{data})
? ($buf->{endpos}-length($buf->{data})+1)
."..$buf->{endpos}"
: $buf->{endpos},
$buf->{badjust},
$buf->{padjust},
$buf->{gpadjust},
$buf->{eof} ? '<eof> ' :'',
$buf->{data}
);
}
}
return $t;
};
# the data function
# called from sub data on new data and from $process when data are finished
# in on part and should be transferred into the next part
# $pi - index into parts
# $dir - direction (e.g. target part is $parts[$dir][$pi])
# $data - the data
# $pos - offset of $data relativ to input into current part $pi
# $dtype - data type of data (stream, packet...)
# $rtype - if called from previous part we get the result type of the data,
# e.g. if they are result of replacement, pass....
# this needs to be propagated thru the parts and will be used in the
# final result type
# $eof - are $data the last data in this direction?
# $gbadjust, $gpadjust - fields from buf which was done in previous part
# and got transferred into this part. Needed to accumulate adjustments
# over all parts.
my $process;
my $_dataf = sub {
my ($pi,$dir,$data,$pos,$dtype,$rtype,$eof,$gbadjust,$gpadjust) = @_;
$pi = @imp+$pi if $pi<0;
$DEBUG && debug("dataf[$dir][$pi] len=".length($data)." "
.($pos ? "pos=$pos":'<nooff>' )." "
.( $rtype||'' )
." $dtype adj=$gbadjust/$gpadjust eof=$eof\n"
.$_dump_part->($dir,$pi)
);
my $p = $parts[$dir][$pi];
my $bufs = $p->{bufs};
my $endpos = $bufs->[-1]{endpos};
# add data to buf:
# if there is no gap and rtype of buf matches and no adjustments are
# used and dtype is stream type, then we can add data to an
# existing buf, otherwise we need to create a new one
# data from buffers with adjustments can never be merged, because
# adjustments are considered beeing at the end of the buf, not
# somewhere in the middle
if ( $pos and $endpos > $pos ) {
die "overlapping data ($pos,$endpos)"
} elsif (
( ! $pos or $endpos == $pos ) # no gap
# caused by same result type
and ( ! $bufs->[-1]{rtype} or ($rtype||0) == $bufs->[-1]{rtype} )
and ! $gbadjust and ! $bufs->[-1]{gbadjust} # no adjustments
and ( # same streaming data type
! defined $bufs->[-1]{dtype} or
$dtype <0 and $dtype == $bufs->[-1]{dtype} )
) {
# append
$endpos += length($data);
$bufs->[-1]{data} .= $data;
$bufs->[-1]{endpos} = $endpos;
$bufs->[-1]{eof} = $eof;
$bufs->[-1]{rtype} ||= $rtype||0;
$bufs->[-1]{dtype} = $dtype;
} else {
# gap, different type or adjustments involved
$endpos = $pos + length($data);
push @$bufs, Net::IMP::Cascade::_Buf->new(
data => $data,
endpos => $endpos,
dtype => $dtype||0,
rtype => $rtype||0,
eof => $eof,
badjust => 0,
gbadjust => $gbadjust,
# padjust of previous buf is accumulated badjust of all
# previous bufs together
padjust => $bufs->[-1]{padjust},
gpadjust => $gpadjust,
);
}
# if a new buffer was created we can now process the buffers
$process->($pi,$dir);
};
# This is the central part. It processes buffers for $dir in part $pi.
# It gets called from $_dataf or $_imp_cb whenever:
# - new data arrived thru $_dataf from outside or previous part
# -> send to analyzer if necessary
# - xor lppos increased due to IMP_(PRE)PASS callback from analyzer
# -> check if buffered data can be forwarded to next part
# - xor replacements got added due to callback from analyzer, only in this
# case @fwd will be given and contains buffers, which should be
# propagated to next part
# -> forward replaced buffers
$process = sub {
my ($pi,$dir,@fwd) = @_;
my $p = $parts[$dir][$pi];
my $bufs = $p->{bufs};
# @fwd contains buffers to forward into next part
# this is only already set if called from callback on replacement,
# otherwise we will try to fill it based on bufs in current part
if ( ! @fwd ) {
# new data added or lppos changed
# check if we have data in bufs which might be added to @fwd
my $endpos = $bufs->[-1]{endpos};
die "endpos mismatch" if $endpos < $p->{fwapos};
#debug( "bufs[$dir][$pi] fwapos=$p->{fwapos}, endpos=$endpos,".
# " lppos=$p->{lppos} -- " .Dumper($bufs));
# up to offset=lppos we can forward w/o sending it to the analyzer
# (on IMP_PASS) or send it to the analyzer after forwarding it
# (IMP_PREPASS)
while ( my $buf = shift(@$bufs) ) {
my $keep = ( $p->{lppos} == IMP_MAXOFFSET)
? -1 : $buf->{endpos} - $p->{lppos};
if ( $keep <=0 ) {
$DEBUG && debug("fwd complete buf lppos=%d endpos=%d",
$p->{lppos}, $buf->{endpos});
# we don't need to keep anything in the buf, e.g. fwd
# complete buf
# we might need to adjust the types, e.g. if lptype is
# IMP_PASS but buf.type is IMP_PREPASS this will result in
# IMP_PREPASS. The types are sorted in Net::IMP by
# importance so we can just use the largest
$buf->{rtype} = $p->{lptype} if $p->{lptype}>$buf->{rtype};
$p->{partial_pass} = 0;
} elsif ( $keep < length($buf->{data}) ) {
# we can forward parts of buf only
# split $buf, $keep bytes will be kept in $bufs
$DEBUG && debug("fwd part(buf) lppos=%d endpos=%d keep=%d",
$p->{lppos}, $buf->{endpos}, $keep);
if ( $buf->{dtype} >0 ) {
# only streaming data might be split into arbitrary
# chunks, others can only be handled as whole packets
# thus just ignore this (pre)pass and hope we will get
# one with extends to the whole packet
$DEBUG && debug( sprintf(
"ignoring partial (pre)pass for %s lppos=%d endpos=%d keep=%d",
$buf->{dtype},$p->{lppos}, $buf->{endpos}, $keep));
# note the partial pass to croak if followed by replacement
$p->{partial_pass} = 1;
next;
}
my $data = substr($buf->{data},-$keep,$keep,'');
# put buf with rest of data back into @$bufs
# adjustments will stay in the rest, because they are
# considered beeing at the end of the buf
unshift @$bufs, Net::IMP::Cascade::_Buf->new(
data => $data,
endpos => $buf->{endpos},
badjust => $buf->{badjust},
gbadjust => $buf->{gbadjust},
padjust => $buf->{padjust},
gpadjust => $buf->{gpadjust},
eof => $buf->{eof},
dtype => $buf->{dtype},
rtype => $buf->{rtype},
);
# adjust endpos of buf we forward
$buf->{endpos} -= $keep;
# adjust type: the more important wins
$buf->{rtype} = $p->{lptype} if $p->{lptype}>$buf->{rtype};
# adjustments and eof will stay with rest of data in @$bufs
# padjust and gpadjust needs to be fixed to not contain any
# adjustments which stay in the rest
$buf->{padjust} -= $buf->{badjust};
$buf->{gpadjust} -= $buf->{gbadjust};
$buf->{badjust} = 0;
$buf->{gbadjust} = 0;
$buf->{eof} = 0;
} else {
# there is nothing we can forward, but buf back into @$bufs
# and stop processing @$bufs
unshift @$bufs,$buf;
last;
}
# we have some $buf to forward because lppos allowed us so
# if lptype is IMP_PREPASS we need to send it to analyzer too,
# otherwise (IMP_PASS) we can skip the analyzer
# update part.fwapos and part.gap
if ( $buf->{endpos} > $p->{fwapos} ) {
if ( $p->{lptype} == IMP_PREPASS ) {
# pass immediately, but also send to analyzer
my $data = $buf->{data};
my $keep = $buf->{endpos}-$p->{fwapos};
if ( $keep >= length($data)) {
# forward all in buf
} else {
# we have already send parts of buf to the analyzer
# forward only the rest
substr($data,-$keep,$keep,'');
}
# call analyzer for current part
# if there was a gap before we need to send the current
# offset
$imp[$pi]->data(
$dir,
$data,
$p->{gap} ? $buf->{endpos}:0,
$buf->{dtype},
);
} else {
# pass w/o analyzer
# this causes a gap in the stream to the analyzer
$p->{gap} = 1;
}
# update part.fwapos with end of the forwarded buf
$p->{fwapos} = $buf->{endpos};
}
# propagate eof to analyzer
# for lptype of IMP_PASS this needs to be done only if lppos !=
# IMP_MAXOFFSET, otherwise the analyzer was not interested in
# the end of data at all
if ( $buf->{eof} ) {
if ( $p->{lptype} != IMP_PASS or $p->{lppos} != IMP_MAXOFFSET ) {
$imp[$pi]->data(
$dir,
'',
$p->{gap} ? $p->{fwapos}:0,
$buf->{dtype},
)
}
}
# now add to @fwd so it gets transferred to next part
push @fwd,$buf;
$DEBUG && debug("process[$dir][$pi] fwd %d bytes with %s,".
"badjust=%d",
length($buf->{data}),$buf->{rtype},$buf->{badjust});
}
if ( ! @fwd and @$bufs == 1
and $bufs->[-1]{data} eq ''
and $bufs->[-1]{eof} ) {
# This is the special case, where we did not forward anything
# but have a single buf in the part which contains no data, but
# only eof.
# In this case add this buffer to @fwd so that the eof gets
# transferred to the next part
push @fwd, shift(@$bufs)
}
# if no more bufs are in the part add an empty one so that new data
# can get added to it. Does not make much sense if we have eof but
# makes code easier
if ( ! @$bufs ) {
push @$bufs, Net::IMP::Cascade::_Buf->new(
endpos => $fwd[-1]{endpos},
# cummulated adjustments needs to be copied from the last
# buf we have forwarded
padjust => $fwd[-1]{padjust},
gpadjust => $fwd[-1]{gpadjust},
)
}
}
#debug("$pi ".Dumper([$bufs,'------',\@fwd]));
# transfer data to next part or propagate to caller
for my $fw (@fwd) {
# skip fw with no useful information, e.g.
# no data, no eof or no adjustments (adjustments needs to be
# propagated even if we have no data, e.g. it might be that all
# data in the chunk got replaced with '')
if ( $fw->{data} ne '' # we have data to transfer
or $fw->{eof} # or eof needs to be propagated
or $fw->{badjust} # or adjustment must be propagated
) {
# fine, we have useful information
} else {
$DEBUG && debug("ignoring ".Dumper($fw));
next;
}
# check if $pi is the last part in the cascade
# for dir 0 this will be $pi = $#imp, while for the opposite dir
# this will be 0
if ( $pi == ($dir ? 0:$#imp) ) {
# propagate result up
if ( ! $fw->{rtype} ) {
# Type 0 should only be in a buffer, which got not analyzed
# or did not got passed because of IMP_(PRE)PASS.
# In this step in processing we should not have such
# type anymore.
die "untyped buffer at end of cascade";
}
# determine offset for propagation:
# fw.endpos is the position relativ to input of the last part.
# For propagating to the upper layer we need the matching
# position relativ to the the original input.
# We get this by applying all previously added adjustments
# back, but ignore adjustments from this part (adjustments are
# relevant to next part, but here we propagate up instead of
# transfering to next part).
my $eob # adjusted end of buffer is:
= $fw->{endpos} # endpos relativ to this part
- $fw->{gpadjust} # minus all adjustments so far
+ $fw->{padjust}; # but ignoring adjustments in this part
#debug( "up $fw->{rtype} ".
# "eob($eob)=$fw->{endpos}-$fw->{gpadjust}".
# "+$fw->{padjust}\n".$_dump_part->($dir));
$DEBUG && debug("process[$dir][$pi] -> cb($fw->{rtype},$dir,".
"$eob=$fw->{endpos}-adjust");
if ( $eob < $global_lastpass[$dir]{pos}
or $global_lastpass[$dir]{pos} == IMP_MAXOFFSET ) {
# we already issued an IMP_(PRE)PASS for this offset
# no need to propagate
} elsif ( $fw->{rtype} ~~ [ IMP_PASS, IMP_PREPASS ]) {
# propagate IMP_(PRE)PASS
$wself->run_callback([$fw->{rtype},$dir,$eob]);
} elsif ( $fw->{rtype} == IMP_REPLACE ) {
# propagate IMP_REPLACE
$wself->run_callback([$fw->{rtype},$dir,$eob,$fw->{data}]);
} else {
# should not happen if this code is correct
die "cannot handle type $fw->{rtype}"
}
} else {
# we are not at the last part of the cascade
# transfer data into next part of cascade
# to determine the start position in the input stream for the
# next part we need adjust the end position in the buf by any
# adjustments so far in this part, then remove the length of
# the data
my $start = # start position is:
$fw->{endpos} # end position
+ $fw->{padjust} # skip all data we removed
- length($fw->{data}); # minus length of current data
# call $_dataf with next part
# index of next part depends on $dir, e.g. if we go up or down
my $nextpi = $dir ? $pi-1:$pi+1;
$DEBUG && debug("process[$dir][$pi] -> ".
"dataf(%d,pos=%d..",$nextpi,$start);
$_dataf->(
$nextpi,
$dir,
$fw->{data},
$start,
$fw->{dtype},
$fw->{rtype},
$fw->{eof},
$fw->{gbadjust},
$fw->{gpadjust},
);
}
}
# if we have data in this part which were not send to the analyzer,
# send them now
# this includes sending eof to the analyzer
my $endpos = $bufs->[-1]{endpos};
if ($endpos >= $p->{fwapos} ) {
$DEBUG && debug("process[$dir][$pi] -> endpos=$endpos ".
"p.fwapos=$p->{fwapos}\n".$_dump_part->($dir));
for my $buf (@$bufs) {
# $needa bytes need to be analyzed
my $needa = $buf->{endpos} - $p->{fwapos};
if ( $needa>0 ) {
# some real data to analyze
my $ld = length($buf->{data});
if ( $needa>$ld ) {
# send everything, but we have a gap of size $needa-$ld
$DEBUG && debug("process[$dir][$pi] ".
"-> data(%d,allbuf<%d>,%d,%s)",
$dir,$ld,$buf->{endpos},$buf->{dtype});
$imp[$pi]->data(
$dir,
$buf->{data},
$buf->{endpos}-$ld,
$buf->{dtype},
);
} else {
# the last $needa from buf needs to be analyzed (we
# skip optimization when $needa == $ld)
$DEBUG && debug("process[$dir][$pi] ".
"-> data(%d,buf<%d,%d>,%s,%s)",
$dir,
$needa-$ld, $buf->{endpos},
$p->{gap} ? $buf->{endpos}-$ld :'',
$buf->{dtype},
);
$imp[$pi]->data(
$dir,
substr($buf->{data},-$needa,$needa),
$p->{gap} ? $buf->{endpos}-$ld:0,
$buf->{dtype},
);
}
$p->{fwapos} = $buf->{endpos};
$p->{gap} = 0;
}
}
# If the last buf contains eof, we need to send this to the
# analyzer too, except if we got a free ride with IMP_PASS of
# IMP_MAXOFFSET.
if ( $bufs->[-1]{eof} ) {
if ( $p->{lptype} != IMP_PASS or $p->{lppos} != IMP_MAXOFFSET ) {
$imp[$pi]->data(
$dir,
'',
$p->{gap} ? $p->{fwapos}:0,
$bufs->[-1]{dtype}
)
}
}
}
};
# This is the callback for the analyzer, e.g. analyzer $imp[$pi] from part
# $pi calls $_imp_cb->($pi,@results)
my $_imp_cb = sub {
my $pi = shift;
# track if something changed for dir, so that we know if we need to
# recompute global_lastpass
my %dir_changed;
while ( my $rv = shift(@_)) {
my $rtype = shift(@$rv);
if ( $rtype ~~ [ IMP_DENY,IMP_DROP,IMP_ACCTFIELD ]) {
# these gets propagated directly up w/o changes
$DEBUG && debug("impcb[*][$pi] $rtype @$rv");
$wself->run_callback([$rtype,@$rv]);
} elsif ( $rtype == IMP_LOG ) {
# these gets also propagated directly up
$DEBUG && debug("impcb[*][$pi] $rtype @$rv");
# but we need to adjust the offset before so that it reflects
# the offset in the original input stream
my ($dir,$offset,$len,$level,$msg) = @$rv;
my $buf = $parts[$dir][$pi]{bufs}[-1];
$offset += # adjust by
$buf->{gpadjust} # all adjustments so far
- $buf->{padjust}; # but not from this part
$wself->run_callback([$rtype,$dir,$offset,$len,$level,$msg]);
} elsif ( $rtype ~~ [ IMP_PASS,IMP_PREPASS ]) {
my ($dir,$offset) = @$rv;
#debug("impcb[$dir][$pi] $rtype off=$offset\n"
# .$_dump_part->($dir));
my $p = $parts[$dir][$pi];
my $startpos = $p->{bufs}[0]{endpos}
- length($p->{bufs}[0]{data});
if ( $p->{lppos} and (
$p->{lppos} == IMP_MAXOFFSET or
($offset != IMP_MAXOFFSET and $offset < $p->{lppos})
)) {
# ignore because we got an IMP_(PRE)PASS with a higher
# offset before
$DEBUG && debug("impcb[$dir] $rtype ignoring, ".
"offset($offset)<lppos($p->{lppos})");
} elsif ( $offset <= $startpos and $offset != IMP_MAXOFFSET ) {
# ignore because we got an IMP_(PRE)PASS for data we
# already processed
$DEBUG && debug("impcb[$dir][$pi] $rtype ignoring, ".
"offset($offset)<pos($startpos)");
} else {
# set lppos and lptype from result and call process, to
# see, if we could forward some more data
$DEBUG && debug("impcb[$dir][$pi] $rtype off=$offset, ".
"lppos: $p->{lppos} -> $offset");
$p->{lppos} = $offset;
$p->{lptype} = $rtype;
$dir_changed{$dir}++;
$process->($pi,$dir);
}
} elsif ( $rtype == IMP_REPLACE ) {
my ($dir,$offset,$newdata) = @$rv;
$DEBUG && debug("impcb[%d][%d] %s off=%d '%s'\n%s",
$dir,$pi,$rtype,$offset,$newdata,$_dump_part->($dir));
my $p = $parts[$dir][$pi];
my $startpos = $p->{bufs}[0]{endpos}
- length($p->{bufs}[0]{data});
if ( $offset == IMP_MAXOFFSET
or $offset > $p->{bufs}[-1]{endpos} ) {
# we cannot replace future data
die "cannot replace future data, endpos=".
$p->{bufs}[-1]{endpos}." offset=$offset";
} elsif ( $p->{lppos} and
( $p->{lppos} == IMP_MAXOFFSET or $offset < $p->{lppos} )) {
# We got a replacement for data which earlier received an
# IMP_(PRE)PASS. This should never happen (but can, if the
# analyzer is bogus).
die "cannot replace \@$offset because of ".
"$p->{lptype} \@$p->{lppos}";
} elsif ( $offset <= $startpos ) {
# We got a replacement for data, we already handled.
# This should never happen (but can, if the analyzer is
# bogus).
die "cannot replace already processed data";
} elsif ( $p->{partial_pass} ) {
die "cannot replace part of packet (after partially passed data)";
} else {
# The replacement consists of two pieces:
# - remove all data which should be replaced from bufs
# - call process with the data which should be used instead
# first remove everything in bufs up to offset with newdata
# badjust is the adjustment done in the buffer, e.g. how
# much bytes got added (or removed if badjust<0)
my $badjust = length($newdata) - ($offset - $startpos);
$DEBUG && debug("impcb[%d][%d] %s %d '%s' badjust=%d ",
$dir,$pi,$rtype,$offset,$newdata,$badjust);
# gbadjust is the sum of all existing gbadjust from the
# buffers we remove
my $gbadjust = 0;
my $bufs = $p->{bufs};
my $dtype;
while (1) {
my $buf = $bufs->[0];
$dtype = $buf->{dtype};
if ( $buf->{endpos} <= $offset ) {
$DEBUG && debug("remove whole buffer (%s/%d)",
$buf->{rtype},$buf->{endpos});
# remove whole buffer
shift(@$bufs);
# add any gbadjust we would remove to the new val
# we don't need to do the same with badjust because
# badjust is 0 for all unprocessed buffers in a
# part
$gbadjust += $buf->{gbadjust};
# add empty buf and exit loop in case all bufs are
# eaten
if ( !@$bufs ) {
push @$bufs, Net::IMP::Cascade::_Buf->new(
endpos => $offset,
padjust => $buf->{padjust},
gpadjust => $buf->{gpadjust},
);
last;
}
} elsif ( $buf->{endpos} - $offset
< length($buf->{data})) {
$DEBUG && debug("remove part of buffer (%s/%d)",
$buf->{rtype},$buf->{endpos});
# Remove only first part of buffer up to $offset.
# Partial replace is only available for stream data
if ( $buf->{dtype} >0 ) {
croak( sprintf("cannot replace part of buffer %s (%s/%d)",
$buf->{dtype},$buf->{rtype},$buf->{endpos}));
}
# No need to add to gbadjust because adjustments
# are considered at the end of the buffer and we
# keep the end.
# keep last ($buf->{endpos}- $offset) bytes in buf
$buf->{data} = substr($buf->{data},
$offset-$buf->{endpos});
last;
} else {
# nothing to remove
last;
}
}
if ( $badjust ) {
$gbadjust += $badjust;
# Each byte can only be in one buffer, because
# processed data will be immediately removed from the
# current part and forwarded to the next part.
# We need to propagate the adjustment to all data
# which were not processed yet, e.g. data in @$bufs, in
# previous parts and future data (this is done by using
# gpadjust in _dataf).
for (@$bufs) {
$_->{padjust} += $badjust;
$_->{gpadjust} += $badjust;
}
for my $i ( $dir ? ( $pi+1..$#imp ) : ( 0..$pi-1 )) {
for (@{$parts[$dir][$i]{bufs}}) {
$_->{gpadjust} += $badjust;
}
}
}
# process the new data, forward the replacements
$process->($pi,$dir, Net::IMP::Cascade::_Buf->new(
data => $newdata,
endpos => $offset,
dtype => $dtype,
rtype => IMP_REPLACE,
badjust => $badjust,
gbadjust => $gbadjust,
padjust => $bufs->[0]{padjust},
gpadjust => $bufs->[0]{gpadjust},
));
$dir_changed{$dir}++;
}
} else {
die "unsupported type $rtype";
}
}
# if something changed, check if we could update global_lastpass
for my $dir (keys %dir_changed) {
next if $global_lastpass[$dir]{pos} == IMP_MAXOFFSET;
# We traverse all parts for $dir and check the amount of data we
# could forward in each part (e.g. check lppos vs. endpos of last
# buf in part). The minimum of all these values (lpdiff) is the
# number of bytes we could (pre)pass over all parts.
# Additionally we need to find the lptype with the most importance
# which should be used for this data.
my $lpdiff;
my $lptype = 0;
my $px = $parts[$dir];
for my $p (@$px) {
if ( $p->{lppos} == IMP_MAXOFFSET ) {
$lpdiff = IMP_MAXOFFSET;
$lptype = $p->{lptype} if $p->{lptype} > $lptype;
} else {
my $over = $p->{lppos} - $p->{bufs}[-1]{endpos};
if ( $over <= 0 ) {
# nothing can be forwarded
$lpdiff = 0;
last
} elsif ( ! $lpdiff || $lpdiff>$over ) {
# lpdiff can be forwarded with type lptype
$lpdiff = $over;
$lptype = $p->{lptype} if $p->{lptype} > $lptype;
}
}
}
$lptype or next;
my $pos;
if ( $lpdiff == IMP_MAXOFFSET ) {
$pos = IMP_MAXOFFSET;
} elsif ( $lpdiff>0 ) {
$pos = $px->[-1]{bufs}[-1]{endpos} + $lpdiff;
next if $pos <= $global_lastpass[$dir]{pos};
} else {
next;
}
# we got a higher value for global_lastpass
# update and propagate it up
$global_lastpass[$dir] = {
pos => $pos,
type => $lptype
};
$wself->run_callback([$lptype,$dir,$pos]);
}
};
# While we are in $dataf function we will only spool callbacks and process
# them at the end. Otherwise $dataf might cause call of callback which then
# causes call of dataf etc - which makes debugging a nightmare.
my $collect_callbacks;
my $dataf = sub {
$collect_callbacks ||= [];
$_dataf->(@_);
while ( my $cb = shift(@$collect_callbacks)) {
$_imp_cb->(@$cb);
}
$collect_callbacks = undef
};
# wrapper which spools callbacks if within dataf
my $imp_cb = sub {
if ( $collect_callbacks ) {
# only spool and execute later
push @$collect_callbacks, [ @_ ];
return;
}
return $_imp_cb->(@_)
};
# setup callbacks
$imp[$_]->set_callback( $imp_cb,$_ ) for (0..$#imp);
# make some closures available within methods
$self->{dataf} = $dataf;
$self->{closef} = sub {
$dataf = $process = $imp_cb = undef;
@parts = ();
};
return $self;
}
sub data {
my ($self,$dir,$data,$offset,$dtype) = @_;
$self->{dataf}(
$dir ? -1:0, # input part
$dir,
$data,
$offset, # start of $data in input stream
$dtype // IMP_DATA_STREAM, # data type
0, # result type for this data from previous entry in cascade
$data eq '', # eof
0, # gbadjust
0, # gpadjust
);
}
sub DESTROY {
my $closef = shift->{closef};
$closef->() if $closef;
}
# This package just wraps each buffer in parts[dir][pi]{bufs}.
# The fields got described at the beginning of new_analyzer.
package Net::IMP::Cascade::_Buf;
use fields qw(data endpos dtype rtype badjust padjust gpadjust gbadjust eof);
sub new {
my ($class,%args) = @_;
my $self = fields::new($class);
%$self = %args;
$self->{data} //= '';
$self->{$_} //= 0 for
(qw(endpos rtype badjust padjust gpadjust gbadjust eof));
return $self;
}
1;
__END__
=head1 NAME
Net::IMP::Cascade - manages cascade of IMP filters
=head1 SYNOPSIS
use Net::IMP::Cascade;
use Net::IMP::Pattern;
use Net::IMP::SessionLog;
...
my $imp = Net::IMP::Cascade->new_factory( parts => [
Net::IMP::Pattern->new_factory..,
Net::IMP::SessionLog->new_factory..,
]);
=head1 DESCRIPTION
C<Net::IMP::Cascade> puts multiple IMP analyzers into a cascade.
Data get analyzed starting with part#0, then part#1... etc for direction 0
(client to server), while for direction 1 (server to client) the data get
analyzed the opposite way, ending in part#0.
The only argument special to C<new_factory> is C<parts>, which is an array
reference of L<Net::IMP> factory objects.
When C<new_analyzer> gets called on the L<Net::IMP::Cascade>> factory,
C<new_analyzer> will be called on the factory objects of the parts too, keeping
all arguments, except C<parts>.
=head1 TODO
Currently IMP_TOSENDER is not supported
=head1 BUGS
Don't know of any, but the feature and thus the code is way more complex than I
originally hoped :(
=head1 AUTHOR
Steffen Ullrich <sullr@cpan.org>
=head1 COPYRIGHT
Copyright by Steffen Ullrich.
This module is free software; you can redistribute it and/or
modify it under the same terms as Perl itself.