The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package POEx::ZMQ3::Sockets;

use 5.10.1;
use Carp;
use Moo;
use POE;
require POSIX;

use ZMQ::LibZMQ3;
use ZMQ::Constants 
  ## Socket types.
  qw/
    ZMQ_REQ ZMQ_REP
    ZMQ_DEALER ZMQ_ROUTER

    ZMQ_PUB ZMQ_SUB
    ZMQ_XPUB ZMQ_XSUB

    ZMQ_PUSH ZMQ_PULL

    ZMQ_PAIR
 /,
 ## Socket control.
 qw/
   ZMQ_EVENTS
   ZMQ_FD 
   ZMQ_LINGER
   ZMQ_POLLIN
   ZMQ_SUBSCRIBE
 /,
 ## Send/recv.
 qw/
   ZMQ_DONTWAIT
   ZMQ_RCVMORE ZMQ_SNDMORE
 /,
 ;


with 'MooX::Role::POE::Emitter';
use MooX::Role::Pluggable::Constants;

require POEx::ZMQ3::Sockets::ZMQSocket;
sub ZMQSocket () { 'POEx::ZMQ3::Sockets::ZMQSocket' }

require POEx::ZMQ3::Context;
has context => (
  is      => 'ro',
  default => sub { POEx::ZMQ3::Context->new },
);

has _zmq_sockets => (
  is      => 'ro',
  default => sub { +{} },
);


my %stringy_types = (
  REQ     => ZMQ_REQ,
  REP     => ZMQ_REP,
  DEALER  => ZMQ_DEALER,
  ROUTER  => ZMQ_ROUTER,

  PUB     => ZMQ_PUB,
  SUB     => ZMQ_SUB,
  XPUB    => ZMQ_XPUB,
  XSUB    => ZMQ_XSUB,
  PUSH    => ZMQ_PUSH,
  PULL    => ZMQ_PULL,

  PAIR    => ZMQ_PAIR,
);


sub BUILD {
  my ($self) = @_;

  $self->set_event_prefix( 'zmqsock_' )
    unless $self->has_event_prefix;

  $self->set_register_prefix( 'ZMQSock_' )
    unless $self->has_register_prefix;

  $self->set_shutdown_signal( 'SHUTDOWN_ZMQSOCKETS' )
    unless $self->has_shutdown_signal;

  $self->set_object_states([
    $self => {
      zsock_ready   => '_zsock_ready',
      zsock_watch   => '_zsock_watch',
      zsock_unwatch => '_zsock_unwatch',
      zsock_write   => '_zsock_write',
      
      create      => '_zpub_create',
      close       => '_zpub_close',
      bind        => '_zpub_bind',
      connect     => '_zpub_connect',
      write       => '_zpub_write',
      write_multipart => '_zpub_write_multi',
    },
    $self => [
      'emitter_started',
      'emitter_stopped',
    ],
  ]);
}


sub start {
  my ($self) = @_;
  $self->_start_emitter;
}

sub stop {
  my ($self) = @_;
  $self->_zmq_clear_all;
  ## Yes, I'm serious.
  ## (Gives zmq a little extra cleanup time after zmq_close.)
  $self->yield(sub { $_[OBJECT]->yield('shutdown_emitter') });
}

sub emitter_started {
  my ($kernel, $self) = @_[KERNEL, OBJECT];

}

sub emitter_stopped {
  my ($kernel, $self) = @_[KERNEL, OBJECT];
  $self->_zmq_clear_all;
}

sub get_zmq_socket {
  my ($self, $alias) = @_;
  confess "Expected an alias" unless defined $alias;
  my $struct = $self->_zmq_sockets->{$alias} || return;
  $struct->zsock
}

## FIXME POE interfaces to sockopt setters
sub set_zmq_sockopt {
  my ($self, $alias) = splice @_, 0, 2;
  confess "Expected an alias and params to feed zmq_setsockopt"
    unless @_;

  my $zsock = $self->get_zmq_socket($alias)
    || confess "Cannot set_zmq_sockopt; no such alias $alias";
  
  if ( zmq_setsockopt($zsock, @_) == -1 ) {
    confess "zmq_setsockopt failed; $!"
  }

  $self
}

sub set_zmq_subscribe {
  ## Common sockopt.
  my ($self, $alias, $to) = @_;
  $to //= '';
  $self->set_zmq_sockopt( $alias, ZMQ_SUBSCRIBE, $to )
}

sub create {
  my $self = shift;
  $self->call( 'create', @_ )
}

sub _zpub_create {
  my ($kernel, $self) = @_[KERNEL, OBJECT];
  $self->_zmq_create_sock( @_[ARG0 .. $#_] )
}

sub bind {
  my $self = shift;
  $self->yield( 'bind', @_ )
}

sub _zpub_bind {
  my ($kernel, $self) = @_[KERNEL, OBJECT];
  my ($alias, $endpt) = @_[ARG0 .. $#_];
  confess "Expected an alias and endpoint"
    unless defined $alias and defined $endpt;

  my $zsock = $self->get_zmq_socket($alias)
    or confess "Cannot bind; no such alias $alias";
  zmq_bind($zsock, $endpt) and confess "zmq_bind failed; $!";

  $self->emit( 'bind_added', $alias, $endpt )
}


sub connect {
  my $self = shift;
  $self->yield( 'connect', @_ )
}

sub _zpub_connect {
  my ($kernel, $self) = @_[KERNEL, OBJECT];
  my ($alias, $endpt) = @_[ARG0 .. $#_];
  confess "Expected an alias and endpoint"
    unless defined $alias and defined $endpt;

  my $zsock = $self->get_zmq_socket($alias)
    or confess "Cannot connect; no such alias $alias";
  zmq_connect($zsock, $endpt) and confess "zmq_connect failed; $!";

  $self->emit( 'connect_added', $alias, $endpt )
}


sub write {
  my $self = shift;
  $self->yield( 'write', @_ )
}

sub write_multipart {
  my $self = shift;
  $self->yield( 'write_multipart', @_ )
}

sub _zpub_write_multi {
  my ($kernel, $self) = @_[KERNEL, OBJECT];
  my ($alias, @parts) = @_[ARG0 .. $#_];

  my $ref = $self->_zmq_sockets->{$alias}
    || confess "Cannot queue write; no such alias $alias";
  while (my $data = shift @parts) {
    my $item = ZMQSocket->new_buffer_item(
      data => $data, 
      (scalar(@parts) ? (flags => ZMQ_SNDMORE) : () ),
    );
    push @{ $ref->buffer }, $item;
  }
  ## These are call()ed, as we likely were already reached via yield/post:
  $self->call( 'zsock_write', $alias )
}

sub _zpub_write {
  my ($kernel, $self) = @_[KERNEL, OBJECT];
  my ($alias, $data, $flags) = @_[ARG0 .. $#_];

  my $struct = $self->_zmq_sockets->{$alias}
    || confess "Cannot queue write; no such alias $alias";
  my $item = ZMQSocket->new_buffer_item(data => $data, flags => $flags);
  push @{ $struct->buffer }, $item;

  $self->call( 'zsock_write', $alias )
}

sub _zsock_write {
  my ($kernel, $self) = @_[KERNEL, OBJECT];
  my $alias = $_[ARG0];
  my $struct = $self->_zmq_sockets->{$alias}
    || confess "Cannot execute write; no such alias $alias";

  return if $struct->is_closing or not @{ $struct->buffer };

  my $next  = $struct->buffer->[0];
  my $data  = $next->data;
  my $flags = $next->flags;
  unless (ref $data) {
    $data = zmq_msg_init_data($data)
  }

  ## This socket may change state without necessarily
  ## triggering read events. See zmq_getsockopt docs.
  $self->yield( zsock_ready => undef, 0, $alias );
  $self->yield( zsock_write => $alias );

  my $rc;
  if ( $rc = zmq_msg_send( $data, $struct->zsock, ($flags ? $flags : ()) ) 
      && ($rc//0) == -1 ) {

    unless ($rc == POSIX::EAGAIN || $rc == POSIX::EINTR) {
      confess "zmq_msg_send failed; $!";
    }
  } else {
    ## Successfully queued on socket.
    shift @{ $struct->buffer }
  }
}


sub close {
  my $self = shift;
  $self->yield( 'close', @_ )
}

sub _zpub_close {
  my ($kernel, $self) = @_[KERNEL, OBJECT];
  my $alias = $_[ARG0];
  $self->_zmq_clear_sock($alias);
  $self->emit( 'closing', $alias );
}

## Workers.

sub _zsock_ready {
  my ($kernel, $self) = @_[KERNEL, OBJECT];
  my (undef, $mode, $alias) = @_[ARG0 .. $#_];
  my $struct = $self->_zmq_sockets->{$alias};
  unless ($struct) {
    warn "Attempted to read socket '$alias' but no such socket struct";
    return
  }

  return if $struct->is_closing;

  my $zev = zmq_getsockopt($struct->zsock, ZMQ_EVENTS);
  return unless defined $zev
    and $zev & ZMQ_POLLIN == ZMQ_POLLIN;
  ## Socket can change state after a write/read without notifying us.
  ## Check again when we're finished.
  $self->yield( zsock_ready => undef, 0, $alias );

  if ($struct->is_closing) {
    warn "Socket '$alias' ready but closing"
      if $ENV{POEX_ZMQ_DEBUG};
    return
  }

  my @parts;
  RECV: while (1) {
    my $msg = zmq_msg_init;
    if ( zmq_msg_recv($msg, $struct->zsock, ZMQ_DONTWAIT) == -1 ) {
      if ($! == POSIX::EAGAIN || $! == POSIX::EINTR) {
        $self->yield(zsock_ready => undef, 0, $alias);
        return
      }
      confess "zmq_msg_recv failed; $!"
    }

    my $data = zmq_msg_data($msg);

    unless ( zmq_getsockopt($struct->zsock, ZMQ_RCVMORE) ) {
      ## No parts remaining on socket.
      if (@parts) {
        ## Multi-part message accumulated.
        $self->emit( multipart_recv => 
          $alias, [ @parts, $data ] 
        )
      } else {
        ## Single-part message.
        $self->emit( recv =>
          $alias, $data
        )
      }
      last RECV
    }
    ## Multi-part with more parts to follow.
    push @parts, $data;
  }

  1  
}

sub _zmq_create_sock {
  my ($self, $alias, $type) = @_; 
  confess "Expected an alias and sock type"
    unless defined $alias and defined $type;

  $type = $stringy_types{$type} if exists $stringy_types{$type};

  my $zsock = zmq_socket( $self->context, $type )
    or confess "zmq_socket failed: $!";

  my $fd = zmq_getsockopt( $zsock, ZMQ_FD )
    or confess "zmq_getsockopt failed: $!";

  open(my $fh, '<&=', $fd ) or confess "failed fdopen: $!";

  $self->_zmq_sockets->{$alias} = ZMQSocket->new(
    zsock  => $zsock,
    handle => $fh,
    fd     => $fd,
  );

  ## FIXME adjust IPV4ONLY if we have ->use_ipv6 or so?
  $self->set_zmq_sockopt($alias, ZMQ_LINGER, 0);

  $self->emit( 'created', $alias, $type );

  $self->call( zsock_watch => $alias )
}

sub _zsock_watch {
  my ($kernel, $self) = @_[KERNEL, OBJECT];
  my $alias = $_[ARG0];
  my $struct = $self->_zmq_sockets->{$alias};
  unless ($struct) {
    warn "Attempted to watch $alias but no such socket?";
    return
  }

  $kernel->select( $struct->handle,
    'zsock_ready',
    undef, undef,
    $alias
  );

  1
}

sub _zsock_unwatch {
  my ($kernel, $self, $alias) = @_[KERNEL, OBJECT, ARG0];
  my $struct = delete $self->_zmq_sockets->{$alias};
  $kernel->select( $struct->handle );
  ## Keeping us alive just a hair longer helps with flaky 'bad FD' errs.
  ## This generally only shows up when running tests.
  ## (They appear to exit before ZeroMQ has finished cleanup.)
  $self->yield(sub { $struct });
}

sub _zmq_clear_sock {
  my ($self, $alias) = @_;

  my $zsock = $self->get_zmq_socket($alias)
    or confess "Cannot _zmq_clear_sock; no such alias $alias";

  zmq_close($zsock);
  $self->_zmq_sockets->{$alias}->is_closing(1);
  
  $self->yield( zsock_unwatch => $alias )
}

sub _zmq_clear_all {
  my ($self) = @_;
  $self->_zmq_clear_sock($_) for keys %{ $self->_zmq_sockets }
}

1;


=pod

=head1 NAME

POEx::ZMQ3::Sockets - POE ZeroMQ Component

=head1 SYNOPSIS

  ## A 'REQ' client that sends 'PING' to a REP on localhost:5050
  use strictures 1;
  use POE;
  use POEx::ZMQ3::Sockets;

  POE::Session->create(
    package_states => [
      main => [ qw/
        _start
        zmqsock_registered
        zmqsock_recv
      / ],
    ]
  );

  sub _start {
    my ($kern, $heap) = @_[KERNEL, HEAP];
    my $zmq = POEx::ZMQ3::Sockets->new;

    $zmq->start;

    $heap->{zmq} = $zmq;

    $kern->call( $zmq->session_id, 'subscribe', 'all' );
  }

  sub zmqsock_registered {
    my ($kern, $heap) = @_[KERNEL, HEAP];
    my $zmq = $heap->{zmq};

    $zmq->create( 'pinger', 'REQ' );

    $zmq->connect( 'pinger', 'tcp://127.0.0.1:5050' );

    $zmq->write( 'pinger', 'PING' );
  }

  sub zmqsock_recv {
    my ($kern, $heap) = @_[KERNEL, HEAP];
    my ($alias, $data) = @_[ARG0 .. $#_];

    if ($data eq 'PONG') {
      ## Got a PONG. Send another PING:
      $zmq->write( 'pinger', 'PING' );
    }
  }

  $poe_kernel->run;

=head1 DESCRIPTION

This is the backend L<MooX::Role::POE::Emitter> session behind L<POEx::ZMQ3>,
integrating ZeroMQ (L<http://www.zeromq.org>) with a L<POE> event loop.

=head2 Registering Sessions

Your L<POE::Session> should register with the component to receive events:

  ## Inside a POE::Session
  ## Get all events from component in $_[HEAP]->{zmq}:
  sub my_start {
    my $zmq = $_[HEAP]->{zmq};
    $_[KERNEL]->call( $zmq->session_id, 'subscribe', 'all' );
  }

See L</POE API> for more on events emitted and accepted by this component.

See L<MooX::Role::POE::Emitter> for more details on event emitters; the
documentation regarding event prefixes and session details lives there.

=head2 Methods

=head3 start

Takes no arguments.

Spawns the L<MooX::Role::POE::Emitter> session that controls ZMQ socket
handling. Must be called prior to operating on sockets.

=head3 stop

Takes no arguments.

Stops the component, closing out all active sockets.

=head3 create

Takes a socket alias and a socket type.

Creates a new ZeroMQ socket. The socket is not initially bound/connected to
anything; see L</bind>, L</connect>.

The socket type may be either a constant from L<ZMQ::Constants> or a
string type:

  ## Equivalent:

  $zmq->create( $alias, 'PUB' );

  use ZMQ::Constants 'ZMQ_PUB';
  $zmq->create( $alias, ZMQ_PUB );

See the B<zmq_socket> man page for details.

=head3 bind

Takes a socket alias and an endpoint to listen for connections on.

The opposite of L</bind> is L</connect>

=head3 connect

Takes a socket alias and a target endpoint to connect to.

Note that ZeroMQ manages its own connections asynchronously.
A successful L</bind> or L</connect> is not necessarily indicative of a
positively usable connection.

=head3 write

Takes a socket alias, some data (as a scalar), and optional flags to pass to
ZeroMQ's B<zmq_msg_send>:

  ## Write a simple message:
  $zmq->write( $alias, 'A message' );

  ## Write some serialized data:
  my $ref  = { things => 'some data' };
  my $data = Storable::nfreeze( $ref );
  $zmq->write( $alias, $data );

Writes data to the ZMQ socket, when possible.

Also see L</write_multipart>.

=head3 write_multipart

Takes a socket alias and a list of scalar data items to send as a multi-part
message:

  $zmq->write_multipart( $alias, $header, $content );

See the ZeroMQ documentation for details regarding multi-part messages.

Also see L</zmqsock_multipart_recv>

=head3 close

Takes a socket alias.

Closes the specified ZMQ socket.

=head3 context

Takes no arguments.

Returns the current L<POEx::ZMQ3::Context> object.

=head3 get_zmq_socket

Takes a socket alias.

Returns the actual L<ZMQ::LibZMQ3> socket object.

=head3 set_zmq_sockopt

Takes a socket alias and arbitrary flags/options to pass to
B<zmq_setsockopt>.

See the man page for B<zmq_setsockopt>.

=head3 set_zmq_subscribe

Takes a socket alias and an optional subscription prefix.

Calls L</set_zmq_sockopt> to set the C<ZMQ_SUBSCRIBE> flag for the specified
socket; this is used by SUB-type sockets to subscribe to messages.

If no subscription prefix is specified, the socket will be subscribed to all
messages.

=head2 POE API

=head3 Emitted Events

=head4 zmqsock_bind_added

Emitted when a L</bind> has been executed.

$_[ARG0] is the socket's alias.

$_[ARG1] is the endpoint string.

=head4 zmqsock_connect_added

Emitted when a L</connect> has been executed.

$_[ARG0] is the socket's alias.

$_[ARG1] is the endpoint string.

=head4 zmqsock_recv

Emitted when some data has been received on a socket.

$_[ARG0] is the socket's alias.

$_[ARG1] is the raw message data extracted via B<zmq_msg_data>.

=head4 zmqsock_multipart_recv

Emitted when multipart data has been received on a socket.

$_[ARG0] is the socket's alias.

$_[ARG1] is an ARRAY containing the raw data extracted from each message part.

=head4 zmqsock_created

Emitted when a socket has been created.

$_[ARG0] is the alias that was spawned.

$_[ARG1] is the socket's type, as a L<ZMQ::Constants> constant.

=head4 zmqsock_closing

Emitted when a socket is being shut down.

$_[ARG0] is the alias that is closing.

=head3 Accepted Events

The following events take the same parameters as their counterparts described
in L</Methods>:

=over

=item *

create

=item *

close

=item *

bind

=item *

connect

=item *

write

=item *

write_multipart

=back

=head1 SEE ALSO

L<POEx::ZMQ3>

L<ZMQ::LibZMQ3>

L<http://www.zeromq.org>

=head1 AUTHOR

Jon Portnoy <avenj@cobaltirc.org>

=cut