package ZMQx::Class::Socket;
use strict;
use warnings;
use 5.010;
# ABSTRACT: A ZMQ Socket
use Moose;
use Carp qw(croak carp);
use namespace::autoclean;
use Package::Stash;
use ZMQ::LibZMQ3;
use ZMQ::Constants ':all';
# TODO
# has 'bind_or_connect',
# has 'address',
has '_init_opts_for_cloning' => ( is => 'ro', isa => "ArrayRef", default => sub {[]});
has '_socket' => (
is=>'rw',
isa=>'ZMQ::LibZMQ3::Socket',
required=>1,
);
has 'type' => (
is=>'ro',
required=>1,
);
has '_connected' => (
is=>'rw',
isa=>'Bool',
default=>0,
);
has '_pid' => ( is => 'rw', isa => 'Int', required => 1);
sub socket {
my ( $self ) = @_;
if ($$ != $self->_pid ) {
# TODO instead of init_opts_for_cloning get stuff required to re-initate via getsockopt etc
my ($class, @call) = @{$self->_init_opts_for_cloning};
my $socket = $class->socket(@call);
$self->_socket($socket->socket);
$self->_pid($socket->_pid);
}
return $self->_socket;
}
sub bind {
my ($self, $address) = @_;
my $rv = zmq_bind($self->socket,$address);
if ($rv == -1) {
croak "Cannot bind: $!";
}
$self->_connected(1);
}
sub connect {
my ($self, $address) = @_;
my $rv = zmq_connect($self->socket,$address);
if ($rv == -1) {
croak "Cannot connect: $!";
}
$self->_connected(1);
}
sub setsockopt {
my $self = shift;
zmq_setsockopt($self->socket, @_);
}
sub getsockopt {
my $self = shift;
zmq_getsockopt($self->socket, @_);
}
sub send {
my ($self, $parts, $flags) = @_;
$flags //= 0;
if (!ref($parts)) {
$parts = [$parts];
}
my $max_idx = $#{$parts};
my $socket = $self->socket;
if ($max_idx == 0) { # single part message
return zmq_msg_send($parts->[0], $socket, $flags);
}
# multipart
my $mflags = $flags ? $flags | ZMQ_SNDMORE : ZMQ_SNDMORE;
foreach (0 .. $max_idx - 1) {
my $rv = zmq_msg_send( $parts->[$_], $socket, $mflags);
return $rv if $rv == -1;
}
my $rv = zmq_msg_send( $parts->[$max_idx], $socket, $flags);
return $rv;
}
sub receive_multipart {
my $rv = receive(@_);
carp 'DEPRECATED! Use $socket->receive() instead';
*{receive_multipart} = *{receive} unless $ENV{HARNESS_ACTIVE};
return $rv;
}
sub receive {
my ($self, $blocking) = @_;
my $socket = $self->socket;
my @parts;
while (1) {
my $msg = zmq_msg_init();
my $rv = zmq_msg_recv($msg, $socket, $blocking ? 0 : ZMQ_DONTWAIT);
return if $rv == -1;
push (@parts,zmq_msg_data( $msg ));
if (!zmq_getsockopt($socket, ZMQ_RCVMORE)) {
last;
}
}
if (@parts) {
return \@parts;
}
return;
}
sub subscribe {
my ($self, $subscribe) = @_;
croak('$socket->subscribe only works on SUB sockets') unless $self->type =~/^X?SUB$/;
croak('required parameter $subscription missing') unless defined $subscribe;
zmq_setsockopt($self->socket,ZMQ_SUBSCRIBE,$subscribe);
}
sub get_fh {
carp 'DEPRECATED! Use $socket->get_fd() instead';
my $rv = get_fd(@_);
*{get_fh} = *{get_fd};
return $rv;
}
sub get_fd {
my $self = shift;
return zmq_getsockopt($self->socket, ZMQ_FD);
}
{
no strict 'refs';
my @sockopts_before_connect = qw(ZMQ_SNDHWM ZMQ_RCVHWM ZMQ_AFFINITY ZMQ_IDENTITY ZMQ_RATE ZMQ_RECOVERY_IVL ZMQ_SNDBUF ZMQ_RCVBUF ZMQ_RECONNECT_IVL ZMQ_RECONNECT_IVL_MAX ZMQ_BACKLOG ZMQ_MAXMSGSIZE ZMQ_MULTICAST_HOPS ZMQ_RCVTIMEO ZMQ_SNDTIMEO ZMQ_IPV4ONLY ZMQ_EVENTS);
my @sockopts_after_connect = qw(ZMQ_SUBSCRIBE ZMQ_UNSUBSCRIBE ZMQ_LINGER ZMQ_ROUTER_MANDATORY ZMQ_XPUB_VERBOSE);
my $stash = Package::Stash->new(__PACKAGE__);
foreach my $const (@sockopts_before_connect) {
_setup_sockopt_helpers($const, $stash, 1);
}
foreach my $const (@sockopts_after_connect) {
_setup_sockopt_helpers($const, $stash, 0);
}
}
sub _setup_sockopt_helpers {
my ($const, $stash, $set_only_before_connect) = @_;
my $get = my $set = lc($const);
$set =~s/^zmq_/set_/;
$get =~s/^zmq_/get_/;
no strict 'refs';
if ($stash->has_symbol('&'.$const)) {
my $constval = &$const;
if ($set_only_before_connect) {
$stash->add_symbol('&'.$set => sub {
my $self = shift;
if ($self->_connected) {
carp "Setting '$const' only works before connect/bind. Value not stored!";
}
else {
zmq_setsockopt($self->socket,$constval,@_);
}
return $self;
});
}
else {
$stash->add_symbol('&'.$set => sub {
my $self = shift;
zmq_setsockopt($self->socket,$constval,@_);
return $self;
});
}
$stash->add_symbol('&'.$get => sub {
my $self = shift;
return zmq_getsockopt($self->socket,$constval);
});
}
}
sub anyevent_watcher {
my ($socket, $callback) = @_;
my $fd = $socket->get_fd;
my $watcher = AnyEvent->io (
fh => $fd,
poll => "r",
cb => $callback
);
return $watcher;
}
1;
__END__
=pod
=head1 NAME
ZMQx::Class::Socket - A ZMQ Socket
=head1 VERSION
version 0.005
=head1 METHODS
=head2 socket
$socket->socket;
Returns the underlying C<ZMQ::LibZMQ3::Socket> socket. You probably won't need to call this method yourself.
When a process containg a socket is forked, a new instance of the socket will be set up for the child process.
=head2
$socket->bind( $address );
Bind a socket to an address. Use this for the "server" side, which usually is the more stable part of your infrastructure.
C<bind> will C<die> if it cannot bind.
=head2 connect
$socket->connect( $address );
Connect the socket to an address. Use this for the "client" side.
C<connect> will C<die> if it cannot connect.
=head2 setsockopt
use ZMQ::Constants qw( ZMQ_LINGER );
$socket->setsockopt( ZMQ_LINGER, 100 );
Set a socket options using a constant. You will need to load the constant from C<ZMQ::Constants>.
=head2 getsockopt
use ZMQ::Constants qw( ZMQ_LINGER );
$socket->getsockopt( ZMQ_LINGER );
Get a socket option value using a constant. You will need to load the constant from C<ZMQ::Constants>.
=head2 send
my $rv = $socket->send( \@message );
my $rv = $socket->send( \@message, ZMQ_DONTWAIT );
my $rv = $socket->send( $message );
Send a message over the socket.
The message can either be a plain string or an ARRAYREF which will be
send as a multipart message (with one message per array element).
C<send> will automatically set C<ZMQ_SENDMORE> for multipart messages.
You can pass flags to C<send>. Currently the only flag is C<ZMQ_DONTWAIT>.
C<send> returns the number of bytes send in the last message (TODO this should be changes to the total number of bytes for the whole multipart message), or -1 on error.
=head2 receive
my $msg = $socket->receive;
my $msg = $socket->receive('blocking;);
C<receive> will get the next message from the socket, if there is one.
You can use the blocking mode (by passing a true value to C<receive>) to block the process until a message has been received (NOT a wise move if you are connected to a lot of clients! Use AnyEvent in this case)
The message will always be a ARRAYREF containing one element per message part.
Returns C<undef> if no message can be received.
See t/30_anyevent.t for some examples
=head2 anyevent_watcher
my $watcher = $socket->anyevent_watcher( sub {
while (my $msg = $socket->receive) {
# do something with msg
}
} );
Set up an AnyEvent watcher that will call the passed sub when a new
incoming message is received on the socket.
Note that the C<$socket> object isn't passed to the callback. You can only access the C<$socket> thanks to closures.
Please note that you will have to load C<AnyEvent> in your code!
=head1 AUTHOR
Thomas Klausner <domm@plix.at>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2013 by Validad AG.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut