The London Perl and Raku Workshop takes place on 26th Oct 2024. If your company depends on Perl, please consider sponsoring and/or attending.

ZMQ::FFI

Perl zeromq bindings using libffi and FFI::Raw

ZMQ::FFI exposes a high level, transparent, OO interface to zeromq independent of the underlying libzmq version. Where semantics differ, it will dispatch to the appropriate backend for you. As it uses ffi, there is no dependency on XS or compilation.

EXAMPLES

send/recv

use v5.10;
use ZMQ::FFI;
use ZMQ::FFI::Constants qw(ZMQ_REQ ZMQ_REP ZMQ_DONTWAIT);

my $endpoint = "ipc://zmq-ffi-$$";
my $ctx      = ZMQ::FFI->new( threads => 1 );

my $s1 = $ctx->socket(ZMQ_REQ);
$s1->connect($endpoint);

my $s2 = $ctx->socket(ZMQ_REP);
$s2->bind($endpoint);

$s1->send('ohhai', ZMQ_DONTWAIT);

say $s2->recv();
# ohhai

pub/sub

use v5.10;
use ZMQ::FFI;
use ZMQ::FFI::Constants qw(ZMQ_PUB ZMQ_SUB ZMQ_DONTWAIT);
use Time::HiRes q(usleep);

my $endpoint = "ipc://zmq-ffi-$$";
my $ctx      = ZMQ::FFI->new();

my $s = $ctx->socket(ZMQ_SUB);
my $p = $ctx->socket(ZMQ_PUB);

$s->connect($endpoint);
$p->bind($endpoint);

# all topics
{
    $s->subscribe('');
    $p->send('ohhai', ZMQ_DONTWAIT);

    until ($s->has_pollin) {
        # compensate for slow subscriber
        usleep 100_000;
        $p->send('ohhai', ZMQ_DONTWAIT);
    }

    say $s->recv();
    # ohhai

    $s->unsubscribe('');
}

# specific topics
{
    $s->subscribe('topic1');
    $s->subscribe('topic2');

    $p->send('topic1 ohhai', ZMQ_DONTWAIT);
    $p->send('topic2 ohhai', ZMQ_DONTWAIT);

    until ($s->has_pollin) {
        usleep 100_000;
        $p->send('topic1 ohhai', ZMQ_DONTWAIT);
        $p->send('topic2 ohhai', ZMQ_DONTWAIT);
    }

    while ($s->has_pollin) {
        say join ' ', $s->recv();
        # topic1 ohhai
        # topic2 ohhai
    }
}

multipart

use v5.10;
use ZMQ::FFI;
use ZMQ::FFI::Constants qw(ZMQ_DEALER ZMQ_ROUTER ZMQ_DONTWAIT);

my $endpoint = "ipc://zmq-ffi-$$";
my $ctx      = ZMQ::FFI->new();

my $d = $ctx->socket(ZMQ_DEALER);
$d->set_identity('dealer');

my $r = $ctx->socket(ZMQ_ROUTER);

$d->connect($endpoint);
$r->bind($endpoint);

$d->send_multipart([qw(ABC DEF GHI)], ZMQ_DONTWAIT);

say join ' ', $r->recv_multipart;
# dealer ABC DEF GHI

nonblocking

use v5.10;
use ZMQ::FFI;
use ZMQ::FFI::Constants qw(ZMQ_PUSH ZMQ_PULL);
use AnyEvent;

my $endpoint = "ipc://zmq-ffi-$$";
my $ctx      = ZMQ::FFI->new();
my @messages = qw(foo bar baz);


my $pull = $ctx->socket(ZMQ_PULL);
$pull->bind($endpoint);

my $fd = $pull->get_fd();

my $recv = 0;
my $w = AE::io $fd, 0, sub {
    while ( $pull->has_pollin ) {
        say $pull->recv();
        # foo, bar, baz

        $recv++;
        if ($recv == 3) {
            EV::unloop();
        }
    }
};


my $push = $ctx->socket(ZMQ_PUSH);
$push->connect($endpoint);

my $sent = 0;
my $t;
$t = AE::timer 0, .1, sub {
    $push->send($messages[$sent]);

    $sent++;
    if ($sent == 3) {
        undef $t;
    }
};

EV::run();

INSTALL

# if simply wanting to use ZMQ::FFI
cpanm ZMQ::FFI

# if wanting to hack on the source and build the dist
git clone git@github.com:calid/zmq-ffi.git
cd zmq-ffi
cpanm Dist::Zilla # if not already installed
dzil authordeps | cpanm
dzil build