# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.

use strict;
use warnings;
use DBI;
use Test::More;
use Test::mysqld;
use Queue::Q4M::Worker;

# Boot a throwaway MySQL server for this test. 'skip-networking' disables
# TCP/IP listening, so the instance does not occupy a port.
#
# Test::mysqld->new returns undef when the server cannot be started (for
# example when MySQL is not installed). Skip the whole test in that case
# rather than dying on the ->dsn call below with an unhelpful
# "Can't call method on an undefined value" error.
my $mysqld = Test::mysqld->new(
    my_cnf => {
        'skip-networking' => '',
    }
) or plan skip_all => $Test::mysqld::errstr;

# Connect to the temporary server. RaiseError makes every failing DBI call
# die, so the statements that follow need no individual error checks.
my $dbh = DBI->connect($mysqld->dsn, undef, undef, {
    RaiseError => 1,
    AutoCommit => 1,
});

# Schema bootstrap, executed in order against the temporary server:
# install the queue engine plugin, register the queue_* functions it
# ships in the same shared library, then create the table under test.
my @deploy_stmts = (
    "INSTALL PLUGIN queue SONAME 'libqueue_engine.so'",
    "CREATE FUNCTION queue_wait RETURNS INT SONAME 'libqueue_engine.so'",
    "CREATE FUNCTION queue_end RETURNS INT SONAME 'libqueue_engine.so'",
    "CREATE FUNCTION queue_abort RETURNS INT SONAME 'libqueue_engine.so'",
    "CREATE FUNCTION queue_rowid RETURNS INT SONAME 'libqueue_engine.so'",
    "CREATE FUNCTION queue_set_srcid RETURNS INT SONAME 'libqueue_engine.so'",
    "CREATE FUNCTION queue_compact RETURNS INT SONAME 'libqueue_engine.so'",
    "CREATE TABLE queue ( args TEXT NOT NULL) ENGINE=Queue",
);
$dbh->do($_) for @deploy_stmts;

# Seed the queue with ten rows whose "args" values are 1 through 10.
my $insert_sql = <<EOSQL;
        INSERT INTO queue VALUES (?)
EOSQL
foreach my $value ( 1 .. 10 ) {
    $dbh->do( $insert_sql, undef, $value );
}

# Set by the SIGALRM handler when the watchdog below fires.
my $alrmed;
eval {
    # Watchdog: give the worker a fixed time budget. The handler has to die
    # in order to unwind out of $worker->work; merely counting the signal
    # would let a stuck worker run forever. The handler is localized so the
    # previous SIGALRM disposition is restored when the eval block exits.
    my $timeout = 10;
    local $SIG{ALRM} = sub {
        $alrmed++;
        die "Worker did not finish within $timeout seconds\n";
    };
    alarm($timeout);

    # Distinct "args" values handed to the work_once callback so far.
    my %rows;
    my $worker = Queue::Q4M::Worker->new(
        dbh => $dbh,
        sql => "SELECT args FROM queue WHERE queue_wait('queue', 1)",
        work_once => sub {
            my ($worker, $row) = @_;

            $rows{ $row->{args} }++;
            if ( keys %rows == 10 ) {
                note "Received all rows, stopping process";
                ok(1, "Received all rows");
                # No real signal is delivered here.
                # NOTE(review): presumably this makes work() return as if
                # SIGINT had arrived -- confirm against Queue::Q4M::Worker.
                $worker->signal_received('INT'); # Dummy
            }
        }
    );
    $worker->work;
    alarm(0);
};
# Copy $@ immediately: it is a global that any later eval may overwrite.
my $err = $@;
# Always cancel a pending alarm, whichever way the eval block was left.
alarm(0);
if ($err) {
    diag "Watchdog alarm fired" if $alrmed;
    fail("Received exception: $err");
}

done_testing;