@@ -1,5 +1,24 @@
Revision history for Perl extension POE::Component::IKC.
+0.2402 7 Jul 2014
+ - Don't test POD
+ - Better handling of EADDRINUSE on Win32
+ - Win32 doesn't have open "-|", skip that test for now
+
+0.2401 16 May 2014
+ - Fixed the code in thunking that depended on a non-released version of POE
+ - Use $Test::Builder::Level
+
+0.2400 16 May 2014
+ - Bumped version
+ - Copyright 2014
+ - Added Server/on_error
+ - Added error monitor
+ - create_ikc_* functions now output deprecation warnings
+ - Fixed a timing issue when changing filters on an IKC channel
+ - Drop all use of Data::Dumper
+ - Remove hard coded ports
+
0.2305 10 Feb 2013
- Forgot Devel::Size in PREREQs
@@ -1,13 +1,13 @@
package POE::Component::IKC::Channel;
############################################################
-# $Id: Channel.pm 1077 2013-02-11 16:50:56Z fil $
+# $Id: Channel.pm 1247 2014-07-07 09:06:34Z fil $
# Based on tests/refserver.perl
# Contributed by Artur Bergman <artur@vogon-solutions.com>
# Revised for 0.06 by Rocco Caputo <troc@netrus.net>
# Turned into a module by Philp Gwyn <fil@pied.nu>
#
-# Copyright 1999-2011 Philip Gwyn. All rights reserved.
+# Copyright 1999-2014 Philip Gwyn. All rights reserved.
# This program is free software; you can redistribute it and/or modify
# it under the same terms as Perl itself.
#
@@ -22,7 +22,8 @@ use POE qw(Wheel::ListenAccept Wheel::ReadWrite Wheel::SocketFactory
);
use POE::Component::IKC::Responder;
use POE::Component::IKC::Protocol;
-use Data::Dumper;
+use POE::Component::IKC::Util;
+use Data::Dump qw( pp );
use Devel::Size qw( total_size );
# use Net::Gen ();
@@ -32,7 +33,7 @@ use Time::HiRes qw( gettimeofday tv_interval );
require Exporter;
@ISA = qw(Exporter);
@EXPORT = qw(create_ikc_channel);
-$VERSION = "0.2305";
+$VERSION = "0.2402";
sub DEBUG () { 0 }
@@ -186,14 +187,13 @@ sub channel_start
$session->option(default=>1);
$heap->{on_connect}=$p->{on_connect} if ref($p->{on_connect});
+ $heap->{on_error}=$p->{on_error} if ref($p->{on_error});
$heap->{subscribe}=$p->{subscribe}
if ref($p->{subscribe}) and @{$p->{subscribe}};
unless($heap->{is_server}) {
if(ref($p->{serializers}) and @{$p->{serializers}}) {
$heap->{serializers}=$p->{serializers};
-# } else {
-# $heap->{serializers}=$p->{serializers};
}
DEBUG and
warn __PACKAGE__, " Serializers: ",
@@ -211,13 +211,16 @@ sub channel_start
_set_phase($kernel, $heap, '000');
}
+ # This shouldn't be necessary
+ POE::Component::IKC::Responder->spawn();
+
# Register this channel
my $ikc = eval { $kernel->alias_resolve( 'IKC' ) };
if( $ikc ) {
$kernel->call( $ikc, 'register_channel' );
}
else {
- warn __PACKAGE__, " has no IKC responder.";
+ POE::Component::IKC::Util::monitor_error( $heap, 'setup', 2, "No IKC responder" );
$kernel->yield( 'shutdown' );
}
return "channel-$session";
@@ -230,27 +233,46 @@ sub _negociation_done
DEBUG and
warn "$$: Negociation done ($heap->{kernel_name}<->$heap->{remote_kernel}).\n";
- # generate this event on input
- $heap->{'wheel_client'}->event(InputEvent => 'receive',
- FlushedEvent => 'flushed');
+ $heap->{finishing} = 1;
- unless($heap->{filter}) {
- DEBUG and warn "$$: We didn't negociate a freezer, using defaults\n";
- $heap->{filter}=POE::Filter::Reference->new();
+ _pause_wheel( $heap );
+ _register_remote( $kernel, $heap );
+
+ # now that we've registered the remote kernel, we will no longer trigger on_error
+ delete $heap->{on_error};
+
+ TIMING and channel_log( $heap, "negociated" );
+
+ T->point( 'IKC', 'nego done' );
+
+ # Now that we're set up properly
+ if($heap->{subscribe}) { # subscribe to wanted sessions
+ $kernel->call('IKC', 'subscribe', $heap->{subscribe}, 'done');
+ }
+ else {
+ # "fake" a completed subscription
+ $kernel->yield('done');
}
- # parsing I/O as references
- my $ft = $heap->{filter};
- DEBUG and warn "$$: Filter is now $ft";
- $heap->{wheel_client}->set_filter($ft);
- delete $heap->{filter};
- create_ikc_responder();
+ delete $heap->{finishing};
+
+ _change_wheel( $heap );
+ _resume_wheel( $heap );
+
+ _monitor_channel( $heap, 'ready' );
+
+ return;
+}
+
+sub _register_remote
+{
+ my( $kernel, $heap ) = @_;
# Register the foreign kernel with the responder
my $aliases=delete $heap->{remote_aliases};
push @$aliases, $heap->{temp_remote_kernel}
- if $heap->{temp_remote_kernel} and
- not grep {$_ eq $heap->{temp_remote_kernel}} @$aliases;
+ if $heap->{temp_remote_kernel} and
+ not grep {$_ eq $heap->{temp_remote_kernel}} @$aliases;
DEBUG and
warn "$$: Register remote as ", join ', ', @$aliases;
@@ -259,22 +281,69 @@ sub _negociation_done
# delete $heap->{remote_kernel};
$kernel->call('IKC', 'register', $heap->{remote_ID}, $aliases, $heap->{remote_pid});
- TIMING and channel_log( $heap, "negociated" );
-
- T->point( 'IKC', 'nego done' );
+ DEBUG and
+ warn "$$: Registered remotes";
+}
- # Now that we're set up properly
- if($heap->{subscribe}) { # subscribe to wanted sessions
- $kernel->call('IKC', 'subscribe', $heap->{subscribe}, 'done');
- }
- else {
- # "fake" a completed subscription
- $kernel->yield('done');
+
+sub _change_wheel
+{
+ my( $heap ) = @_;
+
+ DEBUG and
+ warn "$$: Changing the wheel events\n";
+ # generate this event on input
+ $heap->{'wheel_client'}->event( InputEvent => 'receive',
+ FlushedEvent => 'flushed'
+ );
+
+ unless($heap->{filter}) {
+ DEBUG and warn "$$: We didn't negociate a freezer, using defaults\n";
+ $heap->{filter}=POE::Filter::Reference->new();
}
- return;
+ DEBUG and
+ warn "$$: Changing the wheel filter\n";
+
+ # parsing I/O as references
+ my $ft = $heap->{filter};
+ DEBUG and warn "$$: Filter is now $ft";
+ $heap->{wheel_client}->set_filter($ft);
+ delete $heap->{filter};
+
+ DEBUG and
+ warn "$$: Changed the wheel filter\n";
}
+
+sub _pause_wheel
+{
+ my( $heap ) = @_;
+ DEBUG and
+ warn "$$: Pause wheel\n";
+ $heap->{'wheel_client'}->pause_input;
+}
+
+sub _resume_wheel
+{
+ my( $heap ) = @_;
+ DEBUG and
+ warn "$$: Resume wheel\n";
+ $heap->{'wheel_client'}->resume_input;
+}
+
+sub _monitor_channel
+{
+ my( $heap, $op ) = @_;
+ $poe_kernel->call( IKC => 'inform_monitors',
+ $heap->{remote_ID},
+ 'channel', $op, $poe_kernel->get_active_session->ID
+ );
+
+
+}
+
+
#----------------------------------------------------
# This is the subscription callback
sub channel_done
@@ -488,7 +557,7 @@ sub client_000
}
else {
- warn "Server sent '$line' during negociation phase 000\n";
+ warn "$$: Server sent '$line' during negociation phase 000\n";
# prod far side into saying something coherrent
$heap->{wheel_client}->put('NOT') unless $line eq 'NOT';
}
@@ -693,6 +762,11 @@ sub channel_error
my ($heap, $kernel, $operation, $errnum, $errstr) =
@_[HEAP, KERNEL, ARG0, ARG1, ARG2];
+ POE::Component::IKC::Util::monitor_error( $heap,
+ $operation, $errnum, $errstr,
+ ( $operation eq 'read' && $errnum == 0 )
+ );
+
if ($errnum) {
DEBUG &&
warn "$$: Channel encountered $operation error $errnum: $errstr\n";
@@ -718,10 +792,7 @@ sub _channel_unregister
------------------------------------------
WARN
# 2005/06 Tell IKC we closed the connection
- my $ikc = eval { $poe_kernel->alias_resolve( 'IKC' ) };
- if( $ikc ) {
- $poe_kernel->call( $ikc, 'unregister', $heap->{remote_ID});
- }
+ $poe_kernel->call( 'IKC', 'unregister', $heap->{remote_ID} );
delete $heap->{remote_ID};
}
# either way, shut down
@@ -731,17 +802,25 @@ WARN
sub _close_channel
{
my($heap, $force)=@_;
+
+
+ # we have to inform monitors before unregistering
+ # but we only want to inform once,
+ _monitor_channel( $heap, 'close' ) unless $heap->{inform_once}++;
+
# tell responder right away that this channel isn't to be used
_channel_unregister($heap);
return unless $heap->{wheel_client};
+
if(not $force and $heap->{wheel_client}->get_driver_out_octets) {
DEBUG and
warn "************ Defering wheel close";
$heap->{go_away}=1; # wait until next Flushed
return;
}
+
DEBUG and
warn "Deleting wheel session = ", $poe_kernel->get_active_session->ID;
my $x=delete $heap->{wheel_client};
@@ -803,9 +882,12 @@ sub channel_receive
{
my ($kernel, $heap, $request) = @_[KERNEL, HEAP, ARG0];
+ warn "$$: Attempting to receive during finishing" if $heap->{finishing};
+
T->point( 'IKC', 'receive' );
- TIMING and channel_log( $heap, "receive" );
+ TIMING and
+ channel_log( $heap, "receive" );
DEBUG &&
warn "$$: Received data...\n";
@@ -831,7 +913,10 @@ sub channel_send
{
my ($heap, $request)=@_[HEAP, ARG0];
- TIMING and channel_log( $heap, "send" );
+ die "Attempting to send during finishing" if $heap->{finishing};
+
+ TIMING and
+ channel_log( $heap, "send" );
my $size = total_size $request;
if( $size > 100*1024*1024 ) {
@@ -845,8 +930,6 @@ sub channel_send
if ref($request) and $request->{rsvp};
if($heap->{'wheel_client'}) {
- # use Data::Dumper;
- # warn "Sending ", Dumper $request;
$heap->{'wheel_client'}->put($request);
}
else {
@@ -857,7 +940,7 @@ sub channel_send
'ARRAY' eq ref $request->{params};
my $type = "missing";
$type = "shutdown" if $heap->{shutdown};
- warn "$$: Attempting to put to a $type channel! ". Dumper $what;
+ warn "$$: Attempting to put to a $type channel! ". pp $what;
}
T->point( 'IKC', 'send' );
@@ -944,15 +1027,18 @@ POE::Component::IKC::Channel - POE Inter-Kernel Communication I/O session
use POE;
use POE::Component::IKC::Channel;
- create_ikc_channel($handle, $name, $on_connect, $subscribe,
- $rname, $unix);
+
+ POE::Component::IKC::Channel->spawn( %params );
=head1 DESCRIPTION
-This module implements an POE IKC I/O.
-When a new connection
-is established, C<IKC::Server> and C<IKC::Client> create an
-C<IKC::Channel> to handle the I/O.
+You will never use an IKC Channel directly. They are created by
+L<POE::Component::IKC::Server> and L<POE::Component::IKC::Client> as needed.
+
+
+This module implements an POE IKC I/O. When a new connection is
+established, C<IKC::Server> and C<IKC::Client> create an C<IKC::Channel> to
+handle the I/O.
IKC communication happens in 2 phases : negociation phase and normal phase.
@@ -966,13 +1052,6 @@ the Responder.
C<IKC::Channel> is also in charge of cleaning up kernel names when
the foreign kernel disconnects.
-=head1 EXPORTED FUNCTIONS
-
-=head2 create_ikc_channel
-
-This function initiates all the work of connecting to a IKC connection
-channel. It is a wrapper around C<spawn>.
-
=head1 METHODS
=head2 spawn
@@ -1056,6 +1135,12 @@ Then, when it becomes time to disconnect:
Yes, this is a hack. A cleaner machanism needs to be provided.
+=head1 EXPORTED FUNCTIONS
+
+=head2 create_ikc_channel
+
+Deprecated.
+
=head1 BUGS
@@ -1065,7 +1150,7 @@ Philip Gwyn, <perl-ikc at pied.nu>
=head1 COPYRIGHT AND LICENSE
-Copyright 1999-2011 by Philip Gwyn. All rights reserved.
+Copyright 1999-2014 by Philip Gwyn. All rights reserved.
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
@@ -1,13 +1,13 @@
package POE::Component::IKC::Client;
############################################################
-# $Id: Client.pm 1077 2013-02-11 16:50:56Z fil $
+# $Id: Client.pm 1247 2014-07-07 09:06:34Z fil $
# Based on refserver.perl
# Contributed by Artur Bergman <artur@vogon-solutions.com>
# Revised for 0.06 by Rocco Caputo <troc@netrus.net>
# Turned into a module by Philp Gwyn <fil@pied.nu>
#
-# Copyright 1999-2011 Philip Gwyn. All rights reserved.
+# Copyright 1999-2014 Philip Gwyn. All rights reserved.
# This program is free software; you can redistribute it and/or modify
# it under the same terms as Perl itself.
#
@@ -18,13 +18,15 @@ use strict;
use Socket;
use vars qw($VERSION @ISA @EXPORT @EXPORT_OK);
use POE qw(Wheel::ListenAccept Wheel::SocketFactory);
+use POE::Component::IKC::Responder;
use POE::Component::IKC::Channel;
+use POE::Component::IKC::Util;
use Carp;
require Exporter;
@ISA = qw(Exporter);
@EXPORT = qw(create_ikc_client);
-$VERSION = '0.2305';
+$VERSION = '0.2402';
sub DEBUG { 0 }
@@ -43,6 +45,9 @@ sub create_ikc_client
sub spawn
{
+ POE::Component::IKC::Responder->spawn;
+
+
T->start( 'IKC' );
my( $package, %parms ) = @_;
$parms{package} ||= $package;
@@ -158,13 +163,14 @@ sub _start {
sub error
{
- my ($heap, $operation, $errnum, $errstr) = @_[HEAP, ARG0, ARG1, ARG2];
+ my ($heap, $kernel, $operation, $errnum, $errstr) = @_[HEAP, KERNEL, ARG0, ARG1, ARG2];
DEBUG and warn "Client encountered $operation error $errnum: $errstr\n";
my $w=delete $heap->{wheel};
# WORK AROUND
# $w->DESTROY;
- if($heap->{on_error}) {
- $heap->{on_error}->($operation, $errnum, $errstr);
+ POE::Component::IKC::Util::monitor_error( $heap, $operation, $errnum, $errstr);
+ if( $heap->{alias} ) {
+ $kernel->alias_remove( delete $heap->{alias} );
}
}
@@ -180,7 +186,7 @@ sub connected
T->point( IKC => 'connected' );
# give the connection to a Channel
my %p = ( handle=>$handle, addr=>$addr, port=>$port, client=>1 );
- my @list = qw(name on_connect subscribe remote_name wheel aliases unix
+ my @list = qw(name on_connect on_error subscribe remote_name wheel aliases unix
serializers protocol);
@p{@list} = @{$heap}{@list};
$p{rname} = delete $p{remote_name};
@@ -375,7 +381,7 @@ Philip Gwyn, <perl-ikc at pied.nu>
=head1 COPYRIGHT AND LICENSE
-Copyright 1999-2011 by Philip Gwyn. All rights reserved.
+Copyright 1999-2014 by Philip Gwyn. All rights reserved.
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
@@ -1,10 +1,10 @@
package POE::Component::IKC::ClientLite;
############################################################
-# $Id: ClientLite.pm 1077 2013-02-11 16:50:56Z fil $
+# $Id: ClientLite.pm 1247 2014-07-07 09:06:34Z fil $
# By Philp Gwyn <fil@pied.nu>
#
-# Copyright 1999-2011 Philip Gwyn. All rights reserved.
+# Copyright 1999-2014 Philip Gwyn. All rights reserved.
# This program is free software; you can redistribute it and/or modify
# it under the same terms as Perl itself.
#
@@ -26,22 +26,17 @@ use Carp;
require Exporter;
@ISA = qw(Exporter);
@EXPORT = qw(create_ikc_client);
-$VERSION = '0.2305';
+$VERSION = '0.2402';
sub DEBUG { 0 }
$request=0;
###############################################################################
-#----------------------------------------------------
-# This is just a convenient way to create servers. To be useful in
-# multi-server situations, it probably should accept a bind address
-# and port.
-sub create_ikc_client
+sub spawn
{
- my $package;
- $package = (scalar(@_) & 1 ? shift(@_) : __PACKAGE__);
- my(%parms)=@_;
+ my( $package, %parms ) = @_;
+
# $parms{on_connect}||=sub{}; # would be silly for this to be blank
$parms{ip}||='localhost';
$parms{port}||=603; # POE! (almost :)
@@ -71,7 +66,14 @@ sub create_ikc_client
$self->connect and return $self;
return;
}
-*spawn=\&create_ikc_client;
+
+sub create_ikc_client
+{
+ my(%parms)=@_;
+ my $package = $parms{package} || __PACKAGE__;
+ carp "create_ikc_client is deprecated; use $package->spawn instead";
+ $package->spawn( %parms );
+}
sub name { $_[0]->{name}; }
@@ -602,7 +604,7 @@ POE::Component::IKC::ClientLite - Small client for IKC
use POE::Component::IKC::ClientLite;
- $poe=create_ikc_client(port=>1337);
+ $poe = POE::Component::IKC::ClientLite->new(port=>1337);
die POE::Component::IKC::ClientLite::error() unless $poe;
$poe->post("Session/event", $param)
@@ -633,7 +635,9 @@ If it can't it returns an error. If it can, it will send he packet again. If
=head1 METHODS
-=head2 create_ikc_client
+=head2 spawn
+
+ my $poe = POE::Component::IKC::ClientLite->spawn( %params );
Creates a new PoCo::IKC::ClientLite object. Parameters are supposedly
compatible with PoCo::IKC::Client, but unix sockets aren't
@@ -765,13 +769,20 @@ Returns our local name. This is what the remote kernel thinks we are
called. I can't really say this is the local kernel name, because, well,
this isn't really a kernel. But hey.
+
+=head1 FUNCTIONS
+
+=head2 create_ikc_client
+
+DEPRECATED. Use L<POE::Compoent::IKC::ClientLite/spawn> instead.
+
=head1 AUTHOR
Philip Gwyn, <perl-ikc at pied.nu>
=head1 COPYRIGHT AND LICENSE
-Copyright 1999-2011 by Philip Gwyn. All rights reserved.
+Copyright 1999-2014 by Philip Gwyn. All rights reserved.
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
@@ -1,8 +1,8 @@
package POE::Component::IKC::Freezer;
############################################################
-# $Id: Freezer.pm 1077 2013-02-11 16:50:56Z fil $
-# Copyright 2001-2011 Philip Gwyn. All rights reserved.
+# $Id: Freezer.pm 1247 2014-07-07 09:06:34Z fil $
+# Copyright 2001-2014 Philip Gwyn. All rights reserved.
# This program is free software; you can redistribute it and/or modify
# it under the same terms as Perl itself.
@@ -15,7 +15,7 @@ use Carp;
require Exporter;
@ISA = qw(Exporter);
@EXPORT_OK = qw(freeze thaw dclone);
-$VERSION = '0.2305';
+$VERSION = '0.2402';
sub DEBUG { 0 }
@@ -69,7 +69,7 @@ Philip Gwyn, <perl-ikc at pied.nu>
=head1 COPYRIGHT AND LICENSE
-Copyright 2001-2011 by Philip Gwyn. All rights reserved.
+Copyright 2001-2014 by Philip Gwyn. All rights reserved.
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
@@ -1,8 +1,8 @@
package POE::Component::IKC::LocalKernel;
############################################################
-# $Id: LocalKernel.pm 801 2011-08-26 15:14:24Z fil $
-# Copyright 1999-2011 Philip Gwyn. All rights reserved.
+# $Id: LocalKernel.pm 1224 2014-05-15 18:49:21Z fil $
+# Copyright 1999-2014 Philip Gwyn. All rights reserved.
# This program is free software; you can redistribute it and/or modify
# it under the same terms as Perl itself.
#
@@ -2,7 +2,7 @@ package POE::Component::IKC::Protocol;
############################################################
# $Id$
-# Copyright 2011 Philip Gwyn. All rights reserved.
+# Copyright 2011-2014 Philip Gwyn. All rights reserved.
# This program is free software; you can redistribute it and/or modify
# it under the same terms as Perl itself.
#
@@ -1,9 +1,9 @@
-# $Id: Proxy.pm 1077 2013-02-11 16:50:56Z fil $
+# $Id: Proxy.pm 1247 2014-07-07 09:06:34Z fil $
package POE::Component::IKC::Proxy;
##############################################################################
-# $Id: Proxy.pm 1077 2013-02-11 16:50:56Z fil $
-# Copyright 1999-2011 Philip Gwyn. All rights reserved.
+# $Id: Proxy.pm 1247 2014-07-07 09:06:34Z fil $
+# Copyright 1999-2014 Philip Gwyn. All rights reserved.
# This program is free software; you can redistribute it and/or modify
# it under the same terms as Perl itself.
#
@@ -21,7 +21,7 @@ use POE::Component::IKC::Specifier;
require Exporter;
@ISA = qw(Exporter);
@EXPORT = qw(create_ikc_proxy);
-$VERSION = '0.2305';
+$VERSION = '0.2402';
sub DEBUG { 0 }
@@ -67,9 +67,10 @@ sub _start
$heap->{callback}=[];
_add_callback($heap, $r_kernel, $r_session);
- DEBUG && warn "Proxy for $name ($r_session) created\n";
+ DEBUG && warn "$$: Proxy for $name ($r_session) created\n";
$kernel->alias_set($name);
- $kernel->alias_set($r_session);
+ $kernel->alias_set($r_session)
+ unless $kernel->alias_resolve( $r_session );
# monitor for shutdown events.
# this is the best way to get IKC::Responder to tell us about the
@@ -107,7 +108,7 @@ sub _delete
sub _stop
{
my($kernel, $heap)=@_[KERNEL, HEAP];
- DEBUG && warn "Proxy for $heap->{name} deleted\n";
+ DEBUG && warn "$$: Proxy for $heap->{name} deleted\n";
&{$heap->{monitor_stop}};
}
@@ -120,19 +121,19 @@ sub _default
return if $state =~ /^_/;
# use Data::Dumper;
- # warn "_default args=", Dumper $args;
+ # warn "$$: _default args=", Dumper $args;
if(not $heap->{callback})
{
- warn "Attempt to respond to a callback with $state\n";
+ warn "$$: Attempt to respond to a callback with $state\n";
return;
}
- DEBUG && warn "Proxy $heap->{name}/$state posted.\n";
+ DEBUG && warn "$$: Proxy $heap->{name}/$state posted.\n";
# use Data::Dumper;
- # warn "_default args=", Dumper $args;
+ # warn "$$: _default args=", Dumper $args;
my $ARG = [$state, [@$args]];
foreach my $r_state (@{$heap->{callback}}) {
- # warn "_default ARG=", Dumper $ARG;
+ # warn "$$: _default ARG=", Dumper $ARG;
$kernel->call('IKC', 'post2', $r_state, $sender, $ARG);
}
return;
@@ -159,7 +160,7 @@ Philip Gwyn, <perl-ikc at pied.nu>
=head1 COPYRIGHT AND LICENSE
-Copyright 1999-2011 by Philip Gwyn. All rights reserved.
+Copyright 1999-2014 by Philip Gwyn. All rights reserved.
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
@@ -1,13 +1,13 @@
package POE::Component::IKC::Responder;
############################################################
-# $Id: Responder.pm 1077 2013-02-11 16:50:56Z fil $
+# $Id: Responder.pm 1248 2014-07-07 09:06:58Z fil $
# Based on tests/refserver.perl
# Contributed by Artur Bergman <artur@vogon-solutions.com>
# Revised for 0.06 by Rocco Caputo <troc@netrus.net>
# Turned into a module by Philp Gwyn <fil@pied.nu>
#
-# Copyright 1999-2011 Philip Gwyn. All rights reserved.
+# Copyright 1999-2014 Philip Gwyn. All rights reserved.
# This program is free software; you can redistribute it and/or modify
# it under the same terms as Perl itself.
#
@@ -17,7 +17,7 @@ package POE::Component::IKC::Responder;
use strict;
use vars qw($VERSION @ISA @EXPORT @EXPORT_OK $ikc);
use Carp;
-use Data::Dumper;
+use Data::Dump qw( pp );
use POE qw(Session);
use POE::Component::IKC::Specifier;
@@ -27,7 +27,7 @@ use Scalar::Util qw(reftype);
require Exporter;
@ISA = qw(Exporter);
@EXPORT = qw(create_ikc_responder $ikc);
-$VERSION = '0.2305';
+$VERSION = '0.2402';
sub DEBUG { 0 }
@@ -37,6 +37,7 @@ sub DEBUG { 0 }
# This is just a convenient way to create only one responder.
sub create_ikc_responder
{
+ carp "create_ikc_responder is deprecated. Use ", __PACKAGE__, "->spawn instead";
__PACKAGE__->spawn();
}
@@ -49,7 +50,7 @@ sub spawn
$package, [qw(
_start _stop _child
request post call raw_message post2
- remote_error
+ remote_error channel_error
register unregister register_local register_channel
default
publish retract subscribe unsubscribe
@@ -182,8 +183,6 @@ sub post
sub post2
{
my($heap, $to, $sender, $params) = @_[HEAP, ARG0, ARG1, ARG2];
- # use Data::Dumper;
- # warn "post2 params=", Dumper $params;
$heap->{self}->post($to, $params, $sender);
}
@@ -213,10 +212,20 @@ sub raw_message
sub remote_error
{
my($heap, $msg) = @_[HEAP, ARG0];
+ $heap->{self}->remote_error( $msg );
+}
- warn "$$: Remote error: $msg\n";
+#----------------------------------------------------
+# Local channel had an error
+sub channel_error
+{
+ my($heap, $msg) = @_[HEAP, ARG0];
+ return $heap->{self}->channel_error( $msg );
}
+
+
+
##############################################################################
# publish/retract/subscribe mechanism of setting up foreign sessions
@@ -255,9 +264,9 @@ sub subscribe
$sessions=[$sessions] unless ref $sessions;
return unless @$sessions;
+ $sender = $sender->ID if ref $sender;
if($callback and 'CODE' ne ref $callback)
{
- $sender = $sender->ID if ref $sender;
my $state=$callback;
$callback=sub
{
@@ -266,7 +275,7 @@ sub subscribe
$kernel->post($sender, $state, @_);
};
}
- $heap->{self}->subscribe($sessions, $callback, $sender->ID);
+ $heap->{self}->subscribe($sessions, $callback, $sender);
}
# Called by a foreign IKC session
@@ -280,19 +289,21 @@ sub do_you_have
my $self=$heap->{self};
DEBUG and
- warn "Wants to subscribe to ", specifier_name($ses), "\n";
+ warn "$$: Wants to subscribe to ", specifier_name($ses), "\n";
if(exists $self->{'local'}{$ses->{session}} and
(not $ses->{state} or
exists $self->{'local'}{$ses->{session}}{$ses->{state}}
))
{
$ses->{kernel}||=$kernel->ID; # make sure we uniquely identify
- DEBUG and warn "Allowed (we are $ses->{kernel})\n";
+ DEBUG and
+ warn "$$: Allowed (we are $ses->{kernel})\n";
return [$ses, $kernel->ID]; # this session
} else
{
- DEBUG and warn specifier_name($ses), " is not published in this kernel\n";
- return "NOT ".specifier_name($ses);
+ DEBUG and
+ warn "$$: ", specifier_name($ses), " is not published in this kernel\n";
+ return "Refused subscription ".specifier_name($ses);
}
}
@@ -347,7 +358,7 @@ use POE::Component::IKC::Proxy;
use POE::Component::IKC::LocalKernel;
use POE qw(Session);
-use Data::Dumper;
+use Data::Dump qw( pp );
sub DEBUG { 0 }
sub DEBUG2 { 0 }
@@ -410,6 +421,54 @@ sub shutdown
# warn Dump $kernel;
}
+
+#----------------------------------------------------
+# Error in a remote kernel
+sub remote_error
+{
+ my( $self, $msg ) = @_;
+ $self->_do_error( remote => $msg );
+}
+
+# Error in the local kernel
+sub local_error
+{
+ my( $self, $msg ) = @_;
+ $self->_do_error( local => $msg );
+}
+
+# Error in a local channel
+sub channel_error
+{
+ my( $self, $msg ) = @_;
+ return $self->_do_error( channel => $msg, 1 );
+}
+
+sub _do_error
+{
+ my( $self, $where, $msg, $ignore ) = @_;
+ my $n;
+ eval {
+ my $kernel = '*';
+ my $when = $where;
+ if( ref $msg ) {
+ ( $msg, $kernel, $when ) = @$msg;
+ $kernel ||= '*';
+ $when = "$where-$when" unless $when =~ /^$where-/;
+ }
+ $n = $self->inform_monitors( $kernel, 'error', $when, $msg );
+ };
+ return $n if $n;
+ warn "$$: \u$where error: ", pp( $msg ), "\n" unless $ignore;
+ return;
+}
+
+
+
+
+
+
+
#----------------------------------------------------
# Foreign kernel called something here
sub request
@@ -417,50 +476,22 @@ sub request
my($self, $request)=@_;;
my($kernel)=@{$self}{qw(poe_kernel)};
DEBUG2 and
- warn "IKC request=", Dumper $request;
+ warn "IKC request=", pp $request;
# We ignore the kernel for now, but we should really use it to decide
# weither we should run the request or not
+ my $when = 'request';
my $to=specifier_parse($request->{event});
-
+ my $rkernel = $to->{kernel};
eval {
die "$request->{event} isn't a valid specifier" unless $to;
- my $args=$request->{params};
-
- ### allow proxied states to have multiple ARGs
- if($to->{state} eq 'IKC:proxy') {
- $to->{state}=$args->[0];
- $args=$args->[1];
- DEBUG and warn "IKC proxied request for ", specifier_name($to), "\n";
- }
- else {
- DEBUG and warn "IKC request for ", specifier_name($to), "\n";
- $args=[$args];
- }
+ my $args = $self->_req_args( $request, $to );
# this is where we'd catch a disconnect message
# 2001/07 : eh?
-
- # find out if the state we want to get at has been published
- if(exists $self->{rsvp}{$to->{session}} and
- exists $self->{rsvp}{$to->{session}}{$to->{state}} and
- $self->{rsvp}{$to->{session}}{$to->{state}}
- ) {
- $self->{rsvp}{$to->{session}}{$to->{state}}--;
- DEBUG and warn "Allow $to->{session}/$to->{state} is now $self->{rsvp}{$to->{session}}{$to->{state}}\n";
- }
- elsif(not exists $self->{'local'}{$to->{session}}) {
- my $p=$self->published;
- die "Session '$to->{session}' is not available for remote kernels:",
- join "\n", '',
- map({ " $_=>[" . join(', ', @{$p->{$_}}) . "]"} keys %$p),
- '';
- }
- elsif(not exists $self->{'local'}{$to->{session}}{$to->{state}}) {
- die "Session '$to->{session}' has not published state '",
- $to->{state}, "'\n";
- }
+ $when = 'check';
+ $self->_req_is_published( $to );
# maybe caller specified #arg? This got into $msg->{rsvp}, which
# went to the remote side, then came back here as $to
@@ -468,9 +499,11 @@ sub request
push @$args, $to->{args}; # it goes on the end
}
+ $when = 'resolve';
my $session=$kernel->alias_resolve($to->{session});
- die "Unknown session '$to->{session}'\n" unless $session;
- # warn "No FROM" unless $request->{from};
+ die "Unknown session '$to->{session}'" unless $session;
+
+ $when = 'invocation';
_thunked_post($request->{rsvp}, ["$session", $to->{state}, @$args],
$request->{from}, $request->{wantarray});
};
@@ -479,26 +512,82 @@ sub request
# Error handling consists of posting a "remote_error" state to
# the foreign kernel.
# $request->{errors_to} is set by the local IKC::Channel
- if($@)
- {
- chomp($@);
- my $err=$@.' ['.specifier_name($to).']';
- $err.=' sent by ['.specifier_name($request->{from}).']'
- if $request->{from};
- warn "$err\n";
- DEBUG && warn "$$: Error in request: $err\n";
- unless($request->{is_error}) # don't send an error message back
- { # if this was an error itself
- $self->send_msg({ event=>$request->{errors_to},
- params=>$err, is_error=>1,
- });
- }
- else {
- warn $$, Dumper $request;
- }
+ if($@) {
+ $self->_error_response( $@, $request, $to, $rkernel, $when );
+ }
+}
+
+sub _req_args
+{
+ my( $self, $request, $to ) = @_;
+ my $args = $request->{params};
+
+ ### allow proxied states to have multiple ARGs
+ if($to->{state} eq 'IKC:proxy') {
+ $to->{state}=$args->[0];
+ $args=$args->[1];
+ DEBUG and warn "IKC proxied request for ", specifier_name($to), "\n";
+ }
+ else {
+ DEBUG and warn "IKC request for ", specifier_name($to), "\n";
+ $args=[$args];
+ }
+ return $args;
+}
+
+sub _req_is_published
+{
+ my( $self, $to ) = @_;
+
+ # find out if the state we want to get at has been published
+ if(exists $self->{rsvp}{$to->{session}} and
+ exists $self->{rsvp}{$to->{session}}{$to->{state}} and
+ $self->{rsvp}{$to->{session}}{$to->{state}}
+ ) {
+ $self->{rsvp}{$to->{session}}{$to->{state}}--;
+ DEBUG and warn "Allow $to->{session}/$to->{state} is now $self->{rsvp}{$to->{session}}{$to->{state}}\n";
+ }
+ elsif(not exists $self->{'local'}{$to->{session}}) {
+ my $p=$self->published;
+ DEBUG and
+ warn "$$: Available: ",
+ join "\n", '',
+ map({ " $_=>[" . join(', ', @{$p->{$_}}) . "]"} keys %$p),
+ '';
+ die "Session '$to->{session}' is not available for remote kernels\n",
+ }
+ elsif(not exists $self->{'local'}{$to->{session}}{$to->{state}}) {
+ die "Session '$to->{session}' has not published state '",
+ $to->{state}, "'\n";
}
+ return 1;
+}
+
+sub _error_response
+{
+ my( $self, $err, $request, $to, $rkernel, $when ) = @_;
+
+ chomp( $err );
+ $err=$err.'. Request '.specifier_name($to);
+ $err.=' sent by '.specifier_name($request->{from})
+ if $request->{from};
+
+ # Tell local sessions about this error
+ $self->local_error( [ $err, $rkernel, $when ] );
+
+ # never respond to an error message with an error message
+ return if $request->{is_error};
+
+ # Tell remote sessions about this error
+ $self->send_msg( { event=>$request->{errors_to},
+ params=>[ $err, $rkernel, $when ],
+ is_error=>1,
+ } );
}
+
+
+
#----------------------------------------------------
# Register foreign kernels so that we can send states to them
sub register
@@ -564,8 +653,6 @@ sub register_local
$self->{remote}{$rid}||=[]; # list of proxy sessions
$self->{alias}{$rid}||=[];
- # use Data::Dumper;
- # die Dumper $aliases;
foreach my $name (@$aliases) {
unless(defined $name) {
DEBUG and warn "$$: attempt to register undefined local kernel alias\n";
@@ -696,18 +783,18 @@ sub send_msg
my $to=specifier_parse($msg->{event});
unless($to) {
- die "Bad state ", Dumper $msg;
+ die "Bad state ", pp $msg;
}
unless($to) {
warn "Bad or missing 'event' parameter '$msg->{event}' to IKC/$e\n";
return;
}
unless($to->{session}) {
- warn "Need a session name for IKC/$e\n", Dumper $to;
+ warn "Need a session name for IKC/$e\n", pp $to;
return;
}
unless($to->{state}) {
- warn "Need a state name for IKC/$e\n", Dumper $to;
+ warn "Need a state name for IKC/$e\n", pp $to;
return;
}
@@ -755,13 +842,11 @@ sub send_msg
# Get a list of channels to send the message to
my @channels=$self->channel_list( $name );
- unless(@channels)
- {
- warn "$$: MSG TO ", Dumper $to;
- warn (($name eq '*')
- ? "$$: Not connected to any foreign kernels.\n"
- : "$$: Unknown kernel '$name'.\n");
- warn "$$: Known kernels: ". $self->channel_names;
+ unless(@channels) {
+ my $err = (($name eq '*')
+ ? "$$: Not connected to any foreign kernels."
+ : "$$: Unknown kernel '$name'.");
+ $self->inform_monitors( '*', 'error', 'resolve', $err );
return 0;
}
@@ -924,9 +1009,6 @@ sub post
my($self, $to, $params, $sender) = @_;
$to="poe://$to" unless ref $to or $to=~/^poe:/;
- # use Data::Dumper;
- # warn "params=", Dumper $params;
-
$self->send_msg({params=>$params, 'event'=>$to}, $sender);
}
@@ -952,7 +1034,7 @@ sub call
$rsvp=$t;
unless($rsvp->{state})
{
- DEBUG and warn Dumper $rsvp;
+ DEBUG and warn pp $rsvp;
warn "$$: rsvp state not set in poe:IKC/call\n";
return;
}
@@ -966,8 +1048,6 @@ sub call
}
DEBUG2 and warn "RSVP is ", specifier_name($rsvp), "\n";
- # use Data::Dumper;
- # warn "params=", Dumper $params;
$self->send_msg({params=>$params, 'event'=>$to,
rsvp=>$rsvp
}, $sender
@@ -1010,7 +1090,7 @@ sub publish
die "\$states isn't an array ref" unless ref($states) eq 'ARRAY';
foreach my $q (@$states) {
DEBUG and
- print STDERR "Published poe:$alias/$q\n";
+ print STDERR "$$: Published poe:$alias/$q\n";
$p->{$q}=1;
}
}
@@ -1112,8 +1192,14 @@ sub subscribe
rsvp=>{kernel=>$id, session=>'IKC', state=>$unique.$spec},
},
);
- # TODO What if this post failed? Session that posted this would
- # surely want to know
+ DEBUG and warn "$$: do_you_have sent to $count sessions\n";
+ if( $count == 0 ) {
+ # This post failed. Session that posted this would
+ # surely want to know
+ $self->inform_monitors( '*', 'error',
+ 'subscribe',
+ "Unknown kernel $ses->{kernel}" );
+ }
} else
{ # Bleh. User shouldn't be that dumb
die "You can't subscribe to a session within the current kernel.";
@@ -1181,8 +1267,11 @@ sub _subscribe_receipt
my $del;
if(not $ses or not ref $ses) { # REFUSED
- warn "$$: Refused to subscribe to $spec";
- warn "$$: $resp" if $resp;
+ $resp ||= "Refused subscription to $spec";
+ $self->inform_monitors( '*', 'error',
+ 'subscribe',
+ $resp )
+ or warn "$$: $resp";
$accepted=0;
$del=$unique.$spec;
}
@@ -1318,11 +1407,14 @@ sub inform_monitors
croak "$$: No kernel in $_[1]!" unless $rid;
my $real=1 if $self->{channel}{$rid};
- DEBUGM and do {
- warn "$$: inform $event $rid";
+ DEBUGM and
+ do {
+ warn "$$: inform $event $rid\n";
warn "$$: $rid is", ($real ? '' : "n't"), " real\n";
};
+ my $count = 0;
+
# got to be a better way of doing this...
my @todo=($rid);
push @todo, '*' unless $rid eq '*';
@@ -1351,15 +1443,17 @@ sub inform_monitors
# ARG4.... = per-message info
$kernel->post($sender, $e, $states->{__name}, $rid, $real,
$states->{data}, @params);
+ $count++;
}
}
# $rid might be an alias to something else, inform about those as well
if($self->{channel}{$rid}) {
foreach my $ra (@{$self->{alias}{$rid}}) {
- $self->inform_monitors($ra, $event, @params);
+ $count += $self->inform_monitors($ra, $event, @params);
}
}
+ return $count;
}
@@ -1383,7 +1477,7 @@ package POE::Component::IKC::Responder::Thunk;
use strict;
use Carp;
-use Data::Dumper;
+use Data::Dump qw( pp );
use POE::Component::IKC;
use POE::Session;
use POE;
@@ -1434,13 +1528,17 @@ sub DEBUG2 { 0 }
# the kernel. If the kernel changes, they will break.
# If they break, please contact gwyn-at-cpan.org.
# 2011/08 - These have been changed for 1.311
- if( $poe_kernel->_data_ses_exists( $current_thunk ) ) {
- my $count = $poe_kernel->_data_extref_count_ses( $current_thunk );
+ my $count = _ref_count( $current_thunk );
+ if( defined $count ) {
+ DEBUG and
+ warn "$$: $NAME count=$count\n";
if( 0==$count ) {
- DEBUG and warn "$$: $NAME reuse\n";
+ DEBUG and
+ warn "$$: $NAME reuse\n";
return 1;
}
- DEBUG and warn "$$: thunk count=$count\n";
+ DEBUG and
+ warn "$$: new thunk\n";
$poe_kernel->call( $current_thunk => '__active' );
}
undef( $current_thunk );
@@ -1448,6 +1546,22 @@ sub DEBUG2 { 0 }
}
}
+sub _ref_count
+{
+ my( $id ) = @_;
+ # This code is badly behaved!
+ return unless $poe_kernel->_data_ses_exists( $id );
+ if( $poe_kernel->can( '_data_extref_count_ses' ) ) {
+ return $poe_kernel->_data_extref_count_ses( $id )||0;
+ }
+ else {
+ # This is for the code that dngor had that extrefs as a sub object, not
+ # as a mixin'
+ $poe_kernel->[ POE::Kernel::KR_EXTRA_REFS() ]->count_session_refs( $id )||0;
+ }
+}
+
+
#----------------------------------------------------
sub _start
{
@@ -1483,7 +1597,7 @@ sub __thunk
# warn "no FROM" unless $from;
if($rsvp) { # foreign session wants returned value
- DEBUG2 and warn "Calling ", Dumper $call;
+ DEBUG2 and warn "Calling ", pp $call;
DEBUG2 and do { warn "Wants an array" if $wantarray};
@@ -1498,7 +1612,7 @@ sub __thunk
if($yes) {
DEBUG2 and do {
local $"=', ';
- warn "Posted response '@ret' to ", Dumper $rsvp;
+ warn "Posted response '@ret' to ", pp $rsvp;
};
# This is the POSTBACK
$POE::Component::IKC::Responder::ikc->send_msg(
@@ -1509,7 +1623,7 @@ sub __thunk
else {
# 2009/05 - use ->call() so that {from} can't be modified
# before refcount_increment is called
- DEBUG2 and warn "Posting ", Dumper $call;
+ DEBUG2 and warn "Posting ", pp $call;
$kernel->call(@$call);
}
}
@@ -1943,6 +2057,12 @@ The following states can be monitored:
=over 6
+=item C<channel>
+
+Called when a channel becomes ready or goes away. ARG3 is either C<ready>
+or C<close>. ARG4 is the numerical ID of the channel's session. See
+L</CHANNELS> below.
+
=item C<register>
Called when a remote kernel or alias is registered. This is equivalent to
@@ -1969,6 +2089,11 @@ You are informed whenever someone tries to do a sane shutdown of IKC and all
peripheral sessions. This will called only once, after somebody posts an
IKC/shutdown event.
+=item C<error>
+
+You are informed of errors in local and remote kernels. ARG3 is the operation that
+failed. ARG4 is the error message. See L</ERRORS> below.
+
=item C<data>
Little bit of data (can be scalar or reference) that is passed to the
@@ -2010,6 +2135,9 @@ Most of the time, ARG0 and ARG1 will be the same. Exceptions are if you are
monitoring C<*> or if you supplied a full IKC event specifier to
IKC/monitor rather then just a plain kernel name.
+
+
+
=head2 Short note about monitoring all kernels with C<*>
There are 2 reasons circonstances in which you will be monitoring all remote
@@ -2058,9 +2186,140 @@ Session is no longer monitoring all kernels, only 'Pulse'.
Now we aren't even interested in 'Pulse';
+=head1 CHANNELS
+
+Previous versions of IKC did not adequately allow you to control a connection.
+With 0.2400 we added a much needed feature.
+
+Each connection to a remote kernel is handled by a channel session. You
+find out the session's ID by monitoring for L</channel> operations. You may
+close a channel and the corresponding connection to the remote kernel by
+sending it a L</shutdown> event.
+
+ sub _start {
+ # set up the monitor
+ $poe_kernel->call( IKC => monitor => '*' => { channel => 'channel' } );
+ }
+
+ sub channel {
+ my( $self, $rid, $rkernel, $real, $data, $op, $channel ) = @_[ OBJECT, ARG0..$#_ ];
+ return unless $real; # only care about the real kernel ID
+ if( $op eq 'ready' ) { # new channel is ready
+ $self->{channel}{ $rkernel } = $channel;
+ }
+ elsif( $op eq 'close' ) { # channel is gone
+ delete $self->{channel}{ $rkernel };
+ }
+ }
+
+ # this an event posted from your controler logic
+ sub close_channel {
+ my( $self, $rkernel ) = @_[ OBJECT, ARG0 ];
+ # tell the channel to close
+ $poe_kernel->post( $self->{channel}{ $rkernel } => 'shutdown' );
+ }
+
+
+
+=head1 ERRORS
+Previous versions of IKC did not adequately allow you to monitor for errors
+on a connection. With 0.2400 we started monitoring errors.
+There are 2 step during which you can have errors: when opening the connection and
+during message exchange. These 2 steps are handled diffrently.
+You use L<POE::Component::IKC::Client/on_error> and
+L<POE::Component::IKC::Server/on_error> to receive errors while a connection
+is being opened. Note that this includes the initial IKC handshake.
+
+ sub on_error
+ {
+ my( $op, $errnum, $errstr ) = @_;
+ # Handle this like you would any POE socket error
+ # But remember you can't rely on your session being active
+ }
+
+
+You use L</monitor> on error to receive errors during message exchange. ARG3 is the
+name of the operation. ARG4 is the error message. Current operations are:
+
+=over 4
+
+=item remote-request
+
+Remote kernel was unable to parse a request that was sent from the local kernel.
+
+=item remote-check
+
+Remote kernel has not published an event that was sent from the local kernel.
+
+=item remote-resolve
+
+Remote kernel could not find a session that could handle the request.
+
+=item remote-invocation
+
+Remote kernel had an error when it tried to invoke the request handler.
+Please note this will not catch errors in the request handler, but only errors
+in the thunk.
+
+
+=item local-request
+
+=item local-check
+
+=item local-resolve
+
+=item local-invocation
+
+These 4 operations are the local equivalent of the previous 4. They are
+intented for logging. In general no actions are required.
+
+Note that 'local' and 'remote' refer to where the operation happened, not
+where the request originated. As an example, kernel A sends a
+poe://B/foo/bar request to kernel B. Kernel B has not published that event.
+Monitors on kernel A will see L<remote-check>. Monitors on kernel B will
+see L<local-check>.
+
+=item channel-error
+
+Receive channel errors during message exchange. Channel errors are
+equivalent to POE wheel errors. The message will be C<"[$errnum] $errstr">.
+
+=item subscribe
+
+Failure to subscribe to a remote session.
+
+=item fork
+
+L<POE::Component::IKC::Server> failed to fork.
+
+=item resolve
+
+Error when trying to find a remote kernel or session.
+
+=back
+
+Example monitor for error events:
+
+ sub monitor_error
+ {
+ my( $self, $rid, $kernel, $real, $data, $op, $message ) =
+ @_[ OBJECT, ARG0 ... $#_ ];
+ if( $op =~ /^channel-/ and $message =~ /\[(\d+)\] (.*)/ ) {
+ return unless $real;
+ my( $errnum, $errstr ) = ( $1, $2 );
+ if( $op eq 'channel-read' and $errnum == 0 ) {
+ warn "Connection closed";
+ return;
+ }
+ }
+ warn "Error during $op: $message";
+ }
+
+In particular, you will note we don't do anything when we detect the channel
+closed. Instead, it is recommended to attempt reconnection in the L</unregister> event.
@@ -2068,12 +2327,9 @@ Now we aren't even interested in 'Pulse';
=head2 C<create_ikc_responder>
-This function creates the Responder session and object. However, you don't
-need to call this directly, because L<POE::Component::IKC::Client> or
-L<POE::Component::IKC::Server> does
-this for you.
-
-Deprecated, use L</spawn>.
+DEPRECATED. Please use
+
+ POE::Compontent::IKC::Responder->spawn();
@@ -2121,7 +2377,7 @@ Philip Gwyn, <perl-ikc at pied.nu>
=head1 COPYRIGHT AND LICENSE
-Copyright 1999-2011 by Philip Gwyn. All rights reserved.
+Copyright 1999-2014 by Philip Gwyn. All rights reserved.
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
@@ -1,13 +1,13 @@
package POE::Component::IKC::Server;
############################################################
-# $Id: Server.pm 1077 2013-02-11 16:50:56Z fil $
+# $Id: Server.pm 1247 2014-07-07 09:06:34Z fil $
# Based on refserver.perl and preforkedserver.perl
# Contributed by Artur Bergman <artur@vogon-solutions.com>
# Revised for 0.06 by Rocco Caputo <troc@netrus.net>
# Turned into a module by Philp Gwyn <fil@pied.nu>
#
-# Copyright 1999-2011 Philip Gwyn. All rights reserved.
+# Copyright 1999-2014 Philip Gwyn. All rights reserved.
# This program is free software; you can redistribute it and/or modify
# it under the same terms as Perl itself.
#
@@ -17,25 +17,28 @@ package POE::Component::IKC::Server;
use strict;
use Socket;
use vars qw($VERSION @ISA @EXPORT @EXPORT_OK);
+use Carp;
use POE qw(Wheel::ListenAccept Wheel::SocketFactory);
use POE::Component::IKC::Channel;
use POE::Component::IKC::Responder;
+use POE::Component::IKC::Util;
use POSIX qw(:errno_h);
use POSIX qw(ECHILD EAGAIN WNOHANG);
+
require Exporter;
@ISA = qw(Exporter);
@EXPORT = qw(create_ikc_server);
-$VERSION = '0.2305';
+$VERSION = '0.2402';
sub DEBUG { 0 }
sub DEBUG_USR2 { 1 }
BEGIN {
# http://support.microsoft.com/support/kb/articles/Q150/5/37.asp
- eval '*WSAEAFNOSUPPORT = sub { 10047};';
- if($^O eq 'MSWin32') {
- eval '*EADDRINUSE = sub { 10048 };';
+ eval '*WSAEAFNOSUPPORT = sub { 10047 };';
+ if( $^O eq 'MSWin32' and not eval "EADDRINUSE" ) {
+ eval '*EADDRINUSE = sub { 10048 };';
}
}
@@ -79,6 +82,7 @@ sub create_ikc_server
{
my( %params )=@_;
$params{package} ||= __PACKAGE__;
+ carp "create_ikc_server is DEPRECATED. Please use $params{package}->spawn instead";
return $params{package}->spawn( %params );
}
@@ -168,6 +172,9 @@ sub _start
my $ret;
+ # This shouldn't be necessary
+ POE::Component::IKC::Responder->spawn;
+
# monitor for shutdown events.
# this is the best way to get IKC::Responder to tell us about the
# shutdown
@@ -198,6 +205,7 @@ sub _start
$heap->{kernel_aliases}=$params->{aliases};
$heap->{concurrency}=$params->{concurrency} || 0;
$heap->{protocol}=$params->{protocol};
+ $heap->{on_error}=$params->{on_error} if $params->{on_error};
# create a socket factory
$heap->{wheel} = new POE::Wheel::SocketFactory (%wheel_p);
if( $heap->{wheel} and not $params->{unix} and not $params->{port} ) {
@@ -538,21 +546,23 @@ sub shutdown
sub error
{
my ($heap, $operation, $errnum, $errstr) = @_[HEAP, ARG0, ARG1, ARG2];
- warn __PACKAGE__, " $$: encountered $operation error $errnum: $errstr\n";
+
+
+ DEBUG and
+ warn __PACKAGE__, " $$: encountered $operation error $errnum: $errstr\n";
+
my $ignore;
if($errnum==EADDRINUSE) { # EADDRINUSE
- warn "$$: IKC Address $heap->{wheel_address} in use\n";
$heap->{'die'}=1;
_delete_wheel( $heap );
- $ignore=1;
+ $ignore = 0;
} elsif($errnum==WSAEAFNOSUPPORT) {
# Address family not supported by protocol family.
# we get this error, yet nothing bad happens... oh well
$ignore=1;
}
unless($ignore) {
- # TODO : post to monitors
- warn __PACKAGE__, " $$: encountered $operation error $errnum: $errstr\n";
+ POE::Component::IKC::Util::monitor_error( $heap, $operation, $errnum, $errstr );
}
}
@@ -593,7 +603,8 @@ sub accept
name=>$heap->{name},
unix=>$heap->{unix},
aliases=>[@{$heap->{kernel_aliases}||[]}],
- protocol=>$heap->{protocol}
+ protocol=>$heap->{protocol},
+ on_error=>$heap->{on_error}
);
_concurrency_up($heap);
@@ -641,9 +652,8 @@ sub fork
$heap->{'failed forks'}++;
$kernel->delay('retry', 1);
}
- # fail permanently, if fatal
- else {
- warn "Can't fork: $!\n";
+ else { # fail permanently, if fatal
+ POE::Component::IKC::Util::monitor_error( $heap, 'fork', 0+$1, "$!" );
$kernel->yield('_stop');
}
return;
@@ -997,7 +1007,7 @@ identical.
=head2 C<create_ikc_server>
-Syntatic sugar for POE::Component::IKC::Server->spawn.
+Deprecated. Use L<POE::Component::IKC::Server/spawn>.
=head1 CLASS METHODS
@@ -1006,7 +1016,7 @@ Syntatic sugar for POE::Component::IKC::Server->spawn.
This methods initiates all the work of building the IKC server.
Parameters are :
-=over 3
+=over 4
=item C<ip>
@@ -1075,13 +1085,27 @@ pipe), they will not share the conncurrent connection count.
=item C<protocol>
-Which IKC negociation protocol to use. The original protocol (C<IKC>) was
-synchronous and slow. The new protocol (C<IKC0>) sends all information at
-once. IKC0 will degrade gracefully to IKC, if the client and server don't
-match.
+Which IKC negociation protocol to use. The original protocol (C<IKC>) had a
+slow synchronous handshake. The new protocol (C<IKC0>) sends all the
+handshake information at once. IKC0 will degrade gracefully to IKC, if the
+client and server don't match.
Default is IKC0.
+=item C<on_error>
+
+Coderef that is called for all errors. You could use this to monitor for
+problems when forking children or opening the socket. Parameters are
+C<$operation, $errnum and $errstr>, which correspond to
+POE::Wheel::SocketFactory's FailureEvent, which q.v.
+
+However, IKC/monitor provides a more powerful mechanism for detecting
+errors. See L<POE::Component::IKC::Responder>.
+
+Note, also, that the coderef will be executed from within an IKC session,
+NOT within your own session. This means that things like
+$poe_kernel->delay_set() won't do what you think they should.
+
=back
@@ -1109,7 +1133,7 @@ Philip Gwyn, <perl-ikc at pied.nu>
=head1 COPYRIGHT AND LICENSE
-Copyright 1999-2011 by Philip Gwyn. All rights reserved.
+Copyright 1999-2014 by Philip Gwyn. All rights reserved.
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
@@ -1,9 +1,9 @@
package POE::Component::IKC::Specifier;
############################################################
-# $Id: Specifier.pm 1077 2013-02-11 16:50:56Z fil $
+# $Id: Specifier.pm 1247 2014-07-07 09:06:34Z fil $
#
-# Copyright 1999-2011 Philip Gwyn. All rights reserved.
+# Copyright 1999-2014 Philip Gwyn. All rights reserved.
# This program is free software; you can redistribute it and/or modify
# it under the same terms as Perl itself.
#
@@ -18,7 +18,7 @@ use Carp;
require Exporter;
@ISA = qw(Exporter);
@EXPORT = qw( specifier_parse specifier_name specifier_part);
-$VERSION = '0.2305';
+$VERSION = '0.2402';
sub DEBUG { 0 }
@@ -204,7 +204,7 @@ Philip Gwyn, <perl-ikc at pied.nu>
=head1 COPYRIGHT AND LICENSE
-Copyright 1999-2011 by Philip Gwyn. All rights reserved.
+Copyright 1999-2014 by Philip Gwyn. All rights reserved.
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
@@ -2,7 +2,7 @@ package POE::Component::IKC::Timing;
############################################################
# $Id$
-# Copyright 2011 Philip Gwyn. All rights reserved.
+# Copyright 2011-2014 Philip Gwyn. All rights reserved.
# This program is free software; you can redistribute it and/or modify
# it under the same terms as Perl itself.
#
@@ -0,0 +1,44 @@
+package POE::Component::IKC::Util;
+
+############################################################
+# $Id$
+# Copyright 2014 Philip Gwyn. All rights reserved.
+# This program is free software; you can redistribute it and/or modify
+# it under the same terms as Perl itself.
+#
+# Contributed portions of IKC may be copyright by their respective
+# contributors.
+
+#
+# Utility functions
+#
+
+
+
+use strict;
+use warnings;
+
+use POE;
+use Carp;
+
+sub monitor_error
+{
+ my( $heap, $operation, $errnum, $errstr, $ignore ) = @_;
+
+ if( $heap->{on_error} ) {
+ $heap->{on_error}->( $operation, $errnum, $errstr );
+ }
+ else {
+ $poe_kernel->call( IKC => 'channel_error',
+ [ "[$errnum] $errstr",
+ $heap->{remote_ID},
+ $operation
+ ] ) and return;
+ return if $ignore;
+ my( $source ) = caller;
+ carp "$$: $source $operation error: $errnum $errstr";
+ }
+}
+
+1;
+
@@ -1,9 +1,9 @@
package POE::Component::IKC;
-# $Id: IKC.pm 1077 2013-02-11 16:50:56Z fil $
+# $Id: IKC.pm 1247 2014-07-07 09:06:34Z fil $
use strict;
use vars qw( $VERSION );
-$VERSION='0.2305';
+$VERSION='0.2402';
# Force CPAN to see this
@@ -1,4 +1,4 @@
-l=head1 NAME
+=head1 NAME
POE::Component::IKC -- POE Inter-Kernel Communication
@@ -226,7 +226,7 @@ Philip Gwyn <perl-ikc at pied.nu>
=head1 COPYRIGHT AND LICENSE
-Copyright 1999-2011 by Philip Gwyn. All rights reserved.
+Copyright 1999-2014 by Philip Gwyn. All rights reserved.
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
@@ -10,6 +10,7 @@ IKC/Freezer.pm
IKC/LocalKernel.pm
IKC/Timing.pm
IKC/Protocol.pm
+IKC/Util.pm
IKC.pod
IKC.pm
MANIFEST
@@ -33,6 +34,7 @@ t/30_local.t
t/31_concurrency.t
t/32_thunk.t
t/33_thunklite.t
+t/40_monitor.t
dev/client
dev/client2
dev/client3
@@ -51,5 +53,8 @@ dev/t_specifier
dev/test_delete
dev/uclient
dev/userver
+dev/README
+eg/lite-1
+eg/server-1
META.yml Module YAML meta-data (added by MakeMaker)
META.json Module JSON meta-data (added by MakeMaker)
@@ -4,7 +4,7 @@
"Philip Gwyn <gwyn -at- cpan.org>"
],
"dynamic_config" : 1,
- "generated_by" : "ExtUtils::MakeMaker version 6.62, CPAN::Meta::Converter version 2.112150",
+ "generated_by" : "ExtUtils::MakeMaker version 6.92, CPAN::Meta::Converter version 2.120351",
"license" : [
"perl_5"
],
@@ -22,12 +22,12 @@
"prereqs" : {
"build" : {
"requires" : {
- "ExtUtils::MakeMaker" : 0
+ "ExtUtils::MakeMaker" : "0"
}
},
"configure" : {
"requires" : {
- "ExtUtils::MakeMaker" : 0
+ "ExtUtils::MakeMaker" : "0"
}
},
"runtime" : {
@@ -41,5 +41,5 @@
}
},
"release_status" : "stable",
- "version" : "0.2305"
+ "version" : "0.2402"
}
@@ -3,24 +3,24 @@ abstract: 'Inter-Kernel Communication for POE'
author:
- 'Philip Gwyn <gwyn -at- cpan.org>'
build_requires:
- ExtUtils::MakeMaker: 0
+ ExtUtils::MakeMaker: '0'
configure_requires:
- ExtUtils::MakeMaker: 0
+ ExtUtils::MakeMaker: '0'
dynamic_config: 1
-generated_by: 'ExtUtils::MakeMaker version 6.62, CPAN::Meta::Converter version 2.112150'
+generated_by: 'ExtUtils::MakeMaker version 6.92, CPAN::Meta::Converter version 2.120351'
license: perl
meta-spec:
url: http://module-build.sourceforge.net/META-spec-v1.4.html
- version: 1.4
+ version: '1.4'
name: POE-Component-IKC
no_index:
directory:
- t
- inc
requires:
- Data::Dump: 1
- Devel::Size: 0.77
- POE: 1.311
- Scalar::Util: 1
- Test::More: 0.6
-version: 0.2305
+ Data::Dump: '1'
+ Devel::Size: '0.77'
+ POE: '1.311'
+ Scalar::Util: '1'
+ Test::More: '0.6'
+version: '0.2402'
@@ -20,10 +20,10 @@ the overhead :)
STABILITY
-I use IKC full time on my sites (for example, http://www.camelot.ca/) so I'd
-say it is "stable enough if you know what you're doing", even if it doesn't
-implement my full original vision. I use IKC::ClientLite in mod_perl to
-talk to POE-based "application servers".
-
+I used to use IKC full time on my sites. However, I've moved to exclusively
+developing Intranet applications. One of my main apps (http://www.quaero.ca/)
+has a POEx::HTTP::Server front end and talks IKC with an application server.
+This means IKC handles thousands of requests an hour. I trust it, but then I
+know what I'm doing.
-Philip Gwyn, <fil@pied.nu>
@@ -0,0 +1,2 @@
+These are programs used during initial developement. I have no idea if they
+still work. I do know they use obsolete things like 'on_connect'.
@@ -20,7 +20,9 @@ sub server_io
$|++;
print "Creating sessions...\n";
-create_ikc_client(
+POE::Component::IKC::Responder->spawn(); # make sure the Responder exists
+
+POE::Component::IKC::Client->spawn(
port=>31337,
name=>$NAME,
subscribe=>[qw(poe://*/timeserver)],
@@ -33,7 +35,6 @@ POE::Session->new(
$kernel->sig('USR1', 'hup');
$kernel->alias_set('me');
- create_ikc_responder(); # make sure the Responder exists
$kernel->post('IKC', 'publish', 'me', [qw(pulse)]);
$kernel->post('IKC', 'monitor', '*',
{register=>'remote_register', unregister=>'remote_unregister',
@@ -66,13 +66,13 @@ sub create_me
}
$|++;
-create_ikc_client(
+POE::Component::IKC::Client->spawn(
port=>31337,
name=>$name,
# subscribe=>[qw(poe://*/foo)],
on_connect=>sub
{
- create_ikc_client(
+ POE::Component::IKC::Client->spawn(
on_connect=>\&create_me,
port=>31337,
name=>$name,
@@ -48,7 +48,7 @@ sub create_me
}
$|++;
-create_ikc_client(
+POE::Component::IKC::Client->spawn(
port=>31337,
name=>$name,
subscribe=>[qw(poe://*/timeserver)],
@@ -85,7 +85,7 @@ sub create_me
}
$|++;
-create_ikc_client(
+POE::Component::IKC::Client->spawn(
ip=>'localhost',
port=>31336,
name=>$name,
@@ -48,7 +48,7 @@ sub create_me
}
$|++;
-create_ikc_client(
+POE::Component::IKC::ClientLite->spawn(
port=>31337,
name=>$name,
subscribe=>[qw(poe://*/timeserver)],
@@ -8,7 +8,7 @@ use POE::Component::IKC::ClientLite;
my $name="Client$$";
-my $remote=create_ikc_client(
+my $remote=POE::Component::IKC::ClientLite->spawn(
port=>21510,
name=>$name,
timeout=>5,
@@ -10,7 +10,7 @@ my @connections;
my $name=$ARGV[0]||0;
print "$$: Connecting $name\n";
-my $remote=create_ikc_client(
+my $remote=POE::Component::IKC::ClientLite->spawn(
port=>31337,
name=>"lclient2-$$-$name",
timeout=>5,
@@ -13,7 +13,7 @@ use POE::Component::IKC::Specifier;
# Every 10 seconds, a 'pulse' event is sent to connected sessions.
-create_ikc_server(
+POE::Component::IKC::Server->spawn(
port=>31337, # elite--
name=>'Pulse');
@@ -13,7 +13,7 @@ use POE::Component::IKC::Specifier;
# Every 10 seconds, a 'pulse' event is sent to connected sessions.
-create_ikc_server(
+POE::Component::IKC::Server->spawn(
port=>31336, # elite--
name=>'Pulse2');
@@ -13,7 +13,7 @@ use POE::Component::IKC::Specifier;
# Every 10 seconds, a 'pulse' event is sent to connected sessions.
-create_ikc_server(
+POE::Component::IKC::Server->spawn(
port=>31337,
name=>'Pulse',
processes=>5,
@@ -21,7 +21,7 @@ sub server_io
$|++;
print "Creating sessions...\n";
-create_ikc_client(
+POE::Component::IKC::Client->spawn(
unix=>($ENV{TMPDIR}||$ENV{TEMP}||'/tmp').'/userver',
name=>$NAME,
subscribe=>[qw(poe://shut-server/Goaway)],
@@ -13,7 +13,7 @@ use POE::Component::IKC::Specifier;
# Very simple server. all it does is shutdown when 'goaway' is posted
-create_ikc_server(
+POE::Component::IKC::Server->spawn(
unix=>($ENV{TMPDIR}||$ENV{TEMP}||'/tmp').'/userver',
name=>'shut-server');
@@ -20,7 +20,7 @@ sub server_io
$|++;
print "Creating sessions...\n";
-create_ikc_client(
+POE::Component::IKC::Client->spawn(
unix=>($ENV{TMPDIR}||$ENV{TEMP}||'/tmp').'/userver',
name=>$NAME,
subscribe=>[qw(poe://*/timeserver)],
@@ -13,7 +13,7 @@ use POE::Component::IKC::Specifier;
# Every 10 seconds, a 'pulse' event is sent to connected sessions.
-create_ikc_server(
+POE::Component::IKC::Server->spawn(
unix=>($ENV{TMPDIR}||$ENV{TEMP}||'/tmp').'/userver',
name=>'Pulse');
@@ -0,0 +1,15 @@
+#!/bin/perl
+
+use strict;
+use warnings;
+use Data::Denter;
+
+use POE::Component::IKC::ClientLite;
+my $poe = POE::Component::IKC::ClientLite->spawn(
+ port => 6666,
+# serialiser => 'POE::Component::IKC::Freezer'
+ );
+die POE::Component::IKC::ClientLite::error() unless $poe;
+my $ret = $poe->post_respond("Service/event", {Hi=>"There"});
+
+warn "ret=", Denter $ret;
\ No newline at end of file
@@ -0,0 +1,43 @@
+#!/usr/bin/perl -w
+use strict;
+
+use lib qw(blib/lib blib/arch);
+
+use POE qw(Session);
+use POE::Component::IKC::Server;
+use POE::Component::IKC::Specifier;
+
+POE::Component::IKC::Server->spawn(
+ port=>6666,
+ name=>'Pulse',
+ processes=>5,
+ babysit=>30,
+ verbose=>1,
+ connections=>3);
+
+POE::Session->create(
+ inline_states => {
+ _start => sub {
+ my($kernel, $heap, $arg)=@_[KERNEL, HEAP, ARG0];
+ $heap->{listeners}={};
+ $kernel->alias_set('Service');
+ $kernel->call('IKC', 'publish', 'Service',
+ [qw(event)]);
+ },
+ _stop => sub {
+ warn "$$: _stop";
+ },
+ event => sub {
+ my($kernel, $heap, $args)=@_[KERNEL, HEAP, ARG0];
+ my $p=$args->[0];
+ my $rsvp=$args->[1];
+ warn "event $p";
+ $kernel->post( IKC => 'post',
+ $rsvp, { hello => $p->{Hi} } );
+ },
+ }
+);
+
+print "$$: Running server...\n";
+$poe_kernel->run();
+print "$$: Server exited...\n";
@@ -2,18 +2,19 @@
use strict;
-use Test::More tests => 11;
+use Test::More tests => 12;
-use_ok( 'POE::Component::IKC' );
+use_ok( 'POE::Component::IKC' ) or die;
use_ok( 'POE::Component::IKC::Specifier' );
use_ok( 'POE::Component::IKC::ClientLite' );
use_ok( 'POE::Component::IKC::Freezer' );
use_ok( 'POE::Component::IKC::Proxy' );
-use_ok( 'POE::Component::IKC::Channel' );
+use_ok( 'POE::Component::IKC::Channel' ) or die;
use_ok( 'POE::Component::IKC::LocalKernel' );
-use_ok( 'POE::Component::IKC::Responder' );
+use_ok( 'POE::Component::IKC::Responder' ) or die;
use_ok( 'POE::Component::IKC::Server' );
use_ok( 'POE::Component::IKC::Timing' );
+use_ok( 'POE::Component::IKC::Util' );
package other;
::use_ok( 'POE::Component::IKC::Client' );
@@ -4,6 +4,8 @@ use strict;
use warnings;
use Test::More;
+plan skip_all => 'these tests are for authors only' unless
+ $ENV{AUTHOR_TESTING} or $ENV{RELEASE_TESTING};
eval "use Test::Pod::Coverage 1.00";
plan skip_all => "Test::Pod::Coverage 1.00 required for testing POD coverage" if $@;
@@ -11,7 +13,7 @@ plan tests => 4;
pod_coverage_ok(
"POE::Component::IKC::Responder",
{ also_private => [
- qr/^(DEBUG|do_you_have|inform_monitors|post2|raw_message|register_channel|remote_error|request|sig_INT)$/
+ qr/^(DEBUG|do_you_have|inform_monitors|post2|raw_message|register_channel|remote_error|request|sig_INT|channel_error)$/
],
},
"POE::Component::IKC::Responder, ignoring private functions",
@@ -18,7 +18,6 @@ my $Q=2;
my %OK;
my $WIN32=1 if $^O eq 'MSWin32';
-
DEBUG and print "Starting servers...\n";
# Note : IKC0 for Unix test and IKC for Inet test means we can test
@@ -9,7 +9,7 @@ use Test::More tests => 11;
use POE::Component::IKC::ClientLite;
use POE::Component::IKC::Server;
use POE::Component::IKC::Responder;
-use Data::Dumper;
+use Data::Dump qw( pp );
use POE qw(Kernel);
@@ -153,9 +153,7 @@ sub shutdown
my($kernel)=$_[KERNEL];
$kernel->alias_remove('test');
DEBUG and warn "Test server: shutdown\n";
-# use YAML qw(Dump);
-# use Data::Dumper;
-# warn Dumper $kernel;
+# warn pp $kernel;
}
###########################################################
sub fetchQ
@@ -23,8 +23,16 @@ print "ok 1\n";
my $Q=2;
sub DEBUG () {0}
+###########
+# Get a "random" port number
+use IO::Socket::INET;
+my $sock = IO::Socket::INET->new( LocalAddr => '127.0.0.1', Listen => 1, ReuseAddr => 1 );
+our $PORT = $sock->sockport;
+undef( $sock );
+
+
POE::Component::IKC::Server->spawn(
- port=>1338,
+ port=>$PORT,
name=>'Inet',
aliases=>[qw(Ikc)],
);
@@ -288,7 +288,7 @@ sub run
my $name = "\u$type$$".'Client';
DEBUG and warn "$$: Connect\n";
- my $poe=create_ikc_client(
+ my $poe= POE::Component::IKC::ClientLite->spawn(
port=>$port,
name=>$name,
);
@@ -179,6 +179,7 @@ sub post1
DEBUG and warn "Server: post1 $arg\n";
$heap->{sender} = $_[SENDER]->ID;
+ ::note( "sender=$heap->{sender}" );
$kernel->post( $_[SENDER], resp1 => $arg );
}
@@ -210,6 +211,7 @@ sub post3
::isnt( $sender->ID, $heap->{sender}, "New thunk" );
$heap->{sender2} = $sender->ID;
+ ::note( "sender2=$heap->{sender2}" );
$kernel->post($sender, resp3 => $arg);
}
@@ -281,7 +281,7 @@ sub run
my $name = "\u$type$$".'Client';
DEBUG and warn "$$: Connect\n";
- my $poe=create_ikc_client(
+ my $poe=POE::Component::IKC::ClientLite->spawn(
port=>$port,
name=>$name,
);
@@ -0,0 +1,430 @@
+#!/usr/bin/perl
+
+use strict;
+use warnings;
+
+use Symbol;
+
+use Test::More;
+plan skip_all => 'This test fails on Win32' if $^O eq 'MSWin32';
+
+
+plan tests => 22;
+
+sub DEBUG () { 0 }
+
+use POE;
+
+use POE::Component::IKC::Server;
+use POE::Component::IKC::Client;
+use POE::Component::IKC::Responder;
+
+###########
+# Get a "random" port number
+use IO::Socket::INET;
+my $sock = IO::Socket::INET->new( LocalAddr => '127.0.0.1', Listen => 1, ReuseAddr => 1 );
+our $PORT = $sock->sockport;
+undef( $sock );
+
+##########################################################################
+POE::Component::IKC::Responder->spawn;
+
+my %tests = (
+ '_start' => 0,
+ 'Client started' => 0,
+ 'Registered Child' => 0,
+ 'Client registered server' => 0,
+ 'Client tried to talk to unknown kernel' => 0,
+ 'Client attempted bad subscription' => 0,
+ 'Client registered server' => 0,
+ 'Got a request' => 0,
+ 'Client posted an unpublished event' => 0,
+ 'Server refused subscription' => 0,
+ 'Client got remote-check error' => 0,
+ 'Client got our answer' => 0,
+ 'Client disconnect' => 0,
+ 'Unregistered Child' => 0,
+ 'Client is done' => 0,
+ 'Child exited' => 0,
+ '_stop' => 0,
+ 'Client subscribed to server' => 0,
+ 'Channel close' => 0,
+ 'Channel ready' => 0
+);
+
+
+my $child = gensym;
+my $pid = open( $child, "-|" );
+die "Unable to fork: $pid" unless defined $pid;
+
+if( $pid ) { # parent
+ Test::Parent->spawn( $child, $pid, \%tests );
+}
+else {
+ sleep 1;
+ Test::Child->spawn;
+}
+
+
+pass( "Running" ) if $pid;
+$poe_kernel->run;
+exit 0 unless $pid;
+
+
+pass( "Sane exit" );
+
+foreach my $test ( keys %tests ) {
+ next if $tests{ $test } == 1;
+ next if $tests{ $test } == 2;
+ fail( "Never saw $test ($tests{$test})" );
+}
+
+
+##########################################################################
+package Test::Parent;
+
+use strict;
+use warnings;
+
+use POE;
+use POE::Wheel::ReadWrite;
+
+use Data::Dump qw( pp );
+
+sub DEBUG () { ::DEBUG }
+
+
+sub pass
+{
+ my( $self, $test ) = @_;
+ die "Unknown test '$test'" unless exists $self->{tests}{ $test };
+ $self->{tests}{ $test } ++;
+ local $Test::Builder::Level = $Test::Builder::Level + 1;
+ Test::More::pass( $test );
+ return 1;
+}
+
+sub fail
+{
+ my( $self, $test ) = @_;
+ die "Unknown test '$test'" unless exists $self->{tests}{ $test };
+ $self->{tests}{ $test } ++;
+ local $Test::Builder::Level = $Test::Builder::Level + 1;
+ Test::More::fail( $test );
+ return 0;
+}
+
+sub is
+{
+ my( $self, $is, $want, $test ) = @_;
+ $self->ok( $is eq $want, $test )
+ or Test::More::diag( " want: '$want'\n got: '$is'" );
+}
+
+sub like
+{
+ my( $self, $is, $re, $test ) = @_;
+ $self->ok( scalar ($is =~ /$re/), $test )
+ or Test::More::diag( " want: $re\n got: '$is'" );
+}
+
+sub ok
+{
+ my( $self, $ok, $test ) = @_;
+ if( $ok ) {
+ $self->pass( $test );
+ return 1;
+ }
+ else {
+ $self->fail( $test );
+ return 0;
+ }
+}
+
+sub spawn
+{
+ my( $package, $child, $pid, $tests ) = @_;
+ my $self = bless { child => $child,
+ pid => $pid,
+ tests => $tests }, $package;
+ POE::Session->create(
+ object_states => [
+ $self => [ qw( _start _stop register unregister
+ child_error parent_error channel
+ req child
+ sig_CHLD
+ ) ]
+ ] );
+
+
+ POE::Component::IKC::Server->spawn( ip => '127.0.0.1',
+ port => $::PORT,
+ name => 'Parent',
+ on_error => sub { $self->on_error( @_ ) }
+ );
+
+}
+
+######################################
+sub _start
+{
+ my( $self ) = @_;
+
+ $self->pass( "_start" );
+ $poe_kernel->alias_set( 'parent' );
+ $poe_kernel->post( IKC => monitor => Child =>
+ { register => 'register',
+ unregister => 'unregister',
+ error => 'child_error',
+ } );
+ $self->{wheel} = POE::Wheel::ReadWrite->new(
+ Handle => $self->{child},
+ InputEvent => 'child'
+ );
+ $poe_kernel->post( IKC => publish => parent => [ qw( req ) ] );
+ $poe_kernel->post( IKC => monitor => '*' =>
+ { channel => 'channel',
+ } );
+
+ $poe_kernel->post( IKC => monitor => Parent =>
+ { error => 'parent_error',
+ } );
+ $poe_kernel->sig_child( $self->{pid}, 'sig_CHLD' );
+}
+
+######################################
+sub _stop
+{
+ my( $self ) = @_;
+ $self->pass( "_stop" );
+}
+
+######################################
+sub register
+{
+ my( $self, $rid, $kernel, $real ) = @_[ OBJECT, ARG0..$#_ ];
+
+ $self->is( $kernel, 'Child', "Registered Child" );
+}
+
+######################################
+sub unregister
+{
+ my( $self, $rid, $kernel, $real ) = @_[ OBJECT, ARG0..$#_ ];
+ $self->is( $kernel, 'Child', "Unregistered Child" );
+ $poe_kernel->post( IKC => 'shutdown' );
+}
+
+######################################
+sub child_error
+{
+ my( $self, $rid, $kernel, $real, $data, $op, $msg ) = @_[ OBJECT, ARG0..$#_ ];
+ if( $op eq 'channel-read' ) {
+ $self->like( $msg, qr/\[(0|104)\] /, "Client disconnect" );
+ }
+ else {
+ die "$op $msg";
+ }
+}
+
+######################################
+sub parent_error
+{
+ my( $self, $rid, $kernel, $real, $data, $op, $msg ) = @_[ OBJECT, ARG0..$#_ ];
+ if( $op eq 'local-check' ) {
+ $self->like( $msg, qr"poe://Parent/not/there", "Client posted an unpublished event" );
+ }
+ else {
+ die "$op $msg";
+ }
+}
+
+######################################
+sub child
+{
+ my( $self, $line, $wid ) = @_[ OBJECT, ARG0..$#_ ];
+
+ if( $line =~ /child: _start/ ) {
+ $self->pass( "Client started" );
+ }
+ elsif( $line =~ /child: register/ ) {
+ $self->like( $line, qr/Parent/, "Client registered server" );
+ }
+ elsif( $line =~ /child: resolve/ ) {
+ $self->like( $line, qr/Unknown kernel 'Unknown'/, "Client tried to talk to unknown kernel" );
+ }
+ elsif( $line =~ /child: subscribe/ ) {
+ if( $line =~ /Unknown/ ) {
+ $self->like( $line, qr/Unknown kernel Unknown/, "Client attempted bad subscription" );
+ }
+ elsif( $line =~ /Refused/ ) {
+ $self->like( $line, qr/Refused subscription/, "Server refused subscription" );
+ }
+ else {
+ $self->like( $line, qr/kernel => .Parent./, "Client subscribed to server" );
+ }
+ }
+ elsif( $line =~ /child: remote-check Session 'not'/ ) {
+ $self->pass( "Client got remote-check error" );
+ }
+ elsif( $line =~ /child: answer 17/ ) {
+ $self->pass( "Client got our answer" );
+ }
+ elsif( $line =~ /child: _stop/ ) {
+ $self->pass( "Client is done" );
+ }
+ else {
+ Test::More::diag( $line );
+ }
+}
+
+######################################
+sub sig_CHLD
+{
+ my( $self ) = @_;
+ $self->pass( "Child exited" );
+ delete $self->{wheel};
+}
+
+######################################
+sub req
+{
+ my( $self, $args ) = @_[ OBJECT, ARG0..$#_ ];
+ $self->ok( $args->{resp}, "Got a request" );
+ DEBUG and Test::More::diag( pp $args );
+ $poe_kernel->post( IKC => post => $args->{resp}, 17 );
+}
+
+######################################
+sub on_error
+{
+ my( $self, $op, $errnum, $errstr ) = @_;
+ die "$op: $errnum $errstr";
+}
+
+
+######################################
+sub channel
+{
+ my( $self, $rid, $kernel, $real, $data, $op, $channel ) = @_[ OBJECT, ARG0..$#_ ];
+ return unless $real;
+ # $channel is a session ID
+ $self->like( $channel, qr/^\d+/, "Channel $op" );
+}
+
+##########################################################################
+package Test::Child;
+
+use strict;
+use warnings;
+
+use POE;
+use POE::Wheel::ReadWrite;
+
+sub DEBUG () { ::DEBUG }
+
+use Data::Dump qw( pp );
+
+sub spawn
+{
+ my( $package ) = @_;
+ $|++;
+ my $self = bless { }, $package;
+ POE::Session->create(
+ object_states => [
+ $self => [ qw( _start _stop register unregister error subscribe
+ resp
+ ) ]
+ ] );
+
+ POE::Component::IKC::Client->spawn( ip => '127.0.0.1',
+ port => $::PORT,
+ name => 'Child',
+ on_error => sub { $self->on_error( @_ ) }
+ );
+
+}
+
+######################################
+sub _start
+{
+ my( $self ) = @_;
+ print "child: _start $$\n";
+ $poe_kernel->alias_set( 'child' );
+ $poe_kernel->post( IKC => monitor => Parent =>
+ { register => 'register',
+ unregister => 'unregister',
+ subscribe => 'subscribe',
+ error => 'error',
+ } );
+ $poe_kernel->post( IKC => monitor => '*' =>
+ {
+ error => 'error',
+ } );
+ $poe_kernel->post( IKC => publish => child => [ qw( resp ) ] );
+}
+
+######################################
+sub _stop
+{
+ my( $self ) = @_;
+ print "child: _stop\n";
+}
+
+######################################
+sub register
+{
+ my( $self, $rid, $kernel, $real ) = @_[ OBJECT, ARG0..$#_ ];
+ print "child: register $kernel\n";
+ $poe_kernel->call( IKC => subscribe => [ "poe://Parent/parent",
+ "poe://Parent/unknown",
+ "poe://Unknown/unknown"
+ ] );
+}
+
+######################################
+sub unregister
+{
+ my( $self, $rid, $kernel, $real ) = @_;
+ print "child: unregister $kernel\n";
+ $poe_kernel->alias_remove( 'child' );
+}
+
+######################################
+sub error
+{
+ my( $self, $rid, $kernel, $real, $data, $op, $msg ) = @_[ OBJECT, ARG0..$#_ ];
+ die "$op has newline" if $msg =~ /\n$/;
+ # warn "$$: $op $msg for rid=$rid kernel=$kernel";
+ print "child: $op $msg\n";
+}
+
+
+######################################
+sub subscribe
+{
+ my( $self, $rid, $kernel, $real, $data, $what ) = @_[ OBJECT, ARG0 .. $#_ ];
+ print "child: subscribe ", pp( $what ), "\n";
+ # send a bad request
+ $poe_kernel->post( IKC => 'post',
+ 'poe://Parent/not/there',
+ { resp => 'poe://Child/child/resp' } );
+ # send a good request
+ $poe_kernel->post( 'poe://Parent/parent', req =>
+ { resp => 'poe://Child/child/resp' } );
+}
+
+######################################
+sub resp
+{
+ my( $self, $answer ) = @_[ OBJECT, ARG0 .. $#_ ];
+ print "child: answer $answer\n";
+ $poe_kernel->post( IKC => 'shutdown' );
+}
+
+######################################
+sub on_error
+{
+ my( $self, $op, $errnum, $errstr ) = @_;
+ die "$op: $errnum $errstr";
+}
@@ -9,7 +9,7 @@ my $name = shift || 'LiteClient';
my $norecon = shift || 0;
DEBUG and warn "$$: Connect\n";
-my $poe=create_ikc_client(
+my $poe=POE::Component::IKC::ClientLite->spawn(
port=>$port,
name=>$name,
protocol=>'IKC0'