BEGIN {
require Config;
if (!$Config::Config{useithreads}) {
print "1..0 # Skip: no ithreads\n";
exit 0;
}
}
use strict;
use Test::More;
use Test::Requires qw(Test::TCP Proc::Guard IO::Socket::INET);
use threads;
BEGIN {
use_ok "ZMQ::LibZMQ2";
use_ok "ZMQ::Constants", qw(:v2.1.11 ZMQ_REQ ZMQ_XREQ ZMQ_XREP ZMQ_REQ ZMQ_REP ZMQ_QUEUE);
}
my $port = Test::TCP::empty_port();
my $proc = Proc::Guard->new(code => sub {
my $ctxt = zmq_init();
my $sock = zmq_socket($ctxt, ZMQ_REQ);
zmq_connect($sock, "tcp://127.0.0.1:$port" );
for my $i (1..10) {
zmq_send($sock, "Hello $i");
my $message = zmq_recv($sock);
}
for (1..5) {
zmq_send($sock, "END");
my $message = zmq_recv($sock);
}
zmq_close($sock);
zmq_term($ctxt);
});
my $ctxt = zmq_init();
my $device = threads->create( sub {
my $ctxt = shift;
my $clients = zmq_socket($ctxt, ZMQ_XREP);
my $workers = zmq_socket($ctxt, ZMQ_XREQ);
zmq_bind($clients, "tcp://127.0.0.1:$port" );
zmq_bind($workers, "inproc://workers" );
zmq_device(ZMQ_QUEUE, $clients, $workers);
}, $ctxt );
$device->detach();
my @threads;
for (1..5) {
push @threads, threads->create( sub {
my $tid = threads->tid;
my $ctxt = shift;
my $wsock = zmq_socket($ctxt, ZMQ_REP);
zmq_connect($wsock, "inproc://workers" );
my $loop = 1;
while ($loop) {
my $message = zmq_recv($wsock);
my $string = zmq_msg_data($message);
if ($string eq 'END') {
$loop = 0;
zmq_send($wsock, "END");
zmq_close($wsock);
} else {
zmq_send($wsock, "World $tid");
}
}
}, $ctxt );
}
foreach my $thr (@threads) {
$thr->join;
}
ok(1);
undef $proc;
done_testing;