#!/usr/bin/perl -I.
use strict;
use warnings;
my $smallsleep = 0.;
my $bigsleep = 0.5;
my $debug = 0;
my $syncdebug = 0;
my $inactivity = 5;
my $heartbeat = 0.1;
BEGIN {
unless (eval { require Test::More; }) {
print "1..0 # Skipped: must have Test::More installed\n";
exit;
}
}
BEGIN {
unless (eval { require Time::HiRes; }) {
print "1..0 # Skipped: must have Time::HiRes installed\n";
exit;
}
}
use Time::HiRes qw(sleep gettimeofday tv_interval);
use IO::Pipe;
use IO::Event;
use IO::Socket::INET;
use Carp qw(verbose);
use Sys::Hostname;
use Socket;
my $t0 = [gettimeofday];
sleep(0.2);
my $elapsed = tv_interval ( $t0 );
print "# elsapsed: $elapsed\n";
unless ($elapsed > 0.1 && $elapsed < 0.5) {
print "# Time::HiRes::sleep() doesn't work - going slow\n";
$smallsleep = 1;
$bigsleep = 2;
}
my @tests;
my $testcount;
BEGIN {
@tests = (
{ #0
repeat => 5,
desc => "lines end in \\n",
receive => sub {
my $serverTest = shift;
my $ieo = shift;
my $got = <$ieo>;
return $got;
},
results => [
"howdy\n",
"doody",
],
sendqueue => [
"how",
"dy\n",
"doo",
"dy"
],
},
{ #1
repeat => 5,
desc => "paragraph mode",
setup => sub {
my $serverTest = shift;
my $ieo = shift;
$ieo->input_record_separator('');
},
receive => sub {
my $serverTest = shift;
my $ieo = shift;
my $got = <$ieo>;
return $got;
},
results => [
"this is a test\n\n",
"a\nb\n\n",
"c\n\n",
"d\n\n",
"e\n",
],
sendqueue => [
"this is ",
"a test\n",
"\n",
"a\nb\n\nc\n\n\nd\n\n\n\ne\n",
],
},
{ #2
repeat => 5,
desc => "paragraph mode, getlines",
setup => sub {
my $serverTest = shift;
my $ieo = shift;
$ieo->input_record_separator('');
},
receive => sub {
my $serverTest = shift;
my $ieo = shift;
my (@got) = <$ieo>;
return undef unless @got;
return \@got;
},
results => [
[ "this is a test\n\n", ],
[ "a\nb\n\n", "c\n\n", "d\n\n", ],
[ "e\n", ],
],
sendqueue => [
"this is ",
"a test\n",
"\n",
"a\nb\n\nc\n\n\nd\n\n\n\ne\n",
],
},
{ #3
repeat => 5,
desc => "paragraph mode, getline, \$/ set funny",
setup => sub {
my $serverTest = shift;
my $ieo = shift;
$/ = 'xyz';
$ieo->input_record_separator('');
},
receive => sub {
my $serverTest = shift;
my $ieo = shift;
return <$ieo>;
},
results => [
"this is a test\n\n",
"a\nb\n\n",
"c\n\n",
"d\n\n",
"e\n",
],
sendqueue => [
"this is ",
"a test\n",
"\n",
"a\nb\n\nc\n\n\nd\n\n\n\ne\n",
],
},
{ #4
repeat => 5,
desc => "paragraph mode, getlines, \$/ set funny",
setup => sub {
my $serverTest = shift;
my $ieo = shift;
$/ = 'abc';
$ieo->input_record_separator('');
},
receive => sub {
my $serverTest = shift;
my $ieo = shift;
my (@got) = <$ieo>;
return undef unless @got;
return \@got;
},
results => [
[ "this is a test\n\n", ],
[ "a\nb\n\n", "c\n\n", "d\n\n", ],
[ "e\n", ],
],
sendqueue => [
"this is ",
"a test\n",
"\n",
"a\nb\n\nc\n\n\nd\n\n\n\ne\n",
],
},
);
# @tests = ($tests[3]);
# splice(@tests, 0, 4);
# $tests[0]->{repeat} = 1;
$testcount = 0;
for my $t (@tests) {
my $subtests = scalar(@{$t->{results}}) + 1;
$testcount += $t->{repeat} > 0 ? $t->{repeat} * $subtests : $subtests;
}
}
BEGIN {
use Test::More tests => $testcount;
}
my $startingport = 1025;
my $rp = pickport();
my $child;
my $timer;
my $hbtimer;
$SIG{PIPE} = sub {
print "# SIGPIPE recevied in $$\n";
};
my $pipe = new IO::Pipe;
if ($child = fork()) {
print "# PARENT $$ will listen at 127.0.0.1:$rp\n" if $debug;
my $listener = IO::Event::Socket::INET->new(
Listen => 10,
Proto => 'tcp',
LocalPort => $rp,
LocalAddr => '127.0.0.1',
Handler => new Server,
Description => 'Listener',
);
$timer = Timer->new();
$hbtimer = Heartbeat->new();
$Event::DIED = $Event::DIED = sub {
Event::verbose_exception_handler(@_);
Event::unloop_all();
};
$pipe->writer();
$pipe->autoflush(1);
print $pipe "l";
print "# PARENT looping\n";
IO::Event::loop();
print "# PARENT done looping\n";
} elsif (defined($child)) {
print "# CHILD $$ will connect to 127.0.0.1:$rp\n" if $debug;
$pipe->reader();
syncto("l");
while (@tests) {
my $test = $tests[0] || last;
shift @tests
if --$test->{repeat} < 1;
print "# test $test->{desc}\n";
my $s = IO::Socket::INET->new(
PeerAddr => '127.0.0.1',
PeerPort => $rp,
Proto => 'tcp',
);
syncto("a");
die "$$ could not connect: $!" unless $s;
die "$$ socket not open" if eof($s);
my $go = <$s>;
$go =~ s/\n/\\n/g;
print "# got '$go'\n" if $debug;
for (my $sqi = 0; $sqi <= $#{$test->{sendqueue}}; $sqi++) {
syncclear();
if ($debug) {
my $x = $test->{sendqueue}[$sqi];
$x =~ s/\n/\\n/g;
print "# SENDING '$x'\n";
}
(print $s $test->{sendqueue}[$sqi]) || die "print $$: $!\n";
syncany();
}
print "# CHILD closing\n";
close($s);
}
} else {
die "fork: $!";
}
exit 0;
# support routine
sub pickport
{
for (my $i = 0; $i < 1000; $i++) {
my $s = new IO::Socket::INET (
Listen => 1,
LocalPort => $startingport,
);
if ($s) {
$s->close();
return $startingport++;
}
$startingport++;
}
die "could not find an open port";
}
sub syncany
{
print "syncany\n" if $syncdebug;
$pipe->blocking(1);
my $buf;
$pipe->read($buf, 1);
syncclear();
print "syncany done - $buf\n" if $syncdebug;
}
sub syncto
{
my $lookfor = shift;
print "syncto $lookfor\n" if $syncdebug;
$pipe->blocking(1);
my $buf;
while ($pipe->read($buf, 1) > 0) {
print "syncto got $buf\n" if $syncdebug;
last if $buf eq $lookfor;
}
print "syncto $lookfor done\n" if $syncdebug;
}
sub syncclear
{
print "synclear\n" if $syncdebug;
$pipe->blocking(0);
my $buf;
while ($pipe->read($buf, 4096)) {
print "syncclear: '$buf'\n" if $syncdebug;
}
print "syncclear done\n" if $syncdebug;
}
package Server;
use Test::More;
sub new
{
my $pkg = shift;
return bless { @_ };
}
sub ie_connection
{
my ($self, $s) = @_;
$timer->reset;
my $serverTest = new Server;
my $stream = $s->accept($serverTest);
$serverTest->{stream} = $stream;
$serverTest->{rqi} = 0;
my $test = $tests[0];
shift @tests
if --$test->{repeat} < 1;
@$serverTest{keys %$test} = values %$test;
my $setup = $serverTest->{setup};
&$setup($serverTest, $stream) if $setup;
print "# ACCEPTED CONNECTION\n" if $debug;
print "pipesend 'a'\n" if $syncdebug;
print $pipe "a";
print $stream "go\n";
}
sub ie_input
{
my ($self, $s) = @_;
my $rec = $self->{receive};
die unless $rec;
for (;;) {
my $r = &$rec($self, $s);
return unless defined $r;
my $expect = $self->{results}[$self->{rqi}++];
is_deeply($r, $expect);
}
print "pipesend 'i'\n" if $syncdebug;
print $pipe "i";
}
sub ie_eof
{
my ($self, $s) = @_;
is($self->{rqi}, scalar(@{$self->{results}}));
$s->close();
print "pipesend 'e'\n" if $syncdebug;
print $pipe "e";
exit 0 unless @tests;
}
package Timer;
use Carp;
use strict;
use warnings;
sub new
{
my ($pkg) = @_;
my $self = bless { }, $pkg;
$self->{event} = IO::Event->timer(
cb => [ $self, 'timeout' ],
interval => $inactivity,
hard => 0,
desc => 'inactivity timer',
);
return $self;
}
sub timeout
{
print STDERR "Timeout\n";
kill 9, $child;
IO::Event::unloop_all(7.2);
exit(1);
}
sub reset
{
my ($self) = @_;
$self->{event}->stop();
$self->{event}->again();
}
package Heartbeat;
use Carp;
use strict;
use warnings;
sub new
{
my ($pkg) = @_;
my $self = bless { }, $pkg;
$self->{event} = IO::Event->timer(
cb => [ $self, 'timeout' ],
interval => $heartbeat,
hard => 0,
desc => 'heartbeat timer',
);
return $self;
}
sub timeout
{
print "pipesend 't'\n" if $syncdebug;
print $pipe "t";
}
1;
__END__