The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Message::Passing::Output::ZeroMQ;
use Moo;
use MooX::Types::MooseLike::Base qw/ :all /;
use namespace::clean -except => 'meta';

use ZMQ::FFI::Constants qw/ :all /;
use Time::HiRes;

with qw/
    Message::Passing::ZeroMQ::Role::HasASocket
    Message::Passing::Role::Output
/;

has '+_socket' => (
    handles => {
        '_zmq_send' => 'send',
    },
);

sub _socket_type { 'PUB' }

has socket_hwm => (
    is      => 'rw',
    default => 10000,
);

has subscribe_delay => (
    is      => 'ro',
    isa     => Num,
    default => 0.2,
    );

# socket_(probably)_subscribed, but who has the bytes for that
has socket_subscribed => (
    is  => 'rw',
    isa => Bool,
    );
has socket_connect_time => (
    is  => 'rw',
    isa => Num,
    );

sub BUILD {
    my $self = shift;
    # Force a socket to be built, so that there's more chance the first message will be sent
    if ($self->_should_connect){
        my $socket = $self->_socket;
        return;
    }

    return;
}

sub consume {
    my ($self, $data) = @_;

    # See the slow joiner problem for PUB/SUB, outlined in
    # http://zguide.zeromq.org/page:all#Getting-the-Message-Out
    if (!$self->socket_subscribed && $self->socket_connect_time){
        my $time = Time::HiRes::time;
        my $alive_time = $time - $self->socket_connect_time;
        my $sleep_time = sprintf "%.4f", ($self->subscribe_delay - $alive_time);
        # warn "Alive $alive_time, so sleep time $sleep_time";
        if ($sleep_time > 0){
            Time::HiRes::sleep $sleep_time;
        }
        $self->socket_subscribed(1);
    }

    return $self->_zmq_send($data);
}

sub setsockopt {
    my ($self, $socket) = @_;

    if ($self->zmq_major_version >= 3){
        $socket->set(ZMQ_SNDHWM, 'int', $self->socket_hwm);
    }
    else {
        $socket->set(ZMQ_HWM, 'uint64_t', $self->socket_hwm);
    }

    return;
}

after _build_socket => sub {
    my $self = shift;
    $self->socket_connect_time( Time::HiRes::time );
};

1;

=head1 NAME

Message::Passing::Output::ZeroMQ - output messages to ZeroMQ.

=head1 SYNOPSIS

    use Message::Passing::Output::ZeroMQ;

    my $logger = Message::Passing::Output::ZeroMQ->new;
    $logger->consume({data => { some => 'data'}, '@metadata' => 'value' });

    # Or see Log::Dispatch::Message::Passing for a more 'normal' interface to
    # simple logging.

    # Or use directly on command line:
    message-passing --input STDIN --output ZeroMQ --output_options \
        '{"connect":"tcp://192.168.0.1:5552"}'
    {"data":{"some":"data"},"@metadata":"value"}

=head1 DESCRIPTION

A L<Message::Passing> ZeroMQ output class.

Can be used as part of a chain of classes with the L<message-passing> utility, or directly as
a logger in normal perl applications.

=head1 ATTRIBUTES


See L<Message::Passing::ZeroMQ/CONNECTION ATTRIBUTES>.

=head2 subscribe_delay

Time in floating seconds to sleep to ensure the receiving socket has subscribed.
This is the longest the sleep might take.

See the slow-joiner problem: L<http://zguide.zeromq.org/page:all#Getting-the-Message-Out>.

DEFAULT: 0.2 seconds

=head1 METHODS

=head2 consume ($msg)

Sends a message, as-is. This means that you must have encoded the message to a string before
sending it. The C<message-pass> utility will do this for you into JSON, or you can
do it manually as shown in the example in L<Message::Passing::ZeroMQ>.

=head1 SEE ALSO

=over

=item L<Message::Passing::ZeroMQ>

=item L<Message::Passing::Input::ZeroMQ>

=item L<Message::Passing>

=item L<ZeroMQ>

=item L<http://www.zeromq.org/>

=back

=head1 SPONSORSHIP

This module exists due to the wonderful people at Suretec Systems Ltd.
<http://www.suretecsystems.com/> who sponsored its development for its
VoIP division called SureVoIP <http://www.surevoip.co.uk/> for use with
the SureVoIP API - 
<http://www.surevoip.co.uk/support/wiki/api_documentation>

=head1 AUTHOR, COPYRIGHT AND LICENSE

See L<Message::Passing>.

=cut