# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
use strict;
use warnings;

use Config;

BEGIN {
    # Bail out at compile time when the running perl lacks a feature this
    # test relies on.  Each entry pairs a %Config key with the TAP skip
    # line that is printed when that key is false.
    my @prerequisites = (
        [ 'useithreads', "1..0 # SKIP Perl not compiled with 'useithreads'\n" ],
        [ 'd_select',    "1..0 # SKIP 'select()' not available for testing\n" ],
    );

    foreach my $prerequisite (@prerequisites) {
        my ($config_key, $skip_line) = @{$prerequisite};
        next if $Config{$config_key};

        # First missing prerequisite wins: report the skip and stop.
        print($skip_line);
        exit(0);
    }
}

use threads;
use Thread::Queue;

use Test::More;

# Workload sizing shared by every scenario below.
my $num_threads = 3;    # worker threads started per scenario
my $cycles      = 2;    # queues are sized for $num_threads*$cycles batches
my $count       = 2;    # items requested by each dequeue() call

# Expected assertions across the three scenarios:
#   3*$base_items   one pass() per item of the base queue size
#   6*$num_threads  an "exiting" and a "joined" pass() per thread, per scenario
#   6               four is() checks plus the one extra item queued in
#                   each of the last two scenarios
my $base_items = $num_threads * $cycles * $count;
plan tests => 3*$base_items + 6*$num_threads + 6;

# Scenario: end() is called while the workers are blocked in dequeue() and
# the queue has already been drained
{
    my $q = Thread::Queue->new(1 .. ($num_threads*$cycles*$count));
    my $r = Thread::Queue->new();

    # Start the workers; each one reports every finished batch on $r
    my @threads = map {
        my $worker = $_;
        threads->create( sub {
            # Keep pulling batches until the queue signals that it is done,
            # either with an empty list or with an undefined item
            BATCH:
            while (1) {
                my @batch = $q->dequeue($count);
                last BATCH unless @batch;

                for my $entry (@batch) {
                    last BATCH unless defined($entry);
                    pass("'$entry' read from queue in thread $worker");
                }

                # Pause for a random fraction of a second, then acknowledge
                select(undef, undef, undef, rand(1));
                $r->enqueue($worker);
            }
            pass("Thread $worker exiting");
        });
    } 1 .. $num_threads;

    # One acknowledgement arrives per completed batch, so once
    # $num_threads*$cycles of them are in, every item has been consumed
    $r->dequeue() for (1 .. ($num_threads*$cycles));

    # Allow the workers time to end up blocked in dequeue() again
    sleep(1);
    threads->yield();

    is($q->pending(), 0, 'Queue is empty');

    # Tell the workers that no more work is coming
    $q->end();

    is($q->pending(), undef, 'Queue is ended');

    foreach my $thr (@threads) {
        $thr->join();
        pass($thr->tid() . " joined");
    }
}

# Scenario: end() is called while the workers are blocked in dequeue() and
# an item still remains in the queue
{
    # One item more than the workers can take in full batches
    my $q = Thread::Queue->new(1 .. ($num_threads*$cycles*$count + 1));
    my $r = Thread::Queue->new();

    # Start the workers; each one reports every finished batch on $r
    my @threads = map {
        my $worker = $_;
        threads->create( sub {
            # Keep pulling batches until the queue signals that it is done,
            # either with an empty list or with an undefined item
            BATCH:
            while (1) {
                my @batch = $q->dequeue($count);
                last BATCH unless @batch;

                for my $entry (@batch) {
                    last BATCH unless defined($entry);
                    pass("'$entry' read from queue in thread $worker");
                }

                # Pause for a random fraction of a second, then acknowledge
                select(undef, undef, undef, rand(1));
                $r->enqueue($worker);
            }
            pass("Thread $worker exiting");
        });
    } 1 .. $num_threads;

    # One acknowledgement arrives per completed batch, so once
    # $num_threads*$cycles of them are in, only the extra item is left
    $r->dequeue() for (1 .. ($num_threads*$cycles));

    # Allow the workers time to end up blocked in dequeue() again
    sleep(1);
    threads->yield();

    is($q->pending(), 1, 'Queue has one left');

    # Tell the workers that no more work is coming
    $q->end();

    foreach my $thr (@threads) {
        $thr->join();
        pass($thr->tid() . " joined");
    }

    # Checked only after all workers have been joined
    is($q->pending(), undef, 'Queue is ended');
}

# Scenario: end() is sent while items are still in the queue
{
    my $q = Thread::Queue->new(1 .. ($num_threads*$cycles*$count + 1));

    # Start the workers; no acknowledgement queue is used in this scenario
    my @threads = map {
        my $worker = $_;
        threads->create( sub {
            # Keep pulling batches until the queue signals that it is done,
            # either with an empty list or with an undefined item
            BATCH:
            while (1) {
                my @batch = $q->dequeue($count);
                last BATCH unless @batch;

                for my $entry (@batch) {
                    last BATCH unless defined($entry);
                    pass("'$entry' read from queue in thread $worker");
                }

                # Pause for a random fraction of a second between batches
                select(undef, undef, undef, rand(1));
            }
            pass("Thread $worker exiting");
        });
    } 1 .. $num_threads;

    # End the queue straight away, without waiting for it to drain.  The
    # test plan counts one pass() per queued item, so the workers are
    # expected to read everything that is left and then exit.
    $q->end();

    foreach my $thr (@threads) {
        $thr->join();
        pass($thr->tid() . " joined");
    }
}

exit(0);

# EOF