The London Perl and Raku Workshop takes place on 26th Oct 2024. If your company depends on Perl, please consider sponsoring and/or attending.
# $Id: transfer.t,v 1.5 2003/01/27 08:25:19 ilja Exp $

use warnings;
use strict;

use Test;
BEGIN 
{     
    use vars qw(@tests $testqueue @q_len @msg_len);
    @tests = ( \&test_integrity,
               \&test_nonblocking,
               \&test_blocking );
    @q_len    = (1, 10, 128);
    @msg_len  = (1, 128, 1024, 4096);
    $testqueue = '/testq_42';
    
    plan tests => scalar(@tests);
};

use Fcntl;
use POSIX::RT::MQ;

for (@tests)
{ 
    eval {$_->()}; 
    if($@) { warn("\n$@"); ok(0) } 
    else   { ok(1) } 
} 
 


sub test_integrity
{
    for my $q_len (@q_len)
    {
        for my $msg_len (@msg_len)
        {
            my $attr = { mq_maxmsg=>$q_len, mq_msgsize=>$msg_len };

            POSIX::RT::MQ->unlink($testqueue);
            my $mq = POSIX::RT::MQ->open($testqueue, O_RDWR|O_CREAT, 0600, $attr) or die "cannot open($testqueue, O_RDWR|O_CREAT, 0600, ...): $!\n";

            my @messages = ();
            for (my $m=0; $m<$q_len; $m++)
            {
                my ($msg, $prio) = construct_message($msg_len, $m);
                push @messages, [$msg, $prio, $m];
                $mq->send($msg, $prio)  or die "cannot send(...): $!\n";
            }
            @messages = sort { ($b->[1]<=>$a->[1]) || ($a->[2]<=>$b->[2]) } @messages;
            for (my $m=0; $m<$q_len; $m++)
            {
                my ($msg,  $prio)  = $mq->receive  or die "cannot receive(): $!\n";
                my $saved = shift @messages;
                $msg eq $saved->[0] && $prio == $saved->[1]  or die "unexpected message and (or) priority\n";
            }
       }
    }       
    1;
}    

sub test_nonblocking
{
    my $q_len   = $q_len[-1];
    my $msg_len = $msg_len[-1];
    my $attr = { mq_maxmsg=>$q_len, mq_msgsize=>$msg_len };
    
    POSIX::RT::MQ->unlink($testqueue);
    my $mq = POSIX::RT::MQ->open($testqueue, O_RDWR|O_CREAT|O_NONBLOCK, 0600, $attr)  or die "cannot open($testqueue, O_RDWR|O_CREAT, 0600, ...): $!\n";
    
    # receive from empty queue
    $mq->receive  and die "receive() OK from empty queue\n";
    
    # fill the queue        
    for (my $m=0; $m<$q_len; $m++)
    {
        my ($msg, undef) = construct_message($msg_len, $m);
        $mq->send($msg)  or die "cannot send(...): $!\n";
    }
    
    # send to full queue
    my ($msg, undef) = construct_message($msg_len, 0);
    $mq->send($msg)  and die "send() OK to full queue\n";
    
    1;      
}

sub test_blocking
{
    my $q_len   = $q_len[-1];
    my $msg_len = $msg_len[-1];
    my $attr    = { mq_maxmsg=>$q_len, mq_msgsize=>$msg_len };
    
    POSIX::RT::MQ->unlink($testqueue);
    my $mq = POSIX::RT::MQ->open($testqueue, O_RDWR|O_CREAT, 0600, $attr)  or die "cannot open($testqueue, O_RDWR|O_CREAT, 0600, ...): $!\n";
    
    # receive from empty queue
    {
        my $timeout = '';
        local $SIG{ALRM} = sub { $timeout = 'TIMEOUT' };
        alarm(5);
        $mq->receive;
        $timeout eq 'TIMEOUT'  or die "receive() didn't block\n";
    }

    # fill the queue    
    for (my $m=0; $m<$q_len; $m++)
    {
        my ($msg, undef) = construct_message($msg_len, $m);
        $mq->send($msg)  or die "cannot send(...): $!\n";
    }
    
    # send to full queue
    {
        my $timeout = '';
        local $SIG{ALRM} = sub { $timeout = 'TIMEOUT' };
        my ($msg, undef) = construct_message($msg_len, 0);
        alarm(5);
        $mq->send($msg);
        $timeout eq 'TIMEOUT'  or die "send() didn't block\n";
    }
    
    1;      
}

sub construct_message
{
    my $msg_len = shift;
    my $msg_num = shift;
    my $all_chars = join '' => map { chr } (0..255);

    my $msg = "$msg_num ";
    $msg .= $all_chars  while length($msg) < $msg_len;
    substr($msg, $msg_len) = '';
    ($msg, $msg_num%8); # calculate prio
}