The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Message::Passing::Input::ZeroMQ;
use Moo;
use ZMQ::FFI::Constants qw/ :all /;
use AnyEvent;
use Scalar::Util qw/ weaken /;
use Try::Tiny qw/ try catch /;
use namespace::clean -except => 'meta';

with qw/
    Message::Passing::ZeroMQ::Role::HasASocket
    Message::Passing::Role::Input
/;

has '+_socket' => (
    handles => {
        _zmq_recv => 'recv',
    },
);

sub _socket_type { 'SUB' }

has socket_hwm => (
    is      => 'rw',
    default => 10000,
);

has subscribe => (
    isa => sub { ref($_[0]) eq 'ARRAY' },
    is => 'ro',
    lazy => 1,
    default => sub { [ '' ] }, # Subscribe to everything!
);

sub setsockopt {
    my ($self, $socket) = @_;

    if ($self->zmq_major_version >= 3){
        $socket->set(ZMQ_RCVHWM, 'int', $self->socket_hwm);
    }
    else {
        $socket->set(ZMQ_HWM, 'uint64_t', $self->socket_hwm);
    }

    if ($self->socket_type eq 'SUB') {
        foreach my $sub (@{ $self->subscribe }) {
            $socket->set(ZMQ_SUBSCRIBE, "binary", $sub);
        }
    }

    return;
}

sub _try_rx {
    my $self = shift();
    my $msg;
    try {
        $msg = $self->_zmq_recv(ZMQ_NOBLOCK);
    };
    if ($msg) {
        $self->output_to->consume($msg);
    }
    return $msg;
}

has _io_reader => (
    is => 'ro',
    lazy => 1,
    default => sub {
        my $weak_self = shift;
        weaken($weak_self);
        AE::io $weak_self->_socket->get_fd, 0,
            sub { my $more; do { $more = $weak_self->_try_rx } while ($more) };
    },
);

# Note that we need this timer as ZMQ is magic..
# Just checking our local FD for readability will not always
# be enough, as the client end of ZQM may not start pushing messages to us,
# ergo we call ->recv explicitly on the socket to get messages
# which may be pre-buffered at a client as fast as possible (i.e. before
# the client pushes another message).
has _zmq_timer => (
    is => 'ro',
    lazy => 1,
    default => sub {
        my $weak_self = shift;
        weaken($weak_self);
        AnyEvent->timer(after => 1, interval => 1,
            cb => sub { my $more; do { $more = $weak_self->_try_rx } while ($more) });
    },
);

sub BUILD {
    my $self = shift;
    $self->_io_reader;
    $self->_zmq_timer;
}

1;

=head1 NAME

Message::Passing::Input::ZeroMQ - input messages from ZeroMQ.

=head1 SYNOPSIS

    message-passing --output STDOUT --input ZeroMQ --input_options '{"socket_bind":"tcp://*:5552"}'

=head1 DESCRIPTION

A L<Message::Passing> ZeroMQ input class.

Can be used as part of a chain of classes with the L<message-passing> utility, or directly as
an input with L<Message::Passing::DSL>.

=head1 ATTRIBUTES

See L<Message::Passing::ZeroMQ/CONNECTION ATTRIBUTES>

=head2 subscribe

If the input socket is a C<SUB> socket, then the C<ZMQ_SUBSCRIBE>
socket option will be set once for each value in the subscribe attribute.

Defaults to '', which means all messages are subscribed to.

=head1 SEE ALSO

=over

=item L<Message::Passing::ZeroMQ>

=item L<Message::Passing::Output::ZeroMQ>

=item L<Message::Passing>

=item L<ZeroMQ>

=item L<http://www.zeromq.org/>

=back

=head1 SPONSORSHIP

This module exists due to the wonderful people at Suretec Systems Ltd.
<http://www.suretecsystems.com/> who sponsored its development for its
VoIP division called SureVoIP <http://www.surevoip.co.uk/> for use with
the SureVoIP API - 
<http://www.surevoip.co.uk/support/wiki/api_documentation>

=head1 AUTHOR, COPYRIGHT AND LICENSE

See L<Message::Passing::ZeroMQ>.

=cut