The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package IPC::MPS::Event;

use strict;
use warnings;

use Exporter;
our @ISA = qw(Exporter);
our @EXPORT = qw(spawn receive msg snd quit wt snd_wt listener open_node);

our $VERSION = '0.13';

use Carp;
use Event;
use IO::Socket;
use Scalar::Util qw(refaddr);
use Storable qw(freeze thaw);


my $DEBUG = 0;
$DEBUG and require Data::Dumper;

my @spawn            = ();
my %msg              = ();
my %fh2vpid          = ();
my %vpid2fh          = ();
my %fh2fh            = ();
my $self_vpid        = 0;
my $self_parent_fh;
my $self_parent_vpid = 0;
my %listener         = ();
my %node             = ();
my %snd              = ();
my $ipc_loop         = 0; 

my @rcv    = ();
my %r_bufs = ();
my %w_bufs = ();

my %pack   = ();
my %unpack = ();

my %fh2ww = ();

my ($waited_vpid, $waited_msg, @waited_rv);

my $blksize = 1024 * 16;


END {
	$ipc_loop or @spawn and carp "Probably have forgotten to call receive.";
	close $_ foreach values %fh2fh;
}

sub spawn(&) {
	my ($spawn) = @_;
	socketpair(my $child, my $parent, AF_UNIX, SOCK_STREAM, PF_UNSPEC) or die "socketpair: $!";
	my $vpid = refaddr $child;
	push @spawn, [$vpid, $child, $parent, $spawn];
	return $vpid;
}


sub msg($$) {
	my ($msg, $sub) = @_;
	$msg{$msg} = $sub;
}


sub snd($$;@) {
	my ($vpid, $msg, @args) = @_;
	defined $vpid or carp("Argument vpid required"), return;
	defined $msg  or carp("Argument msg required"),  return;
	$vpid = $self_parent_vpid if $vpid == 0;
	$DEBUG and print "Send message '$msg' from $self_vpid to $vpid vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @args), ".\n";
	push @{$snd{$vpid}}, [$self_vpid, $vpid, $msg, \@args];
	w_event_cb_reg($vpid);
	return 1;
}


sub quit() { Event::unloop; $ipc_loop = 0 }


sub snd_wt($$;@) {
	my ($vpid, $msg, @args) = @_;
	defined $vpid or carp("Argument vpid required"), return;
	defined $msg  or carp("Argument msg required"),  return;
	snd($vpid, $msg, @args);
	wt($vpid, $msg);
}


sub listener($$;%) {
	my ($host, $port, %args) = @_;
	defined $host or carp("Argument host required"), return;
	defined $port or carp("Argument port required"), return;
	my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0, LocalHost => $host, LocalPort => $port, Listen => 20, ReuseAddr => 1);
	if ($sock) {
		_pack_unpack($sock, %args) or return;
		$listener{$sock} = $sock;
		Event->io(fd => $sock, poll => "r", cb => sub {
			my ($event) = @_;
			my $fh = $event->w->fd;
			$DEBUG > 1 and print "Read event for listener from $self_vpid: \n";
			my $sock = $fh->accept;
			$pack{$sock}   = $pack{$fh};
			$unpack{$sock} = $unpack{$fh};
			$sock->sockopt(SO_KEEPALIVE, 1);
			my $vpid = refaddr $sock;
			$node{$sock}     = $vpid;
			$fh2vpid{$sock}  = $vpid;
			$vpid2fh{$vpid}  = $sock;
			$fh2fh{$sock}    = $sock;
			Event->io(fd => $sock, poll => "r", cb => \&r_event_cb);
		});
		return $sock;
	} else {
		carp "Cannot open socket '$host:$port' in $self_vpid: $!";
		return;
	}
}


sub open_node($$;%) {
	my ($host, $port, %args) = @_;
	defined $host or carp("Argument host required"), return;
	defined $port or carp("Argument port required"), return;
	my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0);
    my $addr = sockaddr_in($port,inet_aton($host));
	$sock->sockopt(SO_KEEPALIVE, 1);
    my $rv = $sock->connect($addr);
	if ($rv) {
		_pack_unpack($sock, %args) or return;
		my $vpid = refaddr $sock;
		$node{$sock}     = $vpid;
		$fh2vpid{$sock}  = $vpid;
		$vpid2fh{$vpid}  = $sock;
		$fh2fh{$sock}    = $sock;
		Event->io(fd => $sock, poll => "r", cb => \&r_event_cb);
		return $vpid;
	} else {
		carp "Cannot connect to socket '$host:$port' in $self_vpid: $!";
		return;
	}
}


sub _pack_unpack($%) {
	my ($fh, %args) = @_;
	if (my $pack = $args{pack} and my $unpack = $args{unpack}) {
		my $r = eval {
			my $r = $unpack->($pack->({a => ["b"]}));
			if (ref $r eq "HASH" and ref $$r{a} eq "ARRAY" and
				$$r{a}[0] and $$r{a}[0] eq "b")
			{
				return 1;
			} else {
				return 0;
			}
		};
		if (not $r or $@) {
			carp "False pack unpack test";
			return;
		}
		$pack{$fh}   = $pack;
		$unpack{$fh} = $unpack;
	} elsif ($args{pack} or $args{unpack}) {
		carp "pack and unpack is pair options";
		return;
	}
	return 1;
}


sub receive(&) {
	my ($receive) = @_;

	$DEBUG > 1 and print "Call receive in $self_vpid (\$\$=$$)\n";

	local $SIG{CHLD} = "IGNORE";
	local $SIG{PIPE} = "IGNORE";

	foreach (@spawn) {
		my ($vpid, $child, $parent, $spawn) = @$_;
		
		my $kid_pid = fork;
		defined $kid_pid or die "Can't fork: $!";

		unless ($kid_pid) {
			
			foreach (@spawn) {
				close $$_[1];
				close $$_[2] if $$_[2] ne $parent;
			}

			close $_ foreach values %fh2fh, values %listener;
			$_->cancel foreach Event::all_watchers();
			@spawn    = ();
			%listener = ();
			%node     = ();
			%msg      = ();
			%fh2vpid  = ();
			%vpid2fh  = ();
			%fh2fh    = ();
			%snd      = ();

			$ipc_loop = 0;

			@rcv    = ();
			%r_bufs = ();
			%w_bufs = ();

			%pack   = ();
			%unpack = ();

			%fh2ww = (); 

			($waited_vpid, $waited_msg, @waited_rv) = ();

			$self_parent_fh   = $parent;
			$self_parent_vpid = $self_vpid;

			$self_vpid        = $vpid;

			$fh2vpid{$self_parent_fh}   = $self_parent_vpid;
			$vpid2fh{$self_parent_vpid} = $self_parent_fh;
			$fh2fh{$self_parent_fh}     = $self_parent_fh;

			Event->io(fd => $self_parent_fh, poll => "r", cb => \&r_event_cb);

			$spawn->();

			exit;
		}
	}


	foreach (@spawn) {
		my ($vpid, $child, $parent, $spawn, $receive) = @$_;
		close $parent;
		$fh2vpid{$child} = $vpid;
		$vpid2fh{$vpid}  = $child;
		$fh2fh{$child}   = $child;
		Event->io(fd => $child, poll => "r", cb => \&r_event_cb);
	}
	@spawn = ();



	$receive->();



	unless ($ipc_loop) {
		$ipc_loop = 1;
		w_event_cb_reg();
		Event::loop();
	}
}


sub wt($$) {
	($waited_vpid, $waited_msg) = @_;
	defined $waited_vpid or carp("Argument vpid required"), return;
	defined $waited_msg  or carp("Argument msg required"),  return;
	$waited_vpid = $self_parent_vpid if $waited_vpid == 0;
	foreach my $i (0 .. $#rcv) {
		my ($from, $msg, $args)= @{$rcv[$i]};
		if ($from eq $waited_vpid and $msg eq $waited_msg) {
			splice @rcv, $i, 1;
			return wantarray ? @$args : $$args[0];
		}
	}
	$DEBUG and print "Start waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
	w_event_cb_reg();
	Event::loop();
	my @rv = @waited_rv;
	($waited_vpid, $waited_msg, @waited_rv) = ();
	return wantarray ? @rv : $rv[0];
}


sub w_event_cb_reg {
	my ($to_vpid) = @_;

		foreach my $to (defined $to_vpid ? $to_vpid : keys %snd) {
			if (@{$snd{$to}}) {
				my $fh = $vpid2fh{$to};
				unless ($fh) {
					if (@spawn) {
						carp "Probably have forgotten to call receive." if not defined $to_vpid;
						next;
					} else {
						if ($self_parent_fh) {
							$fh = $self_parent_fh;
						} else {
							carp "The addressee $to is unknown or has left in $self_vpid (\$\$=$$)\n";
							next;
						}
					}
				}
				unless (exists $w_bufs{$fh}) {
					my $packet;
					if (my $pack = $pack{$fh}) {
						$packet = $pack->(shift @{$snd{$to}});
					} else {
						$packet = freeze  shift @{$snd{$to}};
					}
					my $buf = join "", pack("N", length $packet), $packet;
					$w_bufs{$fh} = $buf;
					$DEBUG and (@{$snd{$to}} or delete $snd{$to});
					$fh2ww{$fh} = Event->io(fd => $fh, poll => "w", cb => \&w_event_cb);
				}
			}
		}
}




sub r_event_cb {
	my ($event) = @_;
	my $fh = $event->w->fd;

	$DEBUG > 1 and print "Read event from $self_vpid: \n";

			my $len = sysread $fh, (my $_buf), $blksize;
 			if ($len) {
				$r_bufs{$fh} .= $_buf;
				NEXT_MSG: {
					my $buf = $r_bufs{$fh};
					if (length $buf >= 4) {
						my $packet_length = unpack "N", substr $buf, 0, 4, "";
						if (length $buf >= $packet_length) {
							my $packet = substr $buf, 0, $packet_length, "";
							$r_bufs{$fh} = $buf || "";
							$DEBUG and ($r_bufs{$fh} or delete $r_bufs{$fh});

							my ($from, $to, $msg, $args);
							if (my $unpack = $unpack{$fh}) {
								($from, $to, $msg, $args) = @{$unpack->($packet)};
							} else {
								($from, $to, $msg, $args) = @{thaw $packet};
							}

							if ($node{$fh}) {
								$from = $node{$fh};
								$to   = $self_vpid;
							}

							$DEBUG and print "Got message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
							if ($to == $self_vpid) {
								$DEBUG and print "Run message sub '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
								if (defined $waited_vpid and defined $waited_msg) {
									push @rcv, [$from, $msg, $args];
								} else {
									if ($msg{$msg}) {
										push @rcv, [$from, $msg, $args];
									} else {
										$DEBUG and print "Unknown message '$msg'\n";
									}
								}
							} elsif ($vpid2fh{$to}) {
								$DEBUG and print "Remittance message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
								push @{$snd{$to}}, [$from, $to, $msg, $args];
								w_event_cb_reg();
							} else {
								carp "Got Wandered message '$msg' from $from to $to in $self_vpid (\$\$=$$)\n";
							}

							redo NEXT_MSG if $r_bufs{$fh};
						}
					}
				}
 			} elsif (defined $len) {
				if (exists $fh2ww{$fh}) {
					$fh2ww{$fh}->cancel;
					delete $fh2ww{$fh};
				}
				$event->w->cancel;
 				delete $r_bufs{$fh};
 				delete $w_bufs{$fh};
				delete $fh2fh{$fh};
				delete $vpid2fh{$fh2vpid{$fh}};
				delete $fh2vpid{$fh};
				delete $pack{$fh};
				delete $unpack{$fh};
				if (my $vpid = $node{$fh}) {
					delete $node{$fh};
					if ($msg{NODE_CLOSED}) {
						$msg{NODE_CLOSED}->($vpid, $fh->connected ? 1 : 0);
						w_event_cb_reg();
					}
				}
 				close $fh;
				if ($self_parent_fh and $self_parent_fh eq $fh) {
					unless (defined $waited_vpid and defined $waited_msg) {
						unless (@rcv) {
							exit;
						}
					}
				}
 			} else {
 				$DEBUG and die "Can't read '$fh': $!";
 			}

		if (defined $waited_vpid and defined $waited_msg) {
			foreach my $i (0 .. $#rcv) {
				my ($from, $msg, $args)= @{$rcv[$i]};
				if ($msg eq $waited_msg and $from eq $waited_vpid) {
					splice @rcv, $i, 1;
					$DEBUG and print "Stop waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
					@waited_rv = @$args;
					Event::unloop();
					return;
				}
			}
			unless (exists $vpid2fh{$waited_vpid}) {
				Event::unloop();
				return;
			}
		} else {
			while (my $rcv = shift @rcv) {
				my ($from, $msg, $args)= @{$rcv};
				$msg{$msg}->($from, @$args);
				w_event_cb_reg();
			}
		}
}



sub w_event_cb {
	my ($event) = @_;
	my $fh = $event->w->fd;

	$DEBUG > 1 and print "Write event from $self_vpid: \n";
	$fh2fh{$fh} or return;

			my $buf = $w_bufs{$fh};
			my $len = syswrite $fh, $buf, $blksize;
			if ($len) {
				substr $buf, 0, $len, "";
				if (length $buf) {
					$w_bufs{$fh} = $buf;
				} else {
					delete $w_bufs{$fh};
					$event->w->cancel;
					delete $fh2ww{$fh};
					w_event_cb_reg();
				}
			} else {
 				$DEBUG and die "Can't write to '$fh': $!";
			}
}



1;


__END__


=head1 NAME

IPC::MPS::Event - IPC::MPS based on L<Event>

=head1 DESCRIPTION

See description in L<IPC::MPS>.

=head1 AUTHOR

Nick Kostirya

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2009 by Nick Kostirya

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.8.8 or,
at your option, any later version of Perl 5 you may have available.

=cut