The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
use strict;
use warnings;
use Net::IMP::Remote::Server;
use Net::IMP::Remote::Connection;
use Net::IMP::Debug qw($DEBUG $DEBUG_RX debug);
use Net::IMP::Cascade;
use AnyEvent;
use IO::Socket::INET;
use IO::Socket::UNIX;
use Getopt::Long qw(:config posix_default bundling);

sub usage {
    print STDERR "ERROR: @_\n" if @_;
    print STDERR <<USAGE;

Server for IMP plugins.
Listens on given address(es) for access from data provider and filters given
traffic according to configured modules.

Usage: $0 [-M|--module plugin]+ [options] listen-addr(s)
Options:
  -h|--help             this usage
  -d|--debug [rx]       enable debugging (for package matching rx only)
  -M|--module mod=args  use IMP module
  -I|--impl type        type of serializer for RPC 
    
USAGE
    exit(1);
}


my $INETCLASS = 'IO::Socket::INET';
BEGIN {
    for(qw(IO::Socket::IP IO::Socket::INET6)) {
	eval "require $_" or next;
	$INETCLASS = $_;
	last;
    }
}

my (@debug_pkg,@mod,$impl);
GetOptions(
    'h|help'     => sub { usage() },
    'd|debug:s'  => \@debug_pkg,
    'M|module=s' => \@mod,
    'I|impl=s'   => \$impl,
);
my @listen = @ARGV;

if (@debug_pkg) {
    $DEBUG = 1;
    # glob2rx
    s{(\*)|(\?)|([^*?]+)}{ $1 ? '.*': $2 ? '.': "\Q$3" }esg for (@debug_pkg);
    $DEBUG_RX = join('|',@debug_pkg);
}

@mod or usage('no IMP modules defined');
@listen or usage('no listen addresses defined');

my @factory;
for my $mod (@mod) {
    my ($class,$args) = $mod =~m{^([a-z][\w:]*)(?:=(.*))?$}i
	or die "invalid module $mod";
    eval "require $class" or die "cannot load $class: $@";
    my %args = defined $args ? $class->str2cfg($args) : ();
    if ( my @err = $class->validate_cfg(%args)) {
	die "wrong args for $class: @err";
    } 
    push @factory, $class->new_factory(%args);
}

my $factory = 
    @factory == 1 ? $factory[0] :
    Net::IMP::Cascade->new_factory( parts => \@factory );

my @lwatch;
my %rpc; # active connections
for my $addr (@listen) {
    my $srv;
    if ( $addr =~m{/} ) {
	unlink($addr);
	$srv = IO::Socket::UNIX->new(
	    Local => $addr,
	    Listen => 100,
	    Type => SOCK_STREAM,
	) or die "failed to listen on unix socket $addr: $!";
	debug("listen on unix $addr");
    } else {
	$srv = $INETCLASS->new(
	    LocalAddr => $addr,
	    Listen => 100,
	    Reuse => 1,
	) or die "failed to listen on inet socket $addr: $!";
	debug("listen on inet $addr");
    }

    push @lwatch, AnyEvent->io( fh => $srv, poll => 'r', cb => sub {
	my $cl = $srv->accept or return;
	debug("new connection");
	$cl->blocking(0);
	my $conn = Net::IMP::Remote::Connection->new($cl,1, 
	    impl => $impl, 
	    eventlib => my::Event->new 
	);
	my $key;
	$conn->onClose( sub { 
	    shift; # self
	    delete $rpc{$key};
	    warn("[$key] error: @_") if @_;
	});
	my $rpc = Net::IMP::Remote::Server->new($conn,$factory);
	$rpc{$key = "$rpc"} = $rpc;
    });
}
AnyEvent->condvar->recv;


	

package my::Event;
use Net::IMP::Debug;
sub new {  bless {},shift }
{
    my %watchr;
    sub onread {
	my ($self,$fh,$cb) = @_;
	defined( my $fn = fileno($fh)) or die "invalid filehandle";
	if ( $cb ) {
	    debug(( $watchr{$fn} ? "update":"add" )." read-watcher $fn");
	    $watchr{$fn} = AnyEvent->io( 
		fh => $fh, 
		cb => $cb, 
		poll => 'r' 
	    );
	} elsif ( $watchr{$fn} ) {
	    debug("remove read-watcher $fn");
	    undef $watchr{$fn};
	}
    }
}

{
    my %watchw;
    sub onwrite {
	my ($self,$fh,$cb) = @_;
	defined( my $fn = fileno($fh)) or die "invalid filehandle";
	if ( $cb ) {
	    debug(( $watchw{$fn} ? "update":"add" )." write-watcher $fn");
	    $watchw{$fn} = AnyEvent->io( 
		fh => $fh, 
		cb => $cb, 
		poll => 'w' 
	    );
	} elsif ( $watchw{$fn} ) {
	    debug("remove write-watcher $fn");
	    undef $watchw{$fn};
	}
    }
}

sub now { return AnyEvent->now }
sub timer {
    my ($self,$after,$cb,$interval) = @_;
    return AnyEvent->timer( 
	after => $after, 
	cb => $cb,
	$interval ? ( interval => $interval ):()
    );
}