The London Perl and Raku Workshop takes place on 26th Oct 2024. If your company depends on Perl, please consider sponsoring and/or attending.
package Spread::Client;

use 5.006;
use strict;
use warnings;

use bytes;
use Spread::Client::Constant ':all';
use Spread::Client::Frame;
use List::Util qw(reduce);
use Socket;

our $VERSION = '0.03_02';

sub connect {
    my %args = @_;

    my ($port, $host) = split /@/, $args{spread_name};
    
    die "Bad/missing SPREAD NAME.\n"
        unless defined $port;

    my $connect_class = $args{connect_class} ? "Spread::Client::Connection::$args{connect_class}"
                                             : "Spread::Client::Connection::Sync";

    eval "require $connect_class";

    die $@
        if $@;

    my $conn = $connect_class->new( @_ )
                or die "Could not connect to spread daemon: $!\n";

    my $sock = $conn->sock;

    # In case connection class sets socket to non-blocking
    # we will set this to non-blocking again below
    unless( defined $sock->blocking( 1 ) ) {
         die "could not set blocking on handle: $!\n";
    }

    # CONNECT ROUTINE
    unless( $conn->is_unix ) {

        my $addr = sockaddr_in($port, inet_aton( $host ));
        connect( $sock, $addr)
            or die "Could not connect to host: $host\n";
    }

    # TODO revisit this at a later date
    my $connect_message = build_connect_message( private_name => $args{private_name} || 0);
    
    # Send connect message
    $conn->write( \$connect_message );
    
    # read buffer
    my $buffer;

    # Get auth methods length, to find what auth methods we have available
    $buffer = $conn->read( 1 );
    my $authlen = ord( $$buffer );
    
    die "Bad authentication length: $authlen\n"
        if $authlen == -1 or $authlen >= 128;
    
    # Get auth methods
    my $auth_method = $conn->read( $authlen );
    
    # Send auth message
    my $auth_method_message = build_auth_message( auth_method => $$auth_method );
    $conn->write( \$auth_method_message );
    
    # Pull down accept and versions and grouplen
    $buffer = $conn->read( 5 );
    
    my (@versions, $accept, $full_version);

    # Get accept
    $accept = unpack('c', $$buffer);

    die "Did not get accept: $accept\n"
        unless $accept == 1;
    #

    $versions[0] = substr( $$buffer, 1, 1); # Major
    $versions[1] = substr( $$buffer, 2, 1); # Minor
    $versions[2] = substr( $$buffer, 3, 1); # Patch

    {
        no warnings 'once';
        $full_version = unpack('c', reduce { $a | $b } (@versions));
    }

    die "Full version is a problem: $full_version\n"
        if $full_version == -1;

    # Get private group info
    # Get group length
    my $grouplen = unpack('c', substr($$buffer, 4, 1));
    
    die "We had an issue with the group length\n"
        if $grouplen == -1;

    # Get actual name private group name
    $buffer = $conn->read( $grouplen );
    my $private_group = unpack("a*", $$buffer);
    $conn->private_group( $private_group );

    if( $conn->is_async ) {
        $sock->blocking( 0 );
        $conn->listen_for_messages;
        $conn->session_connected( 1 );
    }

    return $conn;
}

sub receive {
    my %args = @_;

    if( $args{conn}->is_async ) {

        $args{conn}->receive;

        unless( $args{conn}->message_callback ) {
            return $args{conn}->get_queued_messages;
        }
    }
    else {
        my $buffer;
    
        # pull down message header
        $buffer = $args{conn}->read( 48 );
        
        # define variables we need
        my ($mess_type, $endian);
    
        # unpack header data
        my ($service_type, $sender, $num_members, $hint, $data_len) = parse_message_header( $buffer );
    
        # Get group data
        my $pull_bytes = $num_members * 32;
        $buffer = $args{conn}->read( $pull_bytes );

        my $groups = parse_message_groups( $buffer, $num_members); 
    
        # pull down message body
        $buffer = $args{conn}->read( $data_len );
    
        my $message = parse_message_body( $buffer, $service_type);
    
        return ($service_type, $sender, $groups, $mess_type, $endian, $message);
    }
}

sub multicast {
    my %args = @_;

    my $private_group = $args{conn}->private_group;

    my $multi_message = build_standard_message( type       => $args{type} || DEFAULT_SEND_MESS,
                                                from_group => $private_group,
                                                to_groups  => $args{groups},
                                                data       => $args{message},
                                             );

    my $ret = $args{conn}->write( \$multi_message );

    return $ret;
}

sub join {
    my %args = @_;

    my $private_group = $args{conn}->private_group;

    my $join_message = build_standard_message( type       => JOIN_MESS,
                                               from_group => $private_group,
                                               to_groups  => $args{groups},
                                            );

    my $ret = $args{conn}->write( \$join_message );

    return $ret;
}

sub leave {
    my %args = @_;

    my $private_group = $args{conn}->private_group;

    my $leave_message = build_standard_message( type       => LEAVE_MESS,
                                                from_group => $private_group,
                                                to_groups  => $args{groups},
                                             );

    my $ret = $args{conn}->write( \$leave_message );

    return $ret;
}

sub disconnect {
    my %args = @_;

    my $private_group = $args{conn}->private_group;

    my $disconnect_message = build_standard_message( type       => KILL_MESS,
                                                     from_group => $private_group,
                                                     to_groups  => [ $private_group ],
                                                  );
    # write close message to daemon
    $args{conn}->write( \$disconnect_message );

    # close socket
    $args{conn}->close();
}

1;
__END__

=head1 NAME

Spread::Client - Spread client that allows synchronous OR asynchronous multicast/receive/join/leave/disconnect to spread daemons

=head1 SYNOPSIS

  # ASYNCHRONOUS AnyEvent BEHAVIOR(with POE)
  use strict;
  use warnings;
  use POE;
  use Spread::Client;
  use Spread::Client::Constant ':all'; 
  use Danga::Socket;
  use Data::Dumper;
  
  my $spread_name = '4803@localhost';
  my $private_name = 'mrperla';
  
  
  # set message count before exit
  my $message_max = 10;
  my $message_count = 0;
  
  # Connect using Danga socket connection class for 
  # running with Danga::Socket Event loop
  my $conn = Spread::Client::connect(
        spread_name   => $spread_name,
        private_name  => $private_name,
        connect_class => 'Async::AnyEvent',
     );
  
  # set callback to handle message receipts
  $conn->message_callback( \&handle_message );
  
  Spread::Client::join( conn   => $conn,
                        groups => ['channel-100'],
                      );
  
  # If you decide not to give this message a 'type'
  # then it will default to sending a RELIABLE MESSAGE
  Spread::Client::multicast( conn    => $conn,
                             groups  => ['channel-100'],
                             message => 'this is a message',
                           );

  sub handle_message {
      my ($conn, @message) = @_;
  
      print Dumper( \@message );
  
      $message_count++;
  
      if( $message_count >= $message_max ) {
  
          Spread::Client::leave( conn   => $conn,
                                 groups => ['channel-100'],
                              );
  
          Spread::Client::disconnect( conn => $conn );
          POE::Kernel->stop();
      }
  }

  # Don't forget to start your event loop  
  POE::Kernel->run();

  # ASYNCHRONOUS DANGA BEHAVIOR
  use strict;
  use warnings;
  use Spread::Client;
  use Spread::Client::Constant ':all';
  use Danga::Socket;
  use Data::Dumper;
  
  my $spread_name = '4803@localhost';
  my $private_name = 'mrperla';
  
  
  # set message count before exit
  my $message_max = 10;
  my $message_count = 0;
  
  # Connect using Danga socket connection class for 
  # running with Danga::Socket Event loop
  my $conn = Spread::Client::connect(
        spread_name   => $spread_name,
        private_name  => $private_name,
        connect_class => 'Async::Danga',
     );
  
  # set callback to handle message receipts
  $conn->message_callback( \&handle_message );
  
  Spread::Client::join( conn   => $conn,
                        groups => ['channel-100'],
                      );
  
  # If you decide not to give this message a 'type'
  # then it will default to sending a RELIABLE MESSAGE
  Spread::Client::multicast( conn    => $conn,
                             groups  => ['channel-100'],
                             message => 'this is a message',
                           );

  sub handle_message {
      my ($conn, @message) = @_;
  
      print Dumper( \@message );
  
      $message_count++;
  
      if( $message_count >= $message_max ) {
  
          Spread::Client::leave( conn   => $conn,
                                 groups => ['channel-100'],
                              );
  
          Spread::Client::disconnect( conn => $conn );
          Danga::Socket->SetPostLoopCallback( sub { 0 } );
      }
  }
  # Don't forget to start your event loop  
  Danga::Socket->EventLoop();

  # SYNCHRONOUS BEHAVIOR
  use strict;
  use warnings;
  use Spread::Client;
  use Spread::Client::Constant ':all';
  use Data::Dumper;
  
  my $spread_name = '4803@localhost';
  my $private_name = 'mrperls';
  
  # Connect using Sync class
  # if connect_class is left out, defaults to 'Sync'
  my $conn = Spread::Client::connect(
        spread_name   => $spread_name,
        private_name  => $private_name,
        connect_class => 'Sync',
     );
  
  # If we need the private group
  my $private_group = $conn->private_group;
  
  Spread::Client::join( conn   => $conn,
                        groups => ['channel-100'],
                      );
  
  Spread::Client::multicast( conn    => $conn,
                             type => SAFE_MESS,
                             groups  => ['channel-100'],
                             message => 'this is a message',
                           );
  
  for (1..2) {
  
      my ($service_type, $sender, $groups, $mess_type, $endian, $message) =
      Spread::Client::receive( conn  => $conn );

      # Should Dump a join message, and our multi-cast since SELF_DISCARD isn't on
      warn Dumper( $service_type, $sender, $groups, $mess_type, $endian, $message);
  }
  
  # leave group
  Spread::Client::leave( conn   => $conn,
                         groups => ['channel-100'],
                       );

  # disconnect
  Spread::Client::disconnect( conn => $conn );

=head1 DESCRIPTION

A Spread Toolkit client implemented in perl, that allows both synchronous and asynchronous functionality.
Reading the Spread User's Guide is strongly recommended before using the Spread Toolkit in general.

=head1 CAVEATS

Right now connect is always synchronous, this may change later but unless someone really wants it
i don't see it occuring anytime soon.  Asynchronous is supported for Danga::Socket and AnyEvent which should
cover a large number of the event loops out there, include POE.  If you would like to use AnyEvent or Danga, please have them installed, it's not a prerequisite of the module as connection classes are loaded at runtime and the synchronous connection client does not depend on either Danga::Socket or AnyEvent.

=head1 EXPORT

None by default.


=head1 SEE ALSO

Constants module L<Spread::Client::Constant>.

Without these helpful sources, this module would not have been possible:

Rough spread API, documented by a nice developer. L<http://www.roughtrade.net/spread/spread-client-proto.txt>

Pure Python Spread Client L<http://code.google.com/p/py-spread/>

Spread source including their C and JAVA client L<http://www.spread.org/>

=head1 AUTHOR

Marlon Bailey, E<lt>mbailey@span.org<gt>

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2008 by Marlon Bailey

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.8.8 or,
at your option, any later version of Perl 5 you may have available.


=cut