# The London Perl and Raku Workshop takes place on 26th Oct 2024. If your company depends on Perl, please consider sponsoring and/or attending.
package Vulcan::Multicast;

=head1 NAME

Vulcan::Multicast - data distribution via multicast

=cut
use strict;
use warnings;
use Carp;

use File::Temp;
use Digest::MD5;
use IO::Socket::Multicast;
use Time::HiRes qw( sleep time );

# Protocol constants shared by the sender and the receiver:
#   MTU    - size in bytes of one datagram (header + payload); also the
#            length requested from every socket read
#   HEAD   - length of the packet header: 32 hex digits of MD5, 14 hex
#            digits of chunk index, 4 hex digits of sequence number
#   MAXBUF - maximum number of data packets per chunk; the sequence values
#            MAXBUF and MAXBUF + 1 are used as end-of-chunk and end-of-file
#            markers
#   REPEAT - default for the 'repeat' option of send()
#   NULL   - the empty string
use constant
{
    MTU => 1500, HEAD => 50, MAXBUF => 4096, REPEAT => 2, NULL => ''
};

=head1 SYNOPSIS

 use Vulcan::Multicast;
 
 my $send = Vulcan::Multicast ## sender
    ->new( send => '239.0.0.2:8360', iface => 'eth1' );

 $send->send            ## default
 ( 
     '/file/path',
     ttl  => 1,          ## 1
     repeat => 2,        ## 2
     buffer => 4096,     ## MAXBUF
 );

 my $recv = Vulcan::Multicast ## receiver
    ->new( recv => '239.0.0.2:8360', iface => 'eth1' );

 $recv->recv( '/repo/path' );

=cut
# Constructor. Creates the multicast socket for one endpoint.
#
# Parameters (named):
#   send  => 'address:port'  create a sender; used as the socket's PeerAddr
#   recv  => 'address:port'  create a receiver; used as the socket's LocalAddr
#   iface => name            optional; passed to the socket's mcast_if
#
# Exactly one of 'send' and 'recv' must be given. Returns the blessed object.
# Confesses when the mode is missing or ambiguous, or when the socket cannot
# be created.
#
# NOTE(review): a receiver is never joined to a group with mcast_add here --
# confirm that binding to the group address is sufficient on the target hosts.
sub new
{
    my ( $class, %param ) = @_;
    my %addr = ( send => 'PeerAddr', recv => 'LocalAddr' );

    # sorted, so that the outcome never depends on hash ordering
    my @mode = grep { $param{$_} } sort keys %addr;

    confess "either 'send' or 'recv' must be specified" unless @mode;
    confess "'send' and 'recv' are mutually exclusive" if @mode > 1;

    my $mode = $mode[0];
    my $sock = IO::Socket::Multicast
        ->new( $addr{$mode} => $param{$mode}, ReuseAddr => 1 )
        or confess "$mode $param{$mode}: socket: " . ( $@ || $! );

    $sock->mcast_loopback( 0 );
    $sock->mcast_if( $param{iface} ) if $param{iface};

    return bless { sock => $sock, mode => $mode }, ref $class || $class;
}

# Multicast the content of a file.
#
#   $self->send( $file, ttl => 1, repeat => 2, buffer => 4096 );
#
# The file is cut into payloads of MTU - HEAD bytes, grouped into chunks of
# at most 'buffer' packets. Each packet is emitted through buff() with the
# MD5 of the whole file, the chunk index and the sequence number inside the
# chunk. Every chunk is followed by a packet without payload whose sequence
# number is MAXBUF when more data follows, or MAXBUF + 1 at end of file.
#
# Options:
#   ttl    - passed to the socket's mcast_ttl when true
#   repeat - handed to buff() for every packet; defaults to REPEAT
#   buffer - packets per chunk; defaults to, and is limited by, MAXBUF
#
# Returns the object. Confesses when the object is not a sender, when the
# file is undefined, not a plain file or cannot be opened, and when reading
# the file fails.
sub send
{
    my ( $self, $file, %param ) = @_;
    confess 'not a sender' if $self->{mode} ne 'send';
    confess "file not defined" unless $file;

    my $sock = $self->{sock};
    my $repeat = $param{repeat} || REPEAT;
    my $bufcnt = $param{buffer} || MAXBUF;
    my $buflen = MTU - HEAD;

    $sock->mcast_ttl( $param{ttl} ) if $param{ttl};

    # a chunk size below 1 would read nothing and never reach end of file
    $bufcnt = MAXBUF if $bufcnt < 1 || $bufcnt > MAXBUF;

    # -f and open both follow symbolic links, relative ones included, so
    # the path is used as given
    confess "$file: not a file" unless -f $file;

    # three-argument open: the name is never interpreted as a mode and
    # surrounding blanks are kept
    open my $fh, '<', $file or confess "$file: open: $!\n";
    binmode $fh;

    my $md5 = Digest::MD5->new()->addfile( $fh )->hexdigest();
    seek $fh, 0, 0 or confess "$file: seek: $!\n";

    my ( $index, $cont, $error ) = ( 0, 1 );

    while ( $cont )
    {
        my ( $time, @buffer ) = time;

        for ( 1 .. $bufcnt )
        {
            my $data;
            $cont = read $fh, $data, $buflen;
            $error = "$!" unless defined $cont;
            last unless $cont;
            push @buffer, \$data;
        }

        for my $i ( 0 .. $#buffer )
        {
            $self->buff( $md5, $index, $i, $repeat, $buffer[$i] );
        }

        # pause for as long as reading and sending the chunk took; a
        # negative interval (clock stepped back) is skipped
        my $elapsed = time - $time;
        sleep( $elapsed ) if $elapsed > 0;

        # the end marker is sent after a read error as well, so that the
        # transfer is always terminated
        $self->buff
            ( $md5, $index ++, ( $cont ? MAXBUF : MAXBUF + 1 ), $repeat );
    }

    close $fh;
    confess "$file: read: $error\n" if defined $error;
    return $self;
}

# Build one packet and write it to the socket.
#
#   $self->buff( $md5, $index, $seq, $repeat, \$payload );
#
# The header is the MD5 string followed by the chunk index as 14 hex digits
# and the sequence number as 4 hex digits. The payload reference is optional;
# without it the packet consists of the header alone. The packet is sent once
# for every value in 0 .. $repeat, i.e. $repeat + 1 times. Returns what the
# individual socket sends returned.
sub buff
{
    my ( $self, $md5, $index, $seq, $repeat, $payload ) = @_;

    my $packet = sprintf "%s%014x%04x", $md5, $index, $seq;
    $packet .= ${ $payload } if $payload;

    my $socket = $self->{sock};
    return map { $socket->send( $packet ) } 0 .. $repeat;
}

# Receive files and store them in a directory. Never returns.
#
#   $self->recv( $repo );
#
# Every datagram starts with a HEAD-byte header: 32 hex digits of MD5, 14 hex
# digits of chunk index and 4 hex digits of sequence number. Datagrams whose
# header does not have that form are ignored, as is everything that belongs
# to a file already present as "$repo/<md5>".
#
# Packets with a sequence number below MAXBUF carry data and are kept in
# memory. A sequence number of MAXBUF ends a chunk: the chunk is appended to
# a temporary file in $repo. Any larger sequence number ends the file: the
# last chunk is appended, and the temporary file is renamed to "$repo/<md5>"
# when its MD5 matches, or removed otherwise. A packet missing from a chunk
# is reported with a warning and contributes no data.
#
# Confesses when the object is not a receiver, or when $repo is undefined or
# not a directory.
sub recv
{
    local $| = 1;

    my $self = shift;
    confess 'not a receiver' if $self->{mode} ne 'recv';

    my $sock = $self->{sock};
    my $repo = shift || confess "repo not defined";

    # -d follows symbolic links, relative ones included, so the path is
    # used as given
    confess "$repo: not a directory" unless -d $repo;

    my %buffer;

    while ( 1 )
    {
        my $data;
        next unless $sock->recv( $data, MTU );

        # cut the header off; what remains in $data is the payload
        next unless my ( $md5, $index, $i ) = substr( $data, 0, HEAD, NULL )
            =~ /\A([0-9a-f]{32})([0-9a-f]{14})([0-9a-f]{4})\z/;

        $index = hex $index;
        $i = hex $i;

        my $file = "$repo/$md5";
        next if -f $file;

        # per file: chunk index => list of payload references, plus the
        # temporary file under the key 'temp'
        my $buffer = $buffer{$md5} ||= {};

        if ( $i < MAXBUF ) { $buffer->{$index}[$i] = \$data; next }

        # one temporary file per transfer, kept across chunks
        my $temp = $buffer->{temp};

        unless ( $temp )
        {
            $temp = $buffer->{temp} = File::Temp->new
                ( DIR => $repo, SUFFIX => ".$md5", UNLINK => 0 );
            binmode $temp;
        }

        my $error = "$md5: missing data!\n";
        my $chunk = delete $buffer->{$index};

        for my $packet ( @{ $chunk || [] } )
        {
            unless ( $packet ) { warn $error; next }
            print {$temp} ${ $packet };
        }

        next if $i == MAXBUF;

        # end of file: verify the checksum and publish or discard
        delete $buffer{$md5};

        my $path = $temp->filename;
        my $sum = seek( $temp, 0, 0 )
            ? Digest::MD5->new()->addfile( $temp )->hexdigest() : NULL;

        close $temp;

        if ( $md5 ne $sum )
        {
            unlink $path;
        }
        elsif ( ! rename $path, $file )
        {
            # both paths are in $repo, so no copy across devices is needed
            warn "$path: rename: $!\n";
            unlink $path;
        }
    }
}

1;