The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package MIO::UDP;

=head1 NAME

MIO::UDP - Send multiple UDP datagrams in parallel.

=head1 SYNOPSIS
 
 use MIO::UDP;

 my $udp = MIO::UDP->new( qw( host1:port1 host1:port2 ... ) );
 my $result = $udp->run( max => 128, log => \*STDERR, timeout => 300 );

 my $mesg = $result->{mesg};
 my $error = $result->{error};

=cut
use strict;
use warnings;

use Carp;
use IO::Socket;
use Time::HiRes qw( time );
use IO::Poll qw( POLLIN );

use base qw( MIO );

## Copy of %MIO::RUN taken when this file is loaded; run() merges it in
## beneath the caller's parameters.
our %RUN = %MIO::RUN;
## Copy of %MIO::MAX; run() reads its 'period' and 'buffer' entries.
our %MAX = %MIO::MAX;

sub new
{
    ## Constructor: forwards every remaining argument to net() on the
    ## invocant and returns whatever net() returns. net() is not defined in
    ## this file -- presumably inherited from the MIO base class.
    my $invocant = shift;

    return $invocant->net( @_ );
}

=head1 METHODS

=head3 run( %param )

Send UDP datagrams in parallel.
The following parameters may be defined in I<%param>:

 max : ( default 128 ) number of datagrams sent in parallel.
 log : ( default STDERR ) a handle to report progress.
 timeout : ( default 300 ) number of seconds allotted for the whole run.
 input : ( default from STDIN ) input buffer.
 resp : ( default 1 ) want response.

Returns HASH of HASH of nodes. First level is indexed by type
( I<mesg> or I<error> ). Second level is indexed by message.

=cut
sub run
{
    ## Sends the input buffer as one datagram to every node held in the
    ## object, keeping at most 'max' sockets waiting for a reply, and
    ## collects replies until all nodes are done or 'timeout' seconds have
    ## elapsed since the call started.
    ##
    ## Returns %result ( list context ) or a reference to it ( scalar
    ## context ): $result{mesg}{$reply} and $result{error}{$reason} are
    ## each an ARRAY of node names.
    confess "poll: $!" unless my $poll = IO::Poll->new();

    local $| = 1;
    local $/ = undef; ## slurp mode: <STDIN> below reads everything at once

    my $self = shift;
    my @node = keys %$self;

    ## Later entries win: built-in resp, then %RUN, then the caller's %param.
    my %run = ( resp => 1, %RUN, @_ );
    my ( %result, %busy ); ## %busy: stringified socket => node name
    my ( $log, $max, $timeout, $input, $resp ) =
        @run{ qw( log max timeout input resp ) };

    $input ||= -t STDIN ? '' : <STDIN>;

    ## $time is taken once, so the timeout bounds the whole run.
    for ( my $time = time; @node || $poll->handles; )
    {
        if ( time - $time > $timeout ) ## timeout
        {
            ## The keys of %busy are stringified sockets, not handles, so
            ## they cannot be removed from the poll set or shut down. Walk
            ## the real socket objects registered with the poll instead.
            for my $sock ( $poll->handles )
            {
                $poll->remove( $sock );
                eval { $sock->shutdown( 2 ) };
                push @{ $result{error}{timeout} }, delete $busy{$sock};
            }

            print $log "timeout!\n";
            last;
        }

        while ( @node && keys %busy < $max ) ## launch
        {
            my $node = shift @node;
            my %inet =
            (
                PeerAddr => $node, Blocking => 0, Timeout => $timeout,
                Proto => 'udp', Type => SOCK_DGRAM,
            );

            ## The value stored under the node selects the socket family.
            my $sock = $self->{$node}
                ? IO::Socket::INET->new( %inet )
                : IO::Socket::UNIX->new( %inet );

            unless ( $sock )
            {
                push @{ $result{error}{ "socket: $!" } }, $node;
                next;
            }

            $sock->send( $input ) if defined $input;
            print $log "$node started.\n" if $run{verbose};

            if ( $resp )
            {
                ## Wait for a reply: register with the poll set and count
                ## the socket against 'max'.
                $poll->mask( $sock => POLLIN );
                $busy{$sock} = $node;
            }
            else
            {
                ## Fire and forget.
                eval { $sock->shutdown( 2 ) };
                print $log "$node done.\n" if $run{verbose};
            }
        }

        next unless $resp;
        $poll->poll( $MAX{period} );

        for my $sock ( $poll->handles( POLLIN ) ) ## read
        {
            my $buffer;

            ## recv returns undef on failure; capture the reason before any
            ## other call can overwrite $!.
            my $peer = $sock->recv( $buffer, $MAX{buffer} );
            my $reason = "$!";
            my $node = delete $busy{$sock};

            if ( defined $peer )
            {
                push @{ $result{mesg}{$buffer} }, $node;
            }
            else
            {
                ## A failed receive is an error, not an empty reply.
                push @{ $result{error}{ "recv: $reason" } }, $node;
            }

            $poll->remove( $sock );
            eval { $sock->shutdown( 2 ) };
            print $log "$node done.\n" if $run{verbose};
        }
    }

    ## Nodes still queued when the loop ended were never contacted.
    push @{ $result{error}{'not run'} }, @node if @node;
    return wantarray ? %result : \%result;
}

1;