#!/usr/bin/perl -w
# This program tests the high and low watermarks. It merges the
# wheels from wheels.perl and the chargen service from selects.perl to
# create a wheel-based chargen service.
use strict;
use lib '../lib';
use POE qw(Wheel::SocketFactory Wheel::ReadWrite Driver::SysRW Filter::Line);
my $chargen_port = 32100;
#==============================================================================
# This is a simple TCP server. It answers connections and passes them
# to new chargen service sessions.
package Chargen::Server;
use POE::Session;
# Create a new chargen server. This doesn't create a real object; it
# just spawns a new session. OO purists will hate me for this.
sub new {
POE::Session->create
( inline_states =>
{ _start => \&poe_start,
accepted => \&poe_accepted,
error => \&poe_error,
}
);
undef;
}
# The Session has been set up within POE::Kernel, so it's safe to
# begin working. Create a socket factory to listen for new
# connections.
sub poe_start {
$_[HEAP]->{listener} = POE::Wheel::SocketFactory->new
( SuccessEvent => 'accepted',
FailureEvent => 'error',
BindPort => $chargen_port,
Reuse => 'yes',
);
}
# Start a session to handle successfully connected clients.
sub poe_accepted {
Chargen::Connection->new($_[ARG0]);
}
# Upon error, log the error and stop the server. Client sessions may
# still be running, and the process will continue until they
# gracefully exit.
sub poe_error {
warn "Chargen::Server encountered $_[ARG0] error $_[ARG1]: $_[ARG2]\n";
delete $_[HEAP]->{listener};
}
#==============================================================================
# This is a simple chargen service.
package Chargen::Connection;
use POE::Session;
# Create a new chargen session around a successfully accepted socket.
sub new {
my ($package, $socket) = @_;
POE::Session->create
( inline_states =>
{ _start => \&poe_start,
wheel_got_flush => \&poe_got_flush,
wheel_got_input => \&poe_got_input,
wheel_got_error => \&poe_got_error,
wheel_throttle => \&poe_throttle,
wheel_resume => \&poe_resume,
write_chunk => \&poe_write_chunk,
},
args => [ $socket ],
);
undef;
}
# The session was set up within POE::Kernel, so it's safe to begin
# working. Wrap a ReadWrite wheel around the socket, set up some
# persistent variables, and begin writing chunks.
sub poe_start {
$_[HEAP]->{wheel} = POE::Wheel::ReadWrite->new
( Handle => $_[ARG0],
Driver => POE::Driver::SysRW->new(),
Filter => POE::Filter::Line->new(),
InputEvent => 'wheel_got_input',
ErrorEvent => 'wheel_got_error',
HighMark => 256,
LowMark => 128,
HighEvent => 'wheel_throttle',
LowEvent => 'wheel_resume',
);
$_[HEAP]->{okay_to_send} = 1;
$_[HEAP]->{start_character} = 32;
$_[KERNEL]->yield('write_chunk');
}
# The client sent us input. Rather than leaving it on the socket,
# we've read it to ignore it.
sub poe_got_input {
warn "Chargen session ", $_[SESSION]->ID, " is ignoring some input.\n";
}
# An error occurred. Log it and stop this session. If the parent
# hasn't stopped, then it will continue running.
sub poe_got_error {
warn( "Chargen session ", $_[SESSION]->ID, " encountered ", $_[ARG0],
" error $_[ARG1]: $_[ARG2]\n"
);
$_[HEAP]->{okay_to_send} = 0;
delete $_[HEAP]->{wheel};
}
# Write a chunk of data to the client socket.
sub poe_write_chunk {
# Sometimes a write-chunk event comes in that ought not. This race
# occurs because water-mark events are called synchronously, while
# write-chunk events are posted asynchronously. So it may not be
# okay to write a chunk when we get a write-chunk event.
return unless $_[HEAP]->{okay_to_send};
# Enqueue chunks until ReadWrite->put() signals that its driver's
# buffer has reached (or exceeded) its high-water mark.
while (1) {
# Create a chargen line. Build a 72-column line of consecutive
# characters, starting with whatever character code we have
# stored. Wrap characters beyond "~" around to " ".
my $chargen_line =
join( '',
map { chr }
($_[HEAP]->{start_character} .. ($_[HEAP]->{start_character}+71))
);
$chargen_line =~ tr[\x7F-\xDD][\x20-\x7E];
# Increment the start character, wrapping \x7F to \x20.
$_[HEAP]->{start_character} = 32
if (++$_[HEAP]->{start_character} > 126);
# Enqueue the line for output. Stop enqueuing lines if the
# buffer's high water mark is reached.
last if $_[HEAP]->{wheel}->put($chargen_line);
}
warn "Chargen session ", $_[SESSION]->ID, " writes are paused.\n";
}
# Be impressive. Log that the session has throttled, and set a flag
# so spurious write-chunk events are ignored.
sub poe_throttle {
warn "Chargen session ", $_[SESSION]->ID, " is throttled.\n";
$_[HEAP]->{okay_to_send} = 0;
}
# Be impressive, part two. Log that the session has resumed sending,
# and clear the stop-writing flag. Only bother doing this if there's
# still a handle; that way it doesn't keep looping around after an
# error or something.
sub poe_resume {
if (exists $_[HEAP]->{wheel}) {
warn "Chargen session ", $_[SESSION]->ID, " is resuming.\n";
$_[HEAP]->{okay_to_send} = 1;
$_[KERNEL]->yield('write_chunk');
}
}
#==============================================================================
# Main loop. Create the server, and run it until something stops it.
package main;
print( "*** If all goes well, a watermarked (self-throttling) chargen\n",
"*** service will be listening on localhost port $chargen_port.\n",
"*** Watch it perform flow control by connecting to it over a slow\n",
"*** connection or with a client you can pause. The server will\n",
"*** throttle itself when its output buffer becomes too large, and\n",
"*** it will resume output when the client receives enough data.\n",
);
Chargen::Server->new;
$poe_kernel->run();
exit;