The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl -w
# $Id: 01_queues.t 18 2003-11-29 23:18:56Z rcaputo $

use strict;

sub POE::Kernel::ASSERT_DEFAULT () { 1 }
use POE;
use POE::Component::JobQueue;

sub DEBUG () { 0 };

$| = 1;
print "1..3\n";

#------------------------------------------------------------------------------
# A list of tasks to run.

my @active_tasks  = qw( 5 4 3 2 1 5 4 3 2 1 );
my @passive_tasks = @active_tasks;
my @tests_done;

my $active_test              = 1;
my $passive_test             = $active_test + scalar @passive_tasks;
my $active_simultaneous      = 0;
my $passive_simultaneous     = 0;
my $active_max_simultaneous  = 0;
my $passive_max_simultaneous = 0;
my $target_done_count        = @active_tasks + @passive_tasks;

#------------------------------------------------------------------------------

sub worker_start {
  my ($kernel, $heap, $postback, $test, $task, $a_or_p) =
    @_[KERNEL, HEAP, ARG0..ARG3];

  $heap->{test} = $test;
  $heap->{task} = $task;
  $heap->{aorp} = $a_or_p;
  $heap->{postback} = $postback;

  $kernel->delay( done => $task );

  DEBUG and warn "$a_or_p test $test started ($task)\n";

  if ($a_or_p eq 'active') {
    $active_simultaneous++;
    $active_max_simultaneous = $active_simultaneous
      if $active_simultaneous > $active_max_simultaneous;
  }
  else {
    $passive_simultaneous++;
    $passive_max_simultaneous = $passive_simultaneous
      if $passive_simultaneous > $passive_max_simultaneous;
  }
}

sub worker_done {
  my $heap = $_[HEAP];

  DEBUG and
    warn "$heap->{aorp} test $heap->{test} finished ($heap->{task})\n";

  push @tests_done, $heap->{test};
  if ($heap->{aorp} eq 'active') {
    $active_simultaneous--;
  }
  else {
    $passive_simultaneous--;
  }

  my $postback = delete $heap->{postback};
  if (ref $postback eq 'ARRAY') {
    my $session = $_[KERNEL]->alias_resolve($postback->[0]);
    if (defined $session) {
      $postback = $session->postback( $postback->[1], $heap->{task} );
    }
    else {
      $postback = sub { 1 };
    }
  }

  # Causes some evil recursion somewhere. :(
  # $postback->( $heap->{test}, $heap->{task} );
}

#------------------------------------------------------------------------------

sub spawn_worker {
  my ($outer_postback, $outer_test, $outer_task, $active_or_passive) = @_;

  POE::Session->create
    ( inline_states =>
      { _start => \&worker_start,
        done   => \&worker_done,
        _stop => sub {},
      },
      args => [ $outer_postback, $outer_test, $outer_task, $active_or_passive ]
    );
}

#------------------------------------------------------------------------------

sub passive_respondee_start {
  my $kernel = $_[KERNEL];
  $kernel->yield( 'flood_queue' );
}

sub passive_respondee_flood_queue {
  my $kernel = $_[KERNEL];
  foreach (@passive_tasks) {
    $kernel->post( passive => enqueue => response => $_ );
  }
  $kernel->yield( 'dummy' );
}

sub passive_respondee_response {
  my ($request, $response) = @_[ARG0, ARG1];
  my (@req_job) = @$request;
  my (@resp_answer) = @$response;
  DEBUG and warn "passive respondee got: (@req_job) = (@resp_answer)";
}

POE::Session->create
  ( inline_states =>
    { _start      => \&passive_respondee_start,
      flood_queue => \&passive_respondee_flood_queue,
      response    => \&passive_respondee_response,

      # quiets ASSERT_DEFAULT
      _stop       => sub {},
      dummy       => sub {},
    }
  );

#------------------------------------------------------------------------------

sub active_respondee_start {
  my $kernel = $_[KERNEL];
  $kernel->alias_set( 'respondee' );
}

sub active_respondee_response {
  my ($request, $response) = @_[ARG0, ARG1];
  my (@req_job) = @$request;
  my (@resp_answer) = @$response;
  DEBUG and warn "active respondee got: (@req_job) = (@resp_answer)";
}

POE::Session->create
  ( inline_states =>
    { _start   => \&active_respondee_start,
      response => \&active_respondee_response,

      # quiets ASSERT_DEFAULT
      _stop    => sub {},
    }
  );

#------------------------------------------------------------------------------

POE::Component::JobQueue->spawn
  ( Alias        => 'active',
    WorkerLimit  => 5,
    Worker       =>
    sub {
      my $metapostback = shift;
      my $task = shift @active_tasks;
      my $test = $active_test++;
      if (defined $task) {
        my $postback = $metapostback->($task);
        &spawn_worker( $postback, $test, $task, 'active' );
      }
    },

    Active =>
    { AckAlias => 'respondee',
      AckState => 'response',
    },
  );

POE::Component::JobQueue->spawn
  ( Alias       => 'passive',
    WorkerLimit => 5,
    Worker      =>
    sub {
      my ($postback, $task) = @_;
      my $test = $passive_test++;
      &spawn_worker($postback, $test, $task, 'passive') if defined $task;
    },

    Passive => { },
  );

# Run it all until done.
$poe_kernel->run();

# Figure out whether the tests worked.

print 'not ' unless $active_max_simultaneous == 5;
print "ok 1\n";

print 'not ' unless $passive_max_simultaneous == 5;
print "ok 2\n";

print 'not ' unless scalar(@tests_done) == $target_done_count;
print "ok 3\n";

exit;