The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w

# This program creates a server session and an infinitude of clients
# that connect to it, all in the same process.  It's mainly used to
# test for memory leaks, but it's also something of a benchmark.

# It is possible to split this program into two separate processes:
#   Change $server_addr to something appropriate.
#   Make a second copy of this program.
#   In the "server" copy, comment out the call to &pool_create();
#   In the "client" copy, comment out the call to &server_create();

use strict;
use lib '../lib';
use Socket;

#sub POE::Kernel::ASSERT_DEFAULT () { 1 }
use POE qw(Wheel::ListenAccept Wheel::ReadWrite Driver::SysRW Filter::Line
           Wheel::SocketFactory
          );

use constant {
  MAX_SIMULTANEOUS_CLIENTS => 5,  # number of clients the pool starts
  DEBUG                    => 0,  # make 1 to enable output
};
                                        # address and port the server binds to
                                        # and the clients connect to
my ($server_addr, $server_port) = ('127.0.0.1', 32100);

###############################################################################
# This is a single client session.  It uses two separate wheels: a
# SocketFactory to establish a connection, and a ReadWrite to process
# data once the connection is made.

#------------------------------------------------------------------------------
# This is a regular Perl sub that helps create new clients.  It's not
# an event handler.

sub client_create {
  my ($serial_number) = @_;
                                        # event name => handler for a client
  my %handlers = (
    _start    => \&client_start,
    _stop     => \&client_stop,
    connected => \&client_connected,
    receive   => \&client_receive,
    error     => \&client_error,
    signals   => \&client_signals,
    _parent   => sub {},
  );
                                        # the serial number reaches
                                        # client_start as ARG0
  return POE::Session->create(
    inline_states => \%handlers,
    args          => [ $serial_number ],
  );
}

#------------------------------------------------------------------------------
# Accept POE's standard _start event, and create a non-blocking client
# socket.

sub client_start {
  my $kernel = $_[KERNEL];
  my $heap   = $_[HEAP];
  my $serial = $_[ARG0];

  print "Client $serial is starting.\n" if DEBUG;
                                        # keep the serial number; the other
                                        # client handlers print it
  $heap->{'serial'} = $serial;
                                        # route SIGINT to the 'signals' state
  $kernel->sig(INT => 'signals');
                                        # connect to the server, generating
                                        # 'connected' on success and 'error'
                                        # on failure
  my %factory_args = (
    RemoteAddress => $server_addr,
    RemotePort    => $server_port,
    SuccessEvent  => 'connected',
    FailureEvent  => 'error',
  );
  $heap->{'wheel'} = POE::Wheel::SocketFactory->new(%factory_args);
}

#------------------------------------------------------------------------------
# Accept POE's standard _stop event.  This normally would clean up the
# session, but this program doesn't keep anything in the heap that
# needs to be cleaned up.

sub client_stop {
                                        # nothing to clean up; just log
  my $serial = $_[HEAP]->{'serial'};
  DEBUG and print "Client $serial has stopped.\n";
}

#------------------------------------------------------------------------------
# This event handler/state is invoked when a connection has been
# established successfully.  It replaces the SocketFactory wheel with
# a ReadWrite wheel.  The new wheel generates different events.

sub client_connected {
  my $heap   = $_[HEAP];
  my $socket = $_[ARG0];
                                        # a descriptor number above 63 is
                                        # treated as evidence of a leak
  if (fileno($socket) > 63) {
    die "possible filehandle leak";
  }
  print "Client $heap->{'serial'} is connected.\n" if DEBUG;
                                        # replace the SocketFactory with a
                                        # line-oriented ReadWrite wheel that
                                        # generates 'receive' on input and
                                        # 'error' on error
  my %wheel_args = (
    Handle     => $socket,
    Driver     => POE::Driver::SysRW->new,
    Filter     => POE::Filter::Line->new,
    InputEvent => 'receive',
    ErrorEvent => 'error',
  );
  $heap->{'wheel'} = POE::Wheel::ReadWrite->new(%wheel_args);
                                        # close the sending half of the
                                        # socket; this client only reads
  shutdown($socket, 1);
}

#------------------------------------------------------------------------------
# This state is invoked by the ReadWrite wheel to process complete
# chunks of input.

sub client_receive {
  my $heap = $_[HEAP];
  my $line = $_[ARG0];
                                        # input is only logged, never acted on
  DEBUG and print "Client $heap->{'serial'} received: $line\n";
}

#------------------------------------------------------------------------------
# This state is invoked by both the SocketFactory and the ReadWrite
# wheels when an error occurs.

sub client_error {
  my $heap = $_[HEAP];
  my ($operation, $errnum, $errstr) = @_[ARG0, ARG1, ARG2];

  if (DEBUG) {
    my $who = "Client $heap->{'serial'}";
                                        # a zero error number is reported as
                                        # the server closing the connection
    print(
      $errnum
      ? "$who encountered $operation error $errnum: $errstr\n"
      : "$who the server closed the connection.\n"
    );
  }
                                        # removing the wheel stops the session
  delete $heap->{'wheel'};
}

#------------------------------------------------------------------------------
# Catch and log signals.  Never handle them.

sub client_signals {
  my $heap        = $_[HEAP];
  my $signal_name = $_[ARG0];

  if (DEBUG) {
    print "Client $heap->{'serial'} caught SIG$signal_name\n";
  }
                                        # return false: the signal is logged
                                        # but not handled, so the session can
                                        # stop
  return 0;
}

###############################################################################
# This is a client pool session.  It ensures that at least five
# clients are interacting with the server at any given time.
# Actually, there are brief periods where only four clients are
# connected.

#------------------------------------------------------------------------------
# This is a regular Perl sub that helps create new client pools.  It's
# not an event handler.

sub pool_create {
                                        # event name => handler for the pool
  my %handlers = (
    _start  => \&pool_start,
    _stop   => \&pool_stop,
    _child  => \&pool_child,
    signals => \&pool_signals,
    _parent => sub {},
  );
                                        # create the pool session
  return POE::Session->create(inline_states => \%handlers);
}

#------------------------------------------------------------------------------
# Accept POE's standard _start event.  Initialize benchmark
# accumulators, and start the first five clients.

sub pool_start {
  my $kernel = $_[KERNEL];
  my $heap   = $_[HEAP];

  print "Pool starting.\n" if DEBUG;
                                        # route SIGINT to the 'signals' state
  $kernel->sig(INT => 'signals');
                                        # child bookkeeping
  @{$heap}{'children', 'client serial', 'state'} = (0, 0, 'running');
                                        # benchmark accumulators
  @{$heap}{'bench start', 'bench count'} = (time(), 0);

  # Start the initial batch of clients.  NOTE: This would not work if
  # clients connected with IO::Socket, because IO::Socket's connect
  # blocks: it would wait for the server to accept, which can never
  # happen while this loop is holding up the event queue.  The loop is
  # only safe because SocketFactory connections do not block.

  foreach my $slot (1 .. MAX_SIMULTANEOUS_CLIENTS) {
    client_create(++$heap->{'client serial'});
  }
}

#------------------------------------------------------------------------------
# Accept POE's standard _stop event.  Also stop the server.

sub pool_stop {
                                        # send SIGQUIT to the session that
                                        # is aliased 'server'
  $_[KERNEL]->signal('server', 'QUIT');
  DEBUG and print "Pool has stopped.\n";
}

#------------------------------------------------------------------------------
# Catch and log signals, but never handle them.

sub pool_signals {
  my $signal_name = $_[ARG0];

  if (DEBUG) {
    print "Pool caught SIG$signal_name\n";
  }
                                        # return false: the signal is logged
                                        # but not handled, so the pool can stop
  return 0;
}

#------------------------------------------------------------------------------
# Keep track of child sessions, starting new ones to replace old ones
# that are being lost.  If debugging, and a time limit has been
# reached, stop creating new clients.

my %english = (                         # past tense of each _child direction
  create => 'created',
  gain   => 'gained',
  lose   => 'lost',
);

sub pool_child {
  my $heap      = $_[HEAP];
  my $direction = $_[ARG0];

  if ($direction ne 'lose') {
                                        # a client arrived; count it for the
                                        # pool and for the benchmark
    $heap->{'children'}++;
    $heap->{'bench count'}++;
  }
  else {
                                        # a client went away; replace it
                                        # unless the pool is winding down
    $heap->{'children'}--;
    if ($heap->{'state'} eq 'running') {
      client_create(++$heap->{'client serial'});
    }
  }

  if (DEBUG) {
    print "Pool $english{$direction} a child session (now has $heap->{'children'}).\n";
  }
                                        # report clients/second once at least
                                        # ten seconds have accumulated, then
                                        # restart the measurement window
  my $elapsed = time() - $heap->{'bench start'};
  if ($elapsed >= 10) {
    my $count = $heap->{'bench count'};
    my $rate  = $count / $elapsed;
    print "bench: $count / $elapsed = $rate\n";

    $heap->{'bench count'} = 0;
    $heap->{'bench start'} = time();
                                        # when debugging, stop replacing
                                        # clients 60 seconds after the
                                        # program started ($^T)
    $heap->{'state'} = 'quitting' if DEBUG && (time() - $^T >= 60.0);
  }
}

###############################################################################
# This is a single server session.  It is spawned by the daytime
# server to handle incoming connections.

#------------------------------------------------------------------------------
# This is a regular Perl sub that helps create new sessions.  It's not
# an event handler.

sub session_create {
  my ($handle, $peer_host, $peer_port) = @_;
                                        # event name => handler for one
                                        # server-side connection
  my %handlers = (
    _start  => \&session_start,
    _stop   => \&session_stop,
    receive => \&session_receive,
    flushed => \&session_flushed,
    error   => \&session_error,
    signals => \&session_signals,
    _child  => sub {},
    _parent => sub {},
  );
                                        # socket, address and port reach
                                        # session_start as ARG0, ARG1, ARG2
  return POE::Session->create(
    inline_states => \%handlers,
    args          => [ $handle, $peer_host, $peer_port ],
  );
}

#------------------------------------------------------------------------------
# Accept POE's standard _start event, and start transacting with the
# client.

sub session_start {
  my ($kernel, $heap) = @_[KERNEL, HEAP];
  my ($handle, $packed_addr, $peer_port) = @_[ARG0, ARG1, ARG2];
                                        # make the address printable
  my $peer_host = inet_ntoa($packed_addr);
  print "Session with $peer_host $peer_port is starting.\n" if DEBUG;
                                        # route SIGINT to the 'signals' state
  $kernel->sig(INT => 'signals');
                                        # remember the peer for log messages
  @{$heap}{'host', 'port'} = ($peer_host, $peer_port);
                                        # line-oriented I/O on the client's
                                        # socket, generating 'receive',
                                        # 'error' and 'flushed' events
  my %wheel_args = (
    Handle       => $handle,
    Driver       => POE::Driver::SysRW->new,
    Filter       => POE::Filter::Line->new,
    InputEvent   => 'receive',
    ErrorEvent   => 'error',
    FlushedEvent => 'flushed',
  );
  $heap->{'wheel'} = POE::Wheel::ReadWrite->new(%wheel_args);
                                        # give the client the time of day
  my $greeting =
    "Hi, $peer_host $peer_port!  The time is: " . gmtime() . " GMT";
  $heap->{'wheel'}->put($greeting);
}

#------------------------------------------------------------------------------
# Accept POE's standard _stop event.  This normally would clean up the
# session, but this program doesn't keep anything in the heap that
# needs to be cleaned up.

sub session_stop {
                                        # nothing to clean up; just log
  my ($host, $port) = @{ $_[HEAP] }{'host', 'port'};
  DEBUG and print "Session with $host $port has stopped.\n";
}

#------------------------------------------------------------------------------
# This state is invoked by the ReadWrite wheel whenever a complete
# request has been received.

sub session_receive {
  my $heap = $_[HEAP];
  my $line = $_[ARG0];
                                        # input is only logged, never acted on
  DEBUG and print "Received from $heap->{'host'} $heap->{'port'}: $line\n";
}

#------------------------------------------------------------------------------
# This state is invoked when the ReadWrite wheel encounters an error.

sub session_error {
  # Handles the ReadWrite wheel's 'error' event for one server session.
  #   ARG0 - name of the operation that failed
  #   ARG1 - numeric error code; zero is treated as the peer closing the
  #          connection, the same way client_error treats it
  #   ARG2 - error message text
  # Whatever the cause, the wheel is removed afterwards.
  my ($heap, $operation, $errnum, $errstr) = @_[HEAP, ARG0, ARG1, ARG2];
  if (DEBUG) {
    my $who = "Session with $heap->{'host'} $heap->{'port'}";
    if ($errnum) {
      print "$who encountered $operation error $errnum: $errstr\n";
    }
    else {
                                        # error number 0 is a normal
                                        # disconnect, so don't log it as
                                        # "error 0"
      print "$who: the client closed the connection.\n";
    }
  }
                                        # removing the wheel stops the session
  delete $heap->{'wheel'};
}

#------------------------------------------------------------------------------
# This state is invoked when the ReadWrite wheel's output buffer
# becomes empty.  For a daytime server session, a flushed buffer means
# it's okay to close the connection.

sub session_flushed {
  my $heap = $_[HEAP];

  if (DEBUG) {
    my ($host, $port) = @{$heap}{'host', 'port'};
    print "Output to $host $port has flushed.\n";
  }
                                        # the greeting has been written, so
                                        # drop the wheel; removing the wheel
                                        # stops the session
  delete $heap->{'wheel'};
}

#------------------------------------------------------------------------------
# Catch and log signals, but never handle them.

sub session_signals {
  my $heap        = $_[HEAP];
  my $signal_name = $_[ARG0];

  if (DEBUG) {
    print "Session with $heap->{'host'} $heap->{'port'} has received a SIG$signal_name\n";
  }
                                        # return false: the signal is logged
                                        # but not handled, so the session can
                                        # stop
  return 0;
}

###############################################################################
# This is a generic daytime server.  Its only purpose is to listen on
# a socket, accept connections, and spawn daytime sessions to handle
# the connections.

#------------------------------------------------------------------------------
# This is a regular Perl sub that helps create new servers.  It's not
# an event handler.

sub server_create {
                                        # event name => handler for the
                                        # listening server
  my %handlers = (
    _start         => \&server_start,
    _stop          => \&server_stop,
    accept_success => \&server_accept,
    accept_error   => \&server_error,
    signals        => \&server_signals,
    _child         => sub {},
    _parent        => sub {},
  );
                                        # create the server session
  return POE::Session->create(inline_states => \%handlers);
}

#------------------------------------------------------------------------------
# Accept POE's standard _start event.  Create a non-blocking server.

sub server_start {
  my $kernel = $_[KERNEL];
  my $heap   = $_[HEAP];

  print "Daytime server is starting.\n" if DEBUG;
                                        # pool_stop signals this session
                                        # through the 'server' alias
  $kernel->alias_set('server');
                                        # route SIGINT and SIGQUIT to the
                                        # 'signals' state
  foreach my $signal_name (qw(INT QUIT)) {
    $kernel->sig($signal_name, 'signals');
  }
                                        # listen on the configured address and
                                        # port, generating 'accept_success'
                                        # for connections and 'accept_error'
                                        # for errors
  my %factory_args = (
    BindAddress  => $server_addr,
    BindPort     => $server_port,
    Reuse        => 'yes',
    SuccessEvent => 'accept_success',
    FailureEvent => 'accept_error',
  );
  $heap->{'wheel'} = POE::Wheel::SocketFactory->new(%factory_args);
}

#------------------------------------------------------------------------------
# Accept POE's standard _stop event.  This normally would clean up the
# session, but this program doesn't keep anything in the heap that
# needs to be cleaned up.

sub server_stop {
                                        # nothing to clean up; just log
  DEBUG and print "Daytime server has stopped.\n";
}

#------------------------------------------------------------------------------
# This state is invoked by the SocketFactory when an error occurs.

sub server_error {
  # Handles the listening SocketFactory's failure event ('accept_error').
  #   ARG0 - name of the operation that failed
  #   ARG1 - numeric error code
  #   ARG2 - error message text
  my ($operation, $errnum, $errstr) = @_[ARG0, ARG1, ARG2];
                                        # report on STDERR even when DEBUG is
                                        # off; otherwise a failure of the
                                        # listening socket leaves no trace
  warn "Daytime server encountered $operation error $errnum: $errstr\n";
}

#------------------------------------------------------------------------------
# The SocketFactory invokes this state when a new client connection
# has been accepted.  The parameters include the client socket,
# address and port.

sub server_accept {
                                        # accepted socket, peer address, port
  my @connection = @_[ARG0, ARG1, ARG2];
                                        # a descriptor number above 63 is
                                        # treated as evidence of a leak
  if (fileno($connection[0]) > 63) {
    die "possible filehandle leak";
  }
                                        # spawn a server session
  session_create(@connection);
}

#------------------------------------------------------------------------------
# Catch and log signals, but never handle them.

sub server_signals {
  my ($signal_name) = $_[ARG0];

  if (DEBUG) {
    print "Daytime server caught SIG$signal_name\n";
  }
                                        # return false: the signal is logged
                                        # but not handled, so the server can
                                        # stop
  return 0;
}

###############################################################################
# Start the daytime server and a pool of clients to transact with it.

server_create();                        # the listening daytime server
pool_create();                          # the self-replenishing client pool

                                        # run the event loop until it returns
$poe_kernel->run();

exit 0;