IPC::MPS - Message Passing Style of Inter-process communication
    use IPC::MPS;

    my $vpid = spawn {
        receive {
            msg ping => sub {
                my ($from, $i) = @_;
                print "Ping ", $i, " from $from\n";
                snd($from, "pong", $i);
            };
        };
    };

    snd($vpid, "ping", 1);

    receive {
        msg pong => sub {
            my ($from, $i) = @_;
            print "Pong $i from $from\n";
            if ($i < 3) {
                snd($from, "ping", $i + 1);
            } else {
                exit;
            }
        };
    };
A messaging system between parent and child processes, and between child processes that share the same parent.
Motto: inter-process communication without blocking.
A peculiarity of the system is that messages between child processes are relayed by their parent. We therefore recommend using parent processes only to coordinate the work and to store data.
Messages are carried over UNIX sockets.
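As a rough illustration of message passing over a UNIX socket pair, here is a minimal sketch that uses only core Perl modules. It is not the module's implementation: the 4-byte length prefix and the helper names send_msg, read_exact and recv_msg are assumptions made for this example, and Storable is used only because the documentation names freeze and thaw as the default serializers.

```perl
use strict;
use warnings;
use Socket;
use IO::Handle;
use Storable qw(freeze thaw);

# Hypothetical helper: serialize a message and write it with a length prefix.
sub send_msg {
    my ($sock, @msg) = @_;
    my $body = freeze(\@msg);
    print {$sock} pack("N", length $body), $body;
    $sock->flush;
}

# Hypothetical helper: read exactly $len bytes or die.
sub read_exact {
    my ($sock, $len) = @_;
    my $buf = '';
    while (length($buf) < $len) {
        my $n = read($sock, $buf, $len - length($buf), length($buf));
        die "socket closed\n" unless $n;
    }
    return $buf;
}

# Hypothetical helper: read one length-prefixed message and deserialize it.
sub recv_msg {
    my ($sock) = @_;
    my $len = unpack("N", read_exact($sock, 4));
    return @{ thaw(read_exact($sock, $len)) };
}

socketpair(my $parent_end, my $child_end, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";

my $pid = fork();
die "fork: $!" unless defined $pid;

if ($pid == 0) {
    # Child: answer a single "ping" with a "pong" carrying the same counter.
    close $parent_end;
    my ($name, $i) = recv_msg($child_end);
    send_msg($child_end, "pong", $i) if $name eq "ping";
    exit 0;
}

# Parent: send one ping and wait for the reply.
close $child_end;
send_msg($parent_end, "ping", 1);
my @reply = recv_msg($parent_end);
waitpid($pid, 0);
print "@reply\n";
```

Unlike IPC::MPS, this sketch blocks while reading; it only shows the kind of transport that sits underneath the snd and receive calls.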
    $vpid = spawn {
        ...
        receive {
            msg "name" => sub {
                my ($from, @args) = @_;
                ...
            };
            msg "name" => sub { ... };
            msg "name" => sub { ... };
            ...
        };
    };
Child processes are not created when spawn is called; they are created later, when receive is called, just before the send-receive loop starts. This is necessary so that every vpid is already defined at the moment of the fork call. A vpid is the address of the reference to the socket that links the main process to the child.
A spawn may be nested inside another spawn. If spawn is called inside a receive handler, receive must be called again to start the new child processes. The new receive adds its information to the existing one and then returns control to the existing receive message loop.
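The reason for deferring process creation can be shown with a toy model in plain Perl. This sketch does not use IPC::MPS and does not fork; toy_spawn and toy_start are hypothetical stand-ins for spawn and receive. It only demonstrates that a body registered early can still see an identifier assigned later, provided the body runs after all registrations are complete.

```perl
use strict;
use warnings;

my @pending;    # bodies that are registered but not yet started

# Hypothetical stand-in for spawn: remember the body, return an id at once.
sub toy_spawn {
    my ($body) = @_;
    push @pending, $body;
    return scalar @pending;    # ids 1, 2, ...
}

# Hypothetical stand-in for receive: only now are the bodies run.
# (The real module forks at this point instead of calling the bodies.)
sub toy_start {
    return map { $_->() } @pending;
}

my ($first, $second);
$first  = toy_spawn(sub { "first can address second as $second" });
$second = toy_spawn(sub { "second can address first as $first" });

my @report = toy_start();
print "$_\n" for @report;
```

Had the first body run immediately inside toy_spawn, $second would still have been undefined at that point.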
Sending a message:
    snd($vpid, "msg name", @args);
If vpid is 0, the message is addressed to the parent process.
If the parent process exits, its child processes exit too.
To detect that a spawned process has closed, define a handler for the SPAWN_CLOSED message:
    msg SPAWN_CLOSED => sub {
        my ($vpid) = @_;
        ...
    };
Note that, for your convenience, the "close" and "exit" messages are special. Once a process has been sent a "close" or "exit" message, all further messages from it are ignored, including SPAWN_CLOSED and NODE_CLOSED.
To break out of receive, use the "quit" subroutine.
The vpid2pid function takes a vpid and returns the OS PID. Important: the PID is available only after the receive subroutine has been called.
Sometimes you need additional information from other processes before you can continue processing a message. To do this, send a message requesting the information, then wait for the reply at the appropriate point with the wt subroutine (short for "wait"), without interrupting the processing of the current message.
    snd("vpid_1", "msg_1", @args_1);
    snd("vpid_2", "msg_2", @args_2);
    my $r = wt("vpid_1", "msg_1");
    ...
    my @r = wt("vpid_2", "msg_2");
The wt subroutine starts a new waiting loop. Pending outgoing messages continue to be sent and new incoming messages continue to be received, but the new messages are not processed; they accumulate in a buffer. When the awaited response arrives, the waiting loop ends, wt returns the response, and processing of the original message continues.
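The buffering behaviour described above can be modelled in a few lines of plain Perl. This is a toy sketch, not the module's code: the in-memory @inbox stands in for the socket, and wait_for and @deferred are hypothetical names chosen for the illustration.

```perl
use strict;
use warnings;

# Simulated incoming messages: [ sender vpid, message name, payload... ].
my @inbox = (
    [ 7, "status", "busy" ],
    [ 3, "result", 42 ],
    [ 7, "result", 99 ],
);

my @deferred;    # messages received while waiting, kept for later handling

# Hypothetical helper: return the payload of the first message that matches
# both sender and name; everything that arrives before it is buffered.
sub wait_for {
    my ($want_from, $want_name) = @_;
    while (my $m = shift @inbox) {
        my ($from, $name, @args) = @$m;
        return @args if $from == $want_from && $name eq $want_name;
        push @deferred, $m;
    }
    die "no matching message arrived\n";
}

my ($answer) = wait_for(7, "result");
print "got $answer, deferred ", scalar(@deferred), " message(s)\n";
```

In this model the two unrelated messages stay in @deferred in arrival order, so an outer loop could still dispatch them once the handler that called wait_for has finished.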
    my $r = snd_wt($vpid, $msg, @args);
is shorthand for:
    snd($vpid, $msg, @args);
    my $r = wt($vpid, $msg);
Note: this is not Erlang; this is Perl IPC::MPS. The main differences, each following from the one before:
Processes are full operating system processes.
The 'spawn' subroutine does not create processes directly; it only does the preparatory work. The processes are created when 'receive' is called.
'receive' handles messages repeatedly, rather than once as in Erlang.
A 'receive' inside a 'receive' does not replace the previous one; it adds new message handlers and starts new processes.
To wait for the response to a specific message inside a handler, use the 'wt' subroutine. In Erlang this is done with 'receive' itself.
To turn the current process into a node, call the 'listener' subroutine:
    listener($host, $port);
To connect to a remote node, use the 'open_node' subroutine:
    my $vpid = open_node($host, $port);
You may supply your own pack and unpack functions in place of the freeze and thaw functions of the Storable module:
    listener($host, $port, pack => sub { ... }, unpack => sub { ... });
    my $vpid = open_node($host, $port, pack => sub { ... }, unpack => sub { ... });
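As one possible pair of replacement serializers, the sketch below uses the core JSON::PP module. The exact arguments IPC::MPS passes to these callbacks are not stated here, so the contract is an assumption: pack is taken to receive one reference and return a byte string, and unpack to do the reverse, mirroring Storable's freeze and thaw. The variable names $pack and $unpack are illustrative.

```perl
use strict;
use warnings;
use JSON::PP ();

# canonical() sorts hash keys, so the same data always yields the same bytes.
my $json = JSON::PP->new->utf8->canonical;

# Assumed contract (mirrors Storable::freeze): one reference in, bytes out.
my $pack = sub {
    my ($data) = @_;
    return $json->encode($data);
};

# Assumed contract (mirrors Storable::thaw): bytes in, one reference out.
my $unpack = sub {
    my ($bytes) = @_;
    return $json->decode($bytes);
};

# Round-trip check without any networking.
my $wire = $pack->([ "ping", 1, { note => "hi" } ]);
my $back = $unpack->($wire);

print "$wire\n";
print "round trip ok\n"
    if $back->[0] eq "ping" && $back->[1] == 1 && $back->[2]{note} eq "hi";
```

If the assumed contract holds, the same two code references would be passed as pack => $pack, unpack => $unpack on both the listener and the open_node side. Note that JSON cannot carry blessed objects or code references, so this trades Storable's generality for a text format that other languages can read.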
To detect that a connection has closed, define a handler for the NODE_CLOSED message:
    msg NODE_CLOSED => sub {
        my ($vpid, $connected) = @_;
        if ($connected) {
            print "Node closed.\n";
        } else {
            print "Cannot connect to node: $!.\n";
        }
        ...
    };
This applies to both the client and the server.
    # Ping-pong between two child processes; the parent only waits for "exit".
    use IPC::MPS;

    my $ping_pong = 3;

    my ($vpid1, $vpid2);

    $vpid1 = spawn {
        snd($vpid2, "ping", 1);
        receive {
            msg pong => sub {
                my ($from, $i) = @_;
                print "Pong $i from $from\n";
                if ($i < $ping_pong) {
                    snd($from, "ping", $i + 1);
                } else {
                    snd(0, "exit");
                }
            };
        };
    };

    $vpid2 = spawn {
        receive {
            msg ping => sub {
                my ($from, $i) = @_;
                print "Ping ", $i, " from $from\n";
                snd($from, "pong", $i);
            };
        };
    };

    receive {
        msg exit => sub {
            print "EXIT\n";
            exit;
        };
    };

    # Three child processes: each answers a ping with a pong
    # and forwards a ping to the next process.
    use IPC::MPS;

    my $ping_pong = 5;

    sub ping_pong($) {
        my $vpid = shift;
        sub {
            msg ping => sub {
                my ($from, @args) = @_;
                print "Ping ", $args[0], " from $from\n";
                snd($from, "pong", $args[0]);
                if ($args[0] < $ping_pong) {
                    snd($vpid, "ping", $args[0] + 1, $$);
                }
            };
            msg pong => sub {
                my ($from, @args) = @_;
                print "Pong ", $args[0], " from $from\n";
                unless ($args[0] < $ping_pong) {
                    snd(0, "exit");
                }
            };
        };
    }

    my ($vpid1, $vpid2, $vpid3);

    $vpid1 = spawn {
        snd($vpid2, "ping", 1, $$);
        receive { ping_pong($vpid2)->() };
    };
    $vpid2 = spawn {
        receive { ping_pong($vpid3)->() };
    };
    $vpid3 = spawn {
        receive { ping_pong($vpid1)->() };
    };

    receive {
        msg exit => sub {
            print "EXIT\n";
            exit;
        };
    };

    # Nested spawns, including a spawn created inside a receive handler.
    use IPC::MPS;

    my $vpid1 = spawn {
        my $vpid2 = spawn {
            receive {
                msg hello2 => sub {
                    print "Hello 2\n";
                };
            };
        };
        receive {
            msg hello1 => sub {
                print "Hello 1\n";
                snd($vpid2, "hello2");
                my $vpid3 = spawn {
                    receive {
                        msg hello3 => sub {
                            print "Hello 3\n";
                        };
                    };
                };
                snd($vpid3, "hello3");
                receive {};    # needed to start the process spawned above
            };
        };
    };

    spawn {
        sleep 1;
        print "SLEEP\n";
        snd(0, "exit");
        receive {};
    };

    snd($vpid1, "hello1");

    receive {
        msg exit => sub {
            print "EXIT\n";
            exit;
        };
    };
Waiting for a response to a specific message.
    use IPC::MPS;

    my $vpid = spawn {
        receive {
            msg foo => sub {
                my ($from, $text) = @_;
                print "foo: $text\n";
                snd(0, "too", 1);
                print "too -> baz\n";
                my $rv = wt(0, "baz");
                print "baz: $rv\n";
                my @rv = snd_wt(0, "sugar", $rv);
                print "sugar: $rv[0]\n";
                exit;
            };
        };
    };

    snd($vpid, "foo", "Hello, wait");

    receive {
        msg too => sub {
            my ($from, $i) = @_;
            print "too: $i\n";
            snd($from, "baz", ++$i);
        };
        msg sugar => sub {
            my ($from, $i) = @_;
            snd($from, "sugar", ++$i);
        };
    };
See the demo directory.
IPC::MPS is based on IO::Select.
IPC::MPS::Event is based on Event.
IPC::MPS::EV is based on EV.
IPC::MPS::Event and IPC::MPS::EV allow the use of third-party modules built on Event and EV, respectively (directly or through AnyEvent).
    # Using an Event timer inside a message handler.
    use IPC::MPS::Event;
    use Event;

    my $vpid = spawn {
        receive {
            msg ping => sub {
                my ($from, $hello) = @_;
                print "$hello; $$\n";
                Event->timer(after => 1, cb => sub {
                    snd($from, "pong", "hy");
                });
            };
        };
    };

    snd($vpid, "ping", "hello");

    receive {
        msg pong => sub {
            my ($from, $hello) = @_;
            print "$hello; $$\n";
            print "EXIT\n";
            exit;
        };
    };

    # Using AnyEvent::HTTP inside a message handler.
    use IPC::MPS::Event;
    use AnyEvent::HTTP;

    my $vpid = spawn {
        receive {
            msg req => sub {
                my ($from, $url) = @_;
                http_get $url, sub {
                    print ${$_[1]}{URL}, "\t", ${$_[1]}{Status}, "; $$\n";
                    snd($from, "res", $url, ${$_[1]}{Status});
                };
            };
        };
    };

    snd($vpid, "req", "http://localhost/");

    receive {
        msg res => sub {
            my ($from, $url, $status) = @_;
            print "$url\t$status; $$\n";
            print "EXIT\n";
            exit;
        };
    };
Sometimes it is easier to use the BGS module (Background execution of subroutines in child processes).
Nick Kostyria
Copyright (C) 2009 by Nick Kostyria
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.8.8 or, at your option, any later version of Perl 5 you may have available.