#!/usr/bin/perl
use strict;
use warnings;
use Getopt::Long qw(:config posix_default bundling);
use AnyEvent;
use AnyEvent::Socket qw(tcp_server tcp_connect parse_hostport format_address);
use Net::IMP;
use Net::IMP::Debug qw(:DEFAULT $DEBUG_RX);
# get a chance to cleanup
$SIG{TERM} = $SIG{INT} = sub { exit(0) };
sub usage {
print STDERR <<USAGE;
Relay which uses Net::IMP analyzers for inspection and modification of traffic
$0 Options* --listen ... --connect...
Options:
-h|--help show usage
-M|--module mod[=arg] use Net::IMP module for connections
-L|--listen ip:port listen on this socket
-C|--connect target and forward to this (ip:port,'socks4')
--http-only only analyze traffic which looks like http (pass thru https)
-d|--debug [pkg] debug mode, if pkg given only packages matching pkg,
e.g. 'ContentSecurityPolicy*'
USAGE
exit(2);
}
my (@listen,@laddr,$module,@debug_pkg,$http_only);
GetOptions(
'M|module=s' => \$module,
'L|listen=s' => \@laddr,
'C|connect=s' => sub {
@laddr or die "specify listener first\n";
push @listen, [ $_[1],$module,@laddr ];
@laddr = ();
},
'http-only' => \$http_only,
'd|debug:s' => \@debug_pkg,
'h|help' => sub { usage() },
);
if (@debug_pkg) {
$DEBUG = 1;
# glob2rx
s{(\*)|(\?)|([^*?]+)}{ $1 ? '.*': $2 ? '.': "\Q$3" }esg for (@debug_pkg);
$DEBUG_RX = join('|',@debug_pkg);
}
push @listen, [ undef,@laddr ] if @laddr;
@listen or usage("no listener specified");
# create listeners
my @active_connections;
for my $l (@listen) {
my ($raddr,$module,@laddr) = @$l;
my ($rhost,$rport);
($rhost,$rport) = parse_hostport($raddr) or die "invalid raddr $raddr"
if $raddr and $raddr ne 'socks4';
my $imp_factory;
if ($module and $module ne '=') {
my ($mod,$args) = $module =~m{^([a-z][\w:]*)(?:=(.*))?$}i
or die "invalid module $module";
eval "require $mod" or die "cannot load $mod args=$args: $@";
my %args = $mod->str2cfg($args//'');
$imp_factory = $mod->new_factory(%args);
$imp_factory && $imp_factory->interface([
IMP_DATA_STREAM,
[
IMP_PASS,
IMP_PREPASS,
IMP_DENY,
IMP_REPLACE,
IMP_TOSENDER,
IMP_LOG,
IMP_ACCTFIELD,
],
]) or croak("cannot create Net::IMP factory for $mod");
}
for my $laddr (@laddr) {
my ($lhost,$lport) = parse_hostport($laddr) or die "invalid laddr $laddr";
debug("listen on $lhost,$lport");
my $fwd = sub {
my ($cfh,$chost,$cport,$reply) = @_;
#debug("attempting connect to $rhost,$rport");
tcp_connect($rhost,$rport,sub {
my $sfh = shift;
#debug("connect to $rhost,$rport succeeded");
if ( $reply ) {
if ( syswrite($cfh,$reply) != length($reply)) {
# should not block on such few bytes, assume error
debug("syswrite socks reply failed: $!");
close($cfh);
return;
}
}
my $imp = $imp_factory && $imp_factory->new_analyzer( meta => {
app => 'imp_proxy',
caddr => $chost, cport => $cport,
raddr => $rhost, rport => $rport,
laddr => $lhost, lport => $lport,
});
push @active_connections, Connection->new($cfh,$sfh,$imp);
});
};
my $fwd_transp = sub {
my ($cfh,$chost,$cport) = @_;
# transparent, get target with getsockname
($rport,$rhost) = AnyEvent::Socket::unpack_sockaddr(getsockname($cfh));
$rhost = format_address($rhost);
if ( $rhost eq $lhost and $rport == $lport ) {
# not transparent
debug("attempt to connect to transparent socket non-transparently");
close($cfh);
return;
}
$fwd->(@_);
};
my $fwd_socks = sub {
my ($cfh,$chost,$cport) = @_;
my $socks4hdr = '';
my $len = 9; # minimal len: 8 byte hdr + 0 byte user + "\0"
my $iow;
my $iot = AnyEvent->timer( after => 10, cb => sub {
debug("no socks4 header within 10 seconds");
$iow = undef;
close($cfh);
return;
});
my $gethdr = sub {
my $rv = sysread($cfh,$socks4hdr,$len-length($socks4hdr));
return if ! defined $rv and $!{EAGAIN}; # retry
if ( ! $rv ) {
debug("closed/error before getting socks4 header: $!");
$iow = $iot = undef;
close($cfh);
return;
}
return if length($socks4hdr) != $len;
# found
$iow = $iot = undef;
(my $proto, my $typ,$rport,$rhost) = unpack('CCna4',$socks4hdr);
if ( $proto != 4 or $typ != 1 ) {
debug("bad sockshdr: proto=$proto, typ=$typ");
close($cfh);
return;
}
if ( substr($socks4hdr,8) !~m{\0} ) {
# username given, need more bytes
$len++;
die "username too long" if $len>512;
return;
}
my $reply = pack('CCna4',0,90,$rport,$rhost);
$rhost = format_address($rhost);
debug("socks4 fwd to $rhost,$rport");
$fwd->($cfh,$chost,$cport,$reply);
};
$iow = AnyEvent->io(
fh => $cfh,
poll => 'r',
cb => $gethdr,
);
};
tcp_server($lhost,$lport,
! $raddr ? $fwd_transp :
$raddr eq 'socks4' ? $fwd_socks :
$fwd
);
}
}
# debug info on USR1
my $sw = AnyEvent->signal(
signal => 'USR1',
cb => sub {
debug("-------- active connections -------------");
$_->dump_state for(@active_connections);
debug("-----------------------------------------");
}
);
# timer for expiring connections
my $xpt = AnyEvent->timer(
after => 5,
interval => 5,
cb => sub {
@active_connections = grep { $_ && $_->{expire} } @active_connections;
@active_connections or return;
debug("check timeouts for %d conn",0+@active_connections);
my $now = AnyEvent->now;
for (@active_connections) {
$_ or next;
$_->xdebug("expire=%d now=%d", $_->{expire},$now);
$_->{expire} > $now and return;
$_->close;
}
}
);
# Mainloop
$SIG{PIPE} = 'IGNORE'; # catch EPIPE
my $loopvar = AnyEvent->condvar;
$loopvar->recv;
exit;
############################################################################
# Connection object
############################################################################
package Connection;
use Hash::Util 'lock_keys';
use Net::IMP;
use Net::IMP::Debug;
use constant READSZ => 8192;
use Socket 'MSG_PEEK';
my $connid;
BEGIN { $connid = 0 }
sub new {
my ($class,$cfh,$sfh,$imp) = @_;
my @fh = ($cfh,$sfh);
my @eof = (0,0);
# read and write buffer
# data are usually read into rbuf, then analyzed and later put into wbuf
# if no Net::IMP filtering is done we can skip rbuf and write directly
# to wbuf.
# If wbuf is not empty the read handler on $from will be disabled
# and a write handler on $to enabled until all data are written
my @rbuf = ( '','' );
my @wbuf = ( '','' );
# event handlers and watchers
# AnyEvent disables fd event by deleting the watcher and enables by
# adding a io-callback. Because regenerating callback all the time
# is costly we store it here
my (@rcb,@rwatch);
my (@wcb,@wwatch);
my $self = bless {
expire => AnyEvent->now + 30, # last activity for connection expire
connid => ++$connid,
}, $class;
$self->{closef} = my $close = sub {
# undef everything which is somehow connected to $self and might
# hinder destruction by crossreferencing
$imp && $imp->set_callback(undef);
@fh = @rcb = @wcb = @rwatch = @wwatch = ();
$self->{expire} = undef;
};
lock_keys(%$self);
# Net::IMP specific
my @imp_passed = (0,0); # offset of rbuf[0] in stream
my @imp_topass = (0,0); # can pass up to this offset
my @imp_prepass = (0,0); # flag if data needs to prepass, not pass
my @imp_skipped = (0,0); # flag if data got not send to imp because of pass into future
# bytes from initial read, used for http_only feature
my $initial_read = '';
# defined read handler:
# read data into rbuf (or wbuf on some optimizations)
# ---------------------------------------------------------------------
for my $ft ([0,1],[1,0]) {
my ($from,$to) = @$ft;
$rcb[$from] = sub {
my $woff = length($wbuf[$to]); # >0 == was stalled
#$self->xdebug("read from $from");
my ($n,$need_imp);
if ( $imp and $http_only and length($initial_read)<10 ) {
if ( $from == 1 ) {
# server sends data before client sends request,
# this cannot be http
debug("no http because server send first");
$imp = undef;
$wbuf[$from] = $initial_read;
$initial_read = undef;
} else {
my $rv = sysread($fh[$from],$initial_read,10-length($initial_read),length($initial_read));
if ( ! defined $rv ) {
&$close if ! $!{EAGAIN};
return;
} elsif ( $rv == 0 ) { # eof
# cannot be http
} elsif ( length($initial_read)<10 ) {
# try later
return;
} else {
debug("got '$initial_read'");
if ( $initial_read =~m{^([A-Z]{3,})[ \t]} ) {
debug("might be HTTP, not ssl at least");
$rbuf[$from] = $initial_read;
goto is_http;
}
}
# not http
debug("definitly not HTTP");
$imp = undef;
$wbuf[$to] = $initial_read;
is_http: ;
$n = length($initial_read);
}
}
if ( ! $imp ) {
# no analysis, direct read into wbuf
$n||= sysread($fh[$from],$wbuf[$to],READSZ,$woff)
} elsif ( ( my $diff = $imp_topass[$from]-$imp_passed[$from] )>0 ) {
$self->xdebug("can pass $diff w/o analyzing");
# no analysis because of pass in future, read directly into wbuf
$rbuf[$from] eq '' or die "rbuf[$from] should be empty";
$n and die "should not be set from initial_data";
my $sz = $diff>READSZ ? READSZ:$diff;
$n = sysread($fh[$from],$wbuf[$to],$sz,$woff);
if ($n) {
$imp_skipped[$from] = 1;
$imp_passed[$from] += $n;
}
} else {
# read into rbuf for analysis
$need_imp = 1;
$n ||= sysread($fh[$from],$rbuf[$from],READSZ,
length($rbuf[$from]));
}
# error
if ( ! defined $n ) {
return if $!{EAGAIN};
$self->xdebug("error reading $from: $!");
&$close;
return;
# eof
} elsif ( $n == 0 ) {
$self->xdebug("eof on $from");
$eof[$from] = 1;
# disable further read events
$rwatch[$from] = undef;
# send eof to analyzer if it it interested in the data
if ( $need_imp and (
$imp_prepass[$from] or
$imp_topass[$from] != IMP_MAXOFFSET )) {
if ( $imp_skipped[$from] ) {
$imp_skipped[$from] = 0;
$imp->data($from,'',
$imp_passed[$from] + length($rbuf[$from]));
} else {
$imp->data($from,'');
}
# connection might have been closed by Net::IMP callback
@wcb or return;
}
if ( $eof[$to] ) {
$self->xdebug("end of connection");
&$close;
return;
} elsif ( $wbuf[$to] eq ''
and $rbuf[$from] eq '' ) {
$self->xdebug("shutdown $to,1");
# write close $to if everything was written
shutdown($fh[$to],1);
# short expire
$self->{expire} = AnyEvent->now + 5;
} else {
debug("wait until buffers are empty wbuf[$to]='$wbuf[$to]' rbuf[$from]='$rbuf[$from]'");
}
# send new data to analyzer or
# try to write new data immediatly if not stalled
} else {
$self->{expire} = AnyEvent->now + 30;
# feed analyzer with new bytes
if ( $need_imp ) {
if ( $imp_skipped[$from] ) {
$imp_skipped[$from] = 0;
$imp->data($from, substr($rbuf[$from],-$n),
$imp_passed[$from] + length($rbuf[$from]) -$n );
} else {
$imp->data($from, substr($rbuf[$from],-$n));
}
# connection might have been closed by Net::IMP callback
@wcb or return;
}
# prepass data?
if ( $imp_prepass[$from] ) {
my $diff = $imp_topass[$from] - $imp_passed[$from];
if ($diff>0) {
# smthg to prepass
my $l = length($rbuf[$from]);
$l = $diff if $diff<$l;
$imp_passed[$from] += $l;
$wbuf[$to] .= substr($rbuf[$from],0,$l,'');
} else {
# reset prepass, because it's done
$imp_prepass[$from] = 0;
}
}
# write if new data and was not stalled
$wcb[$to](1) if ! $woff and $wbuf[$to] ne '';
}
};
}
# define write handler: write data from wbuf
# if after write attempt still data in wbuf it will stall the connection
# (disable read, setup write handler) and unstall it once wbuf is empty again
# ---------------------------------------------------------------------
for my $ft ([0,1],[1,0]) {
my ($from,$to) = @$ft;
$wcb[$to] = sub {
my $quick = shift;
return if $wbuf[$to] eq '';
#$self->xdebug("write to $to");
my $n = syswrite( $fh[$to], $wbuf[$to] );
# error
if ( ! $n ) { # XXX $n == 0 should never happen?
return if $!{EAGAIN};
$self->fatal("connection($to) broke: $!");
# partial write
} elsif ( length($wbuf[$to]) < $n ) {
substr($wbuf[$to],0,$n,'');
$quick or return; # was already stalled
# call was from non-stalled connection, make it stalled
# by disabling read and setting up write handler
$self->xdebug("connection stalled");
$rwatch[$from] = undef;
$wwatch[$to] = AnyEvent->io(
fh => $fh[$to],
poll => 'w',
cb => $wcb[$to]
);
# full write
} else {
$wbuf[$to] = '';
if ( ! $quick ) {
# call was from stalled connection, which is no longer
# stalled. Disable write handler and enable read handler
$self->xdebug("connection unstalled");
$wwatch[$to] = undef;
if ( ! $eof[$from] ) {
$rwatch[$from] = AnyEvent->io(
fh => $fh[$from],
poll => 'r',
cb => $rcb[$from]
);
}
}
if ( $eof[$from] and $rbuf[$from] eq '' ) {
# can shutdown write
$self->xdebug("shutdown $to,1");
shutdown($fh[$to],1);
$self->{expire} = AnyEvent->now + 5;
}
}
};
}
# set Net::IMP callback
# ---------------------------------------------------------------------
$imp and $imp->set_callback( sub {
my @tosend; # new data in wbuf on non-stalled connections
for my $rv (@_) {
my $rtype = shift(@$rv);
$self->xdebug( "$rtype @$rv");
if ( $rtype == IMP_DENY ) {
# close connection
my ($dir,$msg) = @$rv;
# silent close if no msg
# FIXME use smthg better then just debug
$self->xdebug("connection denied: dir=$dir '$msg'") if $msg;
&$close;
return;
} elsif ( $rtype == IMP_LOG ) {
my ($dir,$offset,$len,$level,$msg) = @$rv;
# FIXME use smthg better then just debug
$self->xdebug("imp[$dir] off=$offset/$len <$level> $msg");
} elsif ( $rtype == IMP_ACCTFIELD ) {
my ($key,$value) = @$rv;
# FIXME use smthg better then just debug
$self->xdebug("accounting $key=$value");
} elsif ( $rtype ~~ [ IMP_PASS, IMP_PREPASS, IMP_REPLACE ] ) {
my ($dir,$offset,$newdata) = @$rv;
$self->xdebug("got $rtype $dir|$offset passed=$imp_passed[$dir]");
my $diff = $offset - $imp_passed[$dir];
if ( $diff<0 ) {
$self->xdebug("diff=$diff - $rtype for already passed data");
# already passed
die "cannot replace already passed data"
if $rtype == IMP_REPLACE;
next;
}
my $rl = length($rbuf[$dir]);
my $l = $rl>$diff ? $diff: $rl;
$self->xdebug("need to $rtype $l bytes");
$imp_passed[$dir] += $l;
$imp_topass[$dir] = $offset;
$imp_prepass[$dir] = ($rtype == IMP_PREPASS);
if ( $rtype == IMP_REPLACE ) {
die "cannot replace not yet received data" if $rl<$diff;
$self->xdebug("buf='%s' [0,$l]->'%s'",substr($rbuf[$dir],0,$l),$newdata);
substr($rbuf[$dir],0,$l,$newdata);
$l = length($newdata);
$rl = length($rbuf[$dir]); # FIXME: do I need this below?
}
# forward data to wbuf of other side
if ($l) {
my $to = $dir?0:1;
push @tosend,$to if $wbuf[$to] eq '';
$wbuf[$to] .= substr($rbuf[$dir],0,$l,'');
}
} elsif ( $rtype == IMP_TOSENDER ) {
my ($dir,$data) = @$rv;
# send data back to sender
push @tosend,$dir if $wbuf[$dir] eq '';
$wbuf[$dir] .= $data;
} else {
die "cannot handle Net::IMP rtype $rtype";
}
}
# output collected data
while ( @tosend ) {
my $dir = shift(@tosend);
$wcb[$dir](1);
}
});
# enable read handler on both sides
# ---------------------------------------------------------------------
for my $from (0,1) {
$rwatch[$from] = AnyEvent->io(
fh => $fh[$from],
poll => 'r',
cb => $rcb[$from]
);
}
return $self;
}
sub close:method {
my $self = shift;
$self->{closef}();
}
sub dump_state {
my $self = shift;
warn "to be done\n";
}
sub fatal {
my ($self,$reason) = @_;
$self->xdebug("fatal: $reason");
$self->close;
}
sub xdebug {
my $self = shift;
my $msg = shift;
unshift @_,"[$self->{connid}] $msg";
goto &debug;
}