The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Spread::Messaging::Transport;

use 5.008;
use strict;
use warnings;

use Spread;
use Spread::Messaging::Exception;

require Exporter;

our @ISA = qw(Exporter);

# Items to export into callers namespace by default. Note: do not export
# names by default without a very good reason. Use EXPORT_OK instead.
# Do not simply export all your public functions/methods/constants.

# This allows declaration	use Spread::Transport ':all';
# If you do not need this, moving things directly into @EXPORT or @EXPORT_OK
# will save memory.

our %EXPORT_TAGS = ( 'all' => [ qw(
	
) ] );

our @EXPORT_OK = ( @{ $EXPORT_TAGS{'all'} } );

our @EXPORT = qw(
    UNRELIABLE_MESS
    RELIABLE_MESS
    FIFO_MESS
    CAUSAL_MESS
    AGREED_MESS
    SAFE_MESS
    REGULAR_MESS
    SELF_DISCARD
    DROP_RECV
    REG_MEMB_MESS
    TRANSITION_MESS
    CAUSED_BY_JOIN
    CAUSED_BY_LEAVE
    CAUSED_BY_DISCONNECT
    CAUSED_BY_NETWORK
    MEMBERSHIP_MESS
    ACCEPT_SESSION
    ILLEGAL_GROUP
    ILLEGAL_MESSAGE
    ILLEGAL_SERVICE
    ILLEGAL_SESSION
    ILLEGAL_SPREAD
    CONNECTION_CLOSED
    COULD_NOT_CONNECT
    BUFFER_TOO_SHORT
    GROUPS_TOO_SHORT
    MESSAGE_TOO_LONG
    REJECT_ILLEGAL_NAME
    REJECT_NOT_UNIQUE
    REJECT_NO_NAME
    REJECT_QUOTA
    REJECT_VERSION
);

our $VERSION = '0.03';

# -------------------------------------------------------------------------
# public methods
# -------------------------------------------------------------------------

sub new {
    my $proto = shift;
    my %params = @_;

    my $self = {};
    my @foo = split(/\//, $0);
    my $class = ref ($proto) || $proto;

    # Initialize variables - these are defaults.

    $self->{port} = '4803';
    $self->{timeout} = '5';
    $self->{host} = 'localhost';
    $self->{service_type} = SAFE_MESS;
    $self->{private_name} = sprintf("%0s%05d", 
                                    substr($ENV{USER} || pop @foo, 0, 5), $$);

    # Parse named parameters, these may overwrite, or suppliment the above.

    my ($k, $v);
    local $_;

    if (defined ($v = delete $params{'-port'})) {

        $self->{port} = $v;

    }

    if (defined ($v = delete $params{'-host'})) {

        $self->{host} = $v;

    }

    if (defined ($v = delete $params{'-private_name'})) {

        $self->{private_name} = $v;

    }

    if (defined ($v = delete $params{'-timeout'})) {

        $self->{timeout} = $v;

    }

    if (defined ($v = delete $params{'-service_type'})) {

        $self->{service_type} = $v;

    }

    bless($self, $class);

    # Open the connection

    $self->connect();

    return $self;

}

sub connect {
    my ($self) = @_;

    my $connection;

    $connection->{spread_name} = $self->{port} . '@' . $self->{host};
    $connection->{private_name} = $self->{private_name};

    undef $sperrno;
    ($self->{mailbox}, $self->{private_group}) = Spread::connect($connection);
    if ($sperrno) {
        
        Spread::Messaging::Exception->throw(
            errno => $sperrno + 0,
            errstr => $sperrno
        );

    }

}

sub join_group {
    my ($self, $groups) = @_;

    my @groupss = split(',', $groups);

    foreach my $group (@groupss) {

        undef $sperrno;
        Spread::join($self->{mailbox}, $group);
        if ($sperrno) {

            Spread::Messaging::Exception->throw(
                errno => $sperrno + 0,
                errstr => $sperrno
            );

        }

    }

}

sub leave_group {
    my ($self, $groups) = @_;

    my @groupss = split(',', $groups);
    
    foreach my $group (@groupss) {

        undef $sperrno;
        Spread::leave($self->{mailbox}, $group);
        if ($sperrno) {

            Spread::Messaging::Exception->throw(
                errno => $sperrno + 0,
                errstr => $sperrno
            );

        }

    }

}

sub poll {
    my ($self) = @_;

    undef $sperrno;
    my $size = Spread::poll($self->{mailbox});
    if ($sperrno) {

        Spread::Messaging::Exception->throw(
            errno => $sperrno + 0,
            errstr => $sperrno
        );

    }

    return $size;

}

sub send {
    my ($self, $group, $message, $type) = @_;

    undef $sperrno;
    Spread::multicast($self->{mailbox}, $self->{service_type}, $group, $type, $message);
    if ($sperrno) {

        Spread::Messaging::Exception->throw(
            errno => $sperrno + 0,
            errstr => $sperrno
        );

    }

}

sub recv {
    my ($self) = @_;

    my ($data, $content);

    undef $sperrno;
    my ($service_type, $sender, $groups, $msg_type, $endian, $message) =
        Spread::receive($self->{mailbox}, $self->{timeout});
    if ($sperrno) {

        Spread::Messaging::Exception->throw(
            errno => $sperrno + 0,
            errstr => $sperrno
        );

    }

    $data = $message;

    if ($service_type & MEMBERSHIP_MESS) {

        if ($service_type & REG_MEMB_MESS) {

            $data = _decode_reg_memb_mess($self, $service_type, $message);

        } elsif ($service_type & TRANSITION_MESS) {

            $data = _decode_transition_mess($self, $service_type, $message);

        }

    }

    return $service_type, $sender, $groups, $msg_type, $endian, $data;

}

sub disconnect {
    my ($self) = @_;

    undef $sperrno;
    Spread::disconnect($self->{mailbox});
    if ($sperrno) {

        Spread::Messaging::Exception->throw(
            errno => $sperrno + 0,
            errstr => $sperrno
        );

    }

}

# -------------------------------------------------------------------------
# public accessors
# -------------------------------------------------------------------------

sub fd {
    my ($self) = @_;

    return($self->{mailbox});

}

sub service_type {
    my ($self, $p) = @_;

    $self->{service_type} = $p if ((defined $p) && 
                                   (($p == UNRELIABLE_MESS) ||
                                    ($p == RELIABLE_MESS) ||
                                    ($p == FIFO_MESS) ||
                                    ($p == CAUSAL_MESS) ||
                                    ($p == AGREED_MESS) ||
                                    ($p == SAFE_MESS)));
    return($self->{service_type});

}

sub timeout {
    my ($self, $p) = @_;

    $self->{timeout} = $p if defined $p;
    return($self->{timeout});

}

sub host {
    my ($self, $p) = @_;

    $self->{host} = $p if defined $p;
    return($self->{host});

}

sub port {
    my ($self, $p) = @_;

    $self->{port} = $p if defined $p;
    return($self->{port});

}

sub private_group {
    my ($self, $p) = @_;

    $self->{private_group} = $p if defined $p;
    return($self->{private_group});

}

sub private_name {
    my ($self, $p) = @_;

    $self->{private_name} = $p if defined $p;
    return($self->{private_name});

}

DESTROY {
    my ($self) = @_;

    Spread::disconnect($self->{mailbox}) if defined $self->{mailbox};

}

# -------------------------------------------------------------------------
# Private methods
# -------------------------------------------------------------------------

sub _decode_reg_memb_mess {
    my ($self, $service_type, $message) = @_;

    # Try to decode the message buffer. This really should
    # be done in the xs code, not here in perl. And the returned
    # buffer doesn't match what the Spread C API documentation says 
    # should be there. Arrgh...
    #
    # The first 12 bytes is the group ID, the remainer of the
    # buffer is the current private group of this session. 
    #
    # It would have been really nice if the buffer had been broken up into
    # delimited fields, of which the first three fields would be the group ID, 
    # the next field the private group, the next field a count 
    # and the rest the fields were the actual names of the group memebers,
    # the number of whom would have matched the count. This would have
    # been a nice approximation of the actual C data structure. 
    #
    # Oh well, such is life.

    my @data;

    if (($service_type & CAUSED_BY_JOIN) || 
        ($service_type & CAUSED_BY_LEAVE) ||
        ($service_type & CAUSED_BY_DISCONNECT)) {

        @data = unpack("I[3]A*", $message);

    } elsif ($service_type & CAUSED_BY_NETWORK) {

        @data = unpack("I[3]A*", $message);

    }

    return \@data;

}

sub _decode_transition_mess {
    my ($self, $service_type, $message) = @_;

    # Try to decode the message buffer. This really should
    # be done in the xs code, not here in perl. 

    my @data = unpack("I3", $message);

    return \@data;

}

1;

__END__

=head1 NAME

Spread::Messaging::Transport - A Perl extension to the Spread Group Communications toolkit

=head1 SYNOPSIS

This module attempts to provide a simple and easy to use interface to the 
Spread Group Communications toolkit. It is a thin, object oriented layer 
over the toolkits Spread.pm.

=head1 DESCRIPTION

Your application could be as simple as this, to receive messages from a 
Spread network.

  use Spread::Messaging::Transport;

  $spread = Spread::Messaging::Transport->new();
  $spread->join_group("test");

  for (;;) {

      if ($spread->poll()) {

         my ($service_type, $sender, $groups, 
             $msg_type, $endian, $message) = $spread->recv();

         do_something($service_type, $sender, $groups, 
                      $msg_type, $endian, $message);

      }

      sleep(1);

  }

Or, your application could be as simple as this, to send messages over a 
Spread network.

  use Spread::Messaging::Transport;

  $spread = Spread::Messaging::Transport->new();
  $spread->join_group("test");

  for (;;) {

      $buffer = readline();
      $spread->send("test", $buffer, 0);

  }

But of course, this is never the case. This module will allow you to build
your application as you see fit. All errors are thrown as a 
Spread::Messaging::Exception object using Exception::Class as the base class.
No structure is enforced upon the messages being sent. This module is 
designed for maximum flexibility.

=head1 METHODS

=over 4

=item  new

This method inializes the object. Reasonable defaults are provided during
this initializaion. By default, your program will connect to a Spread 
server on port 4803, on host "localhost", with a timeout of 5 seconds, 
along with a self generated private name, using a message type of 
"SAFE_MESS". To override these defaults you can use the following 
named parameters.

=over 4

=item -port

This allows you to indicate which port number to use.

=item -host

This allows you to choose which host the Spread server is located on.

=item -timeout

This allows you to select a timeout.

=item -service_type

This allows you to choose a differant service type. The following types
are valid:

=over 4

 UNRELIABLE_MESS
 RELIABLE_MESS
 FIFO_MESS
 CAUSAL_MESS
 AGREED_MESS
 SAFE_MESS

=back

Please see the Spread documentation for the meaning of these service types.

=item -private_name

This allows you to choose a specifc private name for your private mailbox. 
This name can be used for unicast messages.

=item Example

 $spread = Spread::Messaging::Transport->new(
      -port => "8000",
      -timeout => "10",
      -host => "spread.example.com",
      -private_name => "mymailbox"
 );

=back

=item connect

This method allows you to connect to a Spread server. It is useful if you
have disconnected from your current server. You may also use this method to 
reconnect to another Spread server. It does not create a new object.

=over 4

=item Examples:

 $spread->connect();

 or

 $spread->disconnect();
 $spread->host("spread.example.com");
 $spread->connect();

=back

=item disconnect

This method will disconnect from your current Spread server. It does not
destroy the spread object.

=over 4

=item Example

 $spread->disconnect();

=back

=item poll

This methed allows you to check for pending messages. The number of 
message bytes is returned, when messages are pending.

=over 4

=item Example

 $size = $spread->poll();
 do_something() if $size;

=back

=item join_group

This allows you to join one or more Spread groups. To receive any multicast 
messages you need to join a group. The group may be a comma delimted 
list of names.

=over 4

=item Example

 $spread->join_group("test1,test2");

=back

=item leave_group

This allows you to leave a Spread group. Once you leave a group, you will
stop receiving any multicast messages for that group. A comma delimted list 
may be used to leave more then one group.

=over 4

=item Example

 $spread->leave_group("test1");

=back

=item send

This allows you to send messages to a group. You may only send a message to 
one group at a time. A group may be a either a public group or a private 
mailbox. The third parameter is an application specific message type. This
allows the application to segregate message types.

=over 4

=item Examples:

 $spread->send("test1", "This is cool", 0);

 or

 $msg->{command} => "jump";
 $msg->{command}->{parameter} => "how high";
 $data = objToJson($msg);
 $spread->send("test2", $data, 1);

=back

=item recv

This method allows you to receive messages from the Spread server. The method
will wait the default timeout period if no messages are pending. It returns
five data items. Those items are as follows:

=over 4

=item $service_type 

This is the service type of the recieved message. This module tries to 
decode the message depending on the service type. This allows the application
to do whatever with the message.

=item $sender

This is the sender of the message. The sender is ususally the mailbox the
message originated from. But may contain other data depending on the 
service type.

=item $groups

This is the groups the message was sent too.

=item $message_type

This is the application specific message type. It is defined when the message
was sent.

=item $endinan

This indicates if the endianness of the sending platform is differant from
the receiving platform.

=item $message

This is the acutal data that was sent from the sender. Depending on the
service type is may also contain one the following types of data.

=over 4

If the service type was a MEMBERSHIP_MESS with a sub type of REG_MEMB_MESS 
the message will contain the following array:

=over 4

If the sub type is CAUSED_BY_LEAVE, CAUSED_BY_JOIN or CAUSED_BY_DISCONNECT
the first three array elements are the group id, the last element is the
group name.

If the sub type is CAUSED_BY_NETWORK, the first three array elements is the
group id, the next one is the number of elements for the groups effected, 
while the last element is those groups.

=back

If the service type was a MEMBERSHIP_MESS with a sub type of TRANSITIONAL_MESS
the message will contain an array with the elements containing the 
group id.

=back

=item Example

 my ($service_type, $sender, 
     $groups, $message_type, $endian, $message) = 
       $spread->recv();

 if ($service_type & REGULAR_MESS) {

     handle_regular_message($service_type, $sender, 
                            $groups, $message_type, 
                            $endian, $message);

 } else { 

     handle_membership_message($service_type, $sender, 
                               $groups, $message_type, 
                               $endian, $message);

 }

=back

See the documentation for the Spread C API to fully understand the service
types and what data can be returned for each type.

=back

=head1 ACCESSORS

=over 4

=item  fd

This returns the file descriptor for the Spread connection. This
descriptor can be used with select() or one of the event handling modules 
to wait for messages from the server.

=over 4

=item Example

 $fd = $spread->fd;

=back

=item private_group

This returns the name of your private group.

=over 4

=item Example

 $private_group = $spread->private_group;

=back

=item port

This returns or sets the port number for the Spread server.

=over 4

=item Example

 $port = $spread->port;
 $spread->port("8000");

=back

=item host

This returns or sets the host name for the Spread server.

=over 4

=item Example

 $host = $spread->host;
 $spread->host("spread.example.com");

=back

=item timeout

This returns or sets the timeout value for your Spread connection.

=over 4

=item Example

 $timeout = $spread->timeout;
 $spread->timeout("10");

=back

=item private_name

This returns or sets the private name for your Spread connection.

=over 4

=item Example

 $name = $spread->private_name;
 $spread->private_name("myname");

=back

=item service_type

This returns or sets the service type of the Spread connection. 

=over 4

=item Example

 $service_type = $spread->service_type;
 $spread->service_type("SAFE_MESS");

=back

=back

=head1 EXPORTS

This module exports the constants that Spread.pm exposes. This is to allow
your application access to those constants.

=head1 SEE ALSO

 Spread::Messaging
 Spread::Messaging::Content

There are several other modules located on CPAN that already handle the Spread
Group Communication toolkit. They are:

=over 4

=item Spread.pm

This module is provided by the Spread toolkit to enable basic 
connectivity to a Spread server. It works, the interface is smiliar to
the C API and you need to do all of the heavy hitting on your own.

=item Spread::Message

Another wrapper module for Spread.pm. Please see this modules 
documentation for usage.

=item Spread::Session

Another wrapper modules for Spread.pm. This module is the base for these 
modules:

=over 4

 Spread::Queue
 Spread::Queue::Fifo
 Spread::Queue::Sender
 Spread::Queue::Worker
 Spread::Queue::Manager
 Spread::Queue::ManagedWorker

=back

Please read the documentation for these modules to see how they interact.

=item Messaging::Courier

Another wrapper module for Spread.pm. Please see this modules  
documentation for usage.

You also need to read the Spread documentation located at www.spread.org.
This is the definative description on what the Spread system is all about.
And should be considered mandatory reading for anybody attempting to use the
toolkit.

=back

=head1 AUTHOR

Kevin L. Esteb, E<lt>kevin@kesetb.usE<gt>

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2007 by Kevin L. Esteb

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.8.5 or,
at your option, any later version of Perl 5 you may have available.

=cut