The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
BEGIN {
     require Config;
     if (!$Config::Config{useithreads}) {
        print "1..0 # Skip: no ithreads\n";
        exit 0;
     }
}

use strict;
use Test::More;
use Test::Requires qw(Test::TCP Proc::Guard IO::Socket::INET);
use threads;

BEGIN {
    use_ok "ZMQ::LibZMQ2";
    use_ok "ZMQ::Constants", qw(:v2.1.11 ZMQ_REQ ZMQ_XREQ ZMQ_XREP ZMQ_REQ ZMQ_REP ZMQ_QUEUE);
}

my $port = Test::TCP::empty_port();
my $proc = Proc::Guard->new(code => sub {
    my $ctxt = zmq_init();
    my $sock = zmq_socket($ctxt, ZMQ_REQ);
    zmq_connect($sock,  "tcp://127.0.0.1:$port" );
    for my $i (1..10) {
        zmq_send($sock, "Hello $i");
        my $message = zmq_recv($sock);
    }
    for (1..5) {
        zmq_send($sock, "END");
        my $message = zmq_recv($sock);
    }
    zmq_close($sock);
    zmq_term($ctxt);
});

my $ctxt = zmq_init();
my $device = threads->create( sub {
    my $ctxt = shift;
    my $clients = zmq_socket($ctxt, ZMQ_XREP);
    my $workers = zmq_socket($ctxt, ZMQ_XREQ);
    zmq_bind($clients, "tcp://127.0.0.1:$port" );
    zmq_bind($workers, "inproc://workers" );
    zmq_device(ZMQ_QUEUE, $clients, $workers);
}, $ctxt );
$device->detach();

my @threads;
for (1..5) {
    push @threads, threads->create( sub {
        my $tid = threads->tid;
        my $ctxt = shift;
        my $wsock = zmq_socket($ctxt, ZMQ_REP);

        zmq_connect($wsock, "inproc://workers" );

        my $loop = 1;
        while ($loop) {
            my $message = zmq_recv($wsock);
            my $string  = zmq_msg_data($message);
            if ($string eq 'END') {
                $loop = 0;
                zmq_send($wsock, "END");
                zmq_close($wsock);
            } else {
                zmq_send($wsock, "World $tid");
            }
        }
    }, $ctxt );
}

foreach my $thr (@threads) {
    $thr->join;
}

ok(1);
undef $proc;

done_testing;