package Net::MessageBus;

use 5.006;
use strict;
use warnings;

=head1 NAME

Net::MessageBus - Pure Perl simple message bus

=head1 VERSION

Version 0.08

=cut

our $VERSION = '0.08';

use base 'Net::MessageBus::Base';

use Net::MessageBus::Message;

use IO::Socket::INET;
use IO::Select;
use JSON;

# Autoflush the currently selected output handle.
$| = 1;

=head1 SYNOPSIS

This module implements the client side of the Message Bus.

    use Net::MessageBus;
    my $MessageBus = Net::MessageBus->new(
                        server => '127.0.0.1',
                        group => 'backend',
                        sender => 'machine1',
                        username => 'user',
                        password => 'password',
                        logger  => $logger_object,
                        blocking => 0,
                        timeout => 0.01
                    );

On initialization the client authenticates with the Net::MessageBus::Server
after which it can start pushing messages to the bus.

In order to receive any messages from the bus the client must subscribe to :

=over 4

=item * one or more groups

=item * one or more senders

=item * one or more message types

=item * all messages

=back

    #each can be called multiple times
    $MessageBus->subscribe(group => 'test');
    $MessageBus->subscribe(sender => 'test_process_1');
    $MessageBus->subscribe(type => 'test_message_type');

    #or, to receive everything
    $MessageBus->subscribe_all();

The client can unsubscribe at any time by calling the C<unsubscribe> method.

To retrieve the messages received from the bus, the client can call one of
these methods :

    my @messages = $MessageBus->pending_messages();
    my $message = $MessageBus->next_message();

=head1 EXAMPLE

    use Net::MessageBus;

    my $MessageBus = Net::MessageBus->new(server => '127.0.0.1',
                          group => 'backend',
                          sender => 'machine1');
    $MessageBus->subscribe(group => 'test');
    $MessageBus->subscribe(sender => 'test_process_1');
    my @messages = $MessageBus->pending_messages();
    while (my $message = $MessageBus->next_message()) {
        print $message->type();
    }

=head1 METHODS

=head2 new

Creates a new Net::MessageBus object, connects to the server and authenticates.

The arguments can be given either as a list of key/value pairs or as a single
hash reference :

=over 10

=item * server = The ip address of the server (defaults to C<127.0.0.1>)

=item * port = The port on which the server is listening for connections
(defaults to C<4500>)

=item * group = The group to which this client belongs

=item * sender = A name for the current client

=item * username = User name that will be sent to the server for authentication

=item * password = The password that will be sent to the server for authentication

=item * logger = An object on which we can call the following methods C<debug,info,warn,error>

=item * blocking = if we don't have any unread messages from the server we will
block until the server sends something. If I<blocking> is true I<timeout> will be ignored.
When not specified, it defaults to true unless a I<timeout> was given.

=item * timeout = the maximum amount of time (in seconds) we should wait for a message
from the server before returning C<undef> for C<next_message()> or an empty list
for C<pending_messages()>. Defaults to C<0.01>.

=back

B<Example> :

    my $MessageBus = Net::MessageBus->new(
                        server => '127.0.0.1',
                        group => 'backend',
                        sender => 'machine1',
                        username => 'user',
                        password => 'password',
                        logger  => $logger_object,
                    );

Dies if the connection or the authentication fails (see C<connect_to_server>).

=cut

sub new {
    my $class = shift;

    # Accept both new({ ... }) and new( key => value, ... ).
    my %params;
    if ((ref($_[0]) || '') eq "HASH") {
        %params = %{$_[0]};
    }
    else {
        %params = @_;
    }

    my $self = {
                # An empty host can never be a useful default; fall back to
                # the loopback address when no server is given.
                server_address => $params{server} || '127.0.0.1',
                server_port    => $params{port} || '4500',
                logger         => $params{logger} || Net::MessageBus::Base::create_default_logger(),
                group          => $params{group},
                sender         => $params{sender},
                username       => $params{username},
                password       => $params{password},
                timeout        => defined($params{timeout}) ? $params{timeout} : 0.01,
                # An explicit blocking flag wins; otherwise giving a timeout
                # implies non-blocking mode and giving nothing implies blocking.
                blocking       => defined($params{blocking}) ? $params{blocking} :
                                                               defined($params{timeout}) ? 0 : 1,
                msgqueue       => [],   # messages received but not yet handed to the caller
                buffer         => '',   # raw bytes read that do not yet form a complete line
    };

    bless $self, $class;

    # The client is only usable once it is connected and authenticated.
    $self->connect_to_server();

    return $self;
}

=head2 subscribe

Subscribes the current Net::MessageBus client to the messages from the
specified category. It can be called multiple times.

The arguments are key/value pairs which are sent to the server as they are.

Returns 1 if the server accepted the request and 0 otherwise.

B<Example> :

    $MessageBus->subscribe(group => 'test');
    $MessageBus->subscribe(sender => 'test_process_1');

=cut

sub subscribe {
    my $self = shift;

    # The remaining arguments become the payload of the 'subscribe' request.
    return $self->send_to_server('subscribe',{ @_ } );
}

=head2 subscribe_all

Subscribes the current Net::MessageBus client to all the messages
the server receives.

Returns 1 if the server accepted the request and 0 otherwise.

B<Example> :

    $MessageBus->subscribe_all();

=cut

sub subscribe_all {
    my $self = shift;

    return $self->send_to_server('subscribe',{ all => 1 } );
}

=head2 unsubscribe

Unsubscribes the current Net::MessageBus client from all the messages it
previously subscribed to.

Returns 1 if the server accepted the request and 0 otherwise.

B<Example> :

    $MessageBus->unsubscribe();

=cut

sub unsubscribe {
    my $self = shift;

    # Unsubscribing is sent as a 'subscribe' request carrying an
    # 'unsubscribe' flag in its payload.
    return $self->send_to_server('subscribe',{ unsubscribe => 1 } );
}

=head2 send

Sends a new message to the message queue.
It has two forms in which it can be called :

=over 4

=item 1. With a Net::MessageBus::Message object as argument

=item 2. With a hash ref (or a plain list of key/value pairs) containing the
following two keys :

=over 8

=item * type = The message type

=item * payload = The actual information we want to send with the message.
It can be a scalar, array ref or hash ref and it cannot contain any objects

=back

=back

In the second form the I<sender> and I<group> of the current client are used
for the message unless they are overridden by the given keys.

Returns 1 if the server accepted the message and 0 otherwise.

B<Example> :

    $MessageBus->send( $message ); #message must be a Net::MessageBus::Message object
    $MessageBus->send( type => 'alert', payload => { a => 1, b => 2 }  );

=cut

sub send {
    my $self = shift;

    my $message;

    if (ref($_[0]) eq "Net::MessageBus::Message") {
        # Already a message object: send it as it is.
        $message = $_[0];
    }
    elsif (ref($_[0]) eq "HASH") {
        # Hash ref form: the caller's keys are merged over the client defaults.
        $message = Net::MessageBus::Message->new({ sender => $self->{sender},
                                           group  => $self->{group},
                                           %{$_[0]},
                                        });
    }
    else {
        # List form: send( type => ..., payload => ... ).
        $message = Net::MessageBus::Message->new({ sender => $self->{sender},
                                           group  => $self->{group},
                                           @_,
                                        });
    }

    return $self->send_to_server(message => $message);
}

=head2 next_message

Returns the next message from the queue of messages we received from the
server. The message is a Net::MessageBus::Message object.

If the internal queue is empty the client first tries to read new messages
from the server. Returns C<undef> if there is still nothing to return.

B<Example> :

    my $message = $MessageBus->next_message();

=cut

sub next_message {
    my $self = shift;

    # Only go to the network when there is nothing queued locally.
    if (! scalar(@{$self->{msgqueue}})) {
        $self->read_server_messages();
    }

    return shift @{$self->{msgqueue}};
}

=head2 pending_messages

Returns all the messages received until now from the server. Each message is
a Net::MessageBus::Message object. The internal queue is emptied by the call.

Arguments :

=over 4

=item * force_read_queue = forces a read of everything the server might have sent
and we haven't processed yet

Note: Forcing a read of the message queue when I<blocking> mode is on will block the
call until we received something from the server

=back

B<Example> :

    my @messages = $MessageBus->pending_messages();
    my @messages = $MessageBus->pending_messages(force_read_queue => 1);

=cut

sub pending_messages {
    my $self = shift;

    my %params = @_;

    # Read from the server when nothing is queued, or when explicitly asked to.
    if (! scalar(@{$self->{msgqueue}}) || $params{force_read_queue} ) {
        $self->read_server_messages();
    }

    # Hand over everything we have and start with a fresh queue.
    my @messages = @{$self->{msgqueue}};
    $self->{msgqueue} = [];

    return @messages;
}

=head2 blocking

Getter/Setter for the I<blocking> setting of the client. If set to true, when waiting for server
messages, the client will block until it receives something.

When called with a defined argument the setting is updated first; the
(possibly updated) value is always returned.

Examples :

    my $blocking = $MessageBus->blocking();
    $MessageBus->blocking(1);

=cut

sub blocking {
    my $self = shift;

    if (defined $_[0]) {
        # Normalise whatever was passed to a plain boolean.
        $self->{blocking} = !!$_[0];
    }

    return $self->{blocking};
}

=head2 timeout

Getter/Setter for the timeout (in seconds) when waiting for server messages.
It can have a fractional value (eg. 0.01).

When called with a defined argument the value is validated and stored; the
(possibly updated) value is always returned. Dies with
C<Invalid timeout specified> if the value is not a non-negative decimal number.

I<Note1> : When I<blocking> is set to a true value, the timeout is ignored

I<Note2> : A I<timeout> of 0 does not disable waiting : when reading from the
server a false timeout is replaced by 0.01 seconds.

Example :

    my $timeout = $MessageBus->timeout();
    $MessageBus->timeout(0.01);

=cut

sub timeout {
    my $self = shift;

    if (defined $_[0]) {
        # \A and \z anchor the whole string, so a trailing newline is rejected too.
        die "Invalid timeout specified" unless $_[0] =~ /\A\d+(?:\.\d+)?\z/;
        $self->{timeout} = $_[0];
    }

    return $self->{timeout};
}

=head1 Private methods

B<These methods are for internal use and should not be called manually>

=head2 connect_to_server

Creates a connection to the Net::MessageBus server and authenticates the user.

The socket is stored in C<server_socket> and an L<IO::Select> object watching
it is stored in C<server_sel>.

Dies with C<Cannot connect to Net::MessageBus server> if the connection cannot
be established and with C<Authentication failed> if the server rejects the
credentials.

=cut

sub connect_to_server {
    my $self = shift;

    $self->{server_socket} = IO::Socket::INET->new(
                                PeerHost  => $self->{server_address},
                                PeerPort  => $self->{server_port},
                                Proto     => 'tcp',
                                Timeout   => 1,
                                ReuseAddr => 1,
                                Blocking  => 1,
                                # IO::Socket::INET reports the failure reason in $@.
                                ) || die "Cannot connect to Net::MessageBus server: $@";

    # Used by read_server_messages() to wait for incoming data.
    $self->{server_sel} = IO::Select->new($self->{server_socket});

    $self->authenticate() || die "Authentication failed";
}

=head2 send_to_server

Handles the actual communication with the server.

Arguments :

=over 4

=item * type = the type of the request (eg. C<message>, C<subscribe>, C<authenticate>)

=item * object = the payload of the request; a Net::MessageBus::Message object
is serialized before it is sent

=back

The request is sent as one line of JSON, after which the method waits for the
response of the server.

Returns 1 if the server answered with a true status and 0 if the request could
not be sent or the server reported an error (the error is logged).

=cut

sub send_to_server {
    my $self = shift;
    my ($type,$object) = @_;

    if (ref($object) eq "Net::MessageBus::Message") {
        $object = $object->serialize();
    }

    # Every request is a single line terminated by a newline.
    local $\ = "\n";
    local $/ = "\n";

    my $socket = $self->{server_socket};

    # print() reports failure by returning false, it does not die by itself,
    # so turn a failed write into an exception for the handler below.
    my $sent = eval {
        print {$socket} to_json( {type => $type, payload => $object} )
            or die "print failed: $!\n";
        1;
    };

    if (! $sent) {
        $self->logger->error("Message could not be sent! : $@");
        return 0;
    }

    my $response = $self->get_response();

    if (! $response->{status}) {
        my $status_message = defined $response->{status_message}
                           ? $response->{status_message}
                           : '';
        $self->logger->error('Error received from server: '.$status_message);
        return 0;
    }

    return 1;
}

=head2 authenticate

Sends an authentication request to the server and waits for the response.

Returns 1 if the server accepted the credentials and 0 otherwise.

=cut

sub authenticate {
    my $self = shift;

    # send_to_server() takes the payload as a single argument, so the
    # credentials have to be wrapped in one hash reference.
    return $self->send_to_server('authenticate',
                                    {
                                        username => $self->{username},
                                        password => $self->{password},
                                    }
                                );
}

=head2 get_response

Returns the response received from the server for the last request.

Keeps reading from the server until a response is available. The stored
response is removed, so every response is returned only once.

=cut

sub get_response {
    my $self = shift;

    # read_server_messages() stores any non 'message' data in $self->{response}.
    while (! defined $self->{response}) {
        $self->read_server_messages();
    }

    return delete $self->{response};
}

=head2 read_server_messages

Reads all the messages received from the server and adds them to the internal
message queue.

The data received is split into lines, each line being decoded from JSON.
Data of type C<message> is queued as a Net::MessageBus::Message object,
anything else is stored as the response for the last request.

In I<blocking> mode the first read waits until the server sends something,
otherwise it waits at most I<timeout> seconds (0.01 if the timeout is false).
After the first successful read only the data that is already available is
consumed.

Dies with C<Net::MessageBus server closed the connection> if the connection
was closed and C<auto_reconnect> is not set.

=cut

sub read_server_messages {
    my $self = shift;

    local $/ = "\n";
    local $\ = "\n";

    # undef makes can_read() wait indefinitely.
    my $timeout = $self->{blocking} ? undef : ($self->{timeout} || 0.01);

    while (1) {

        my @ready = $self->{server_sel}->can_read( $timeout );

        last unless (scalar(@ready));

        my $buffer;

        if ( sysread($ready[0],$buffer,8192) ) {

            $self->{buffer} .= $buffer;

            # Consume every complete line; an incomplete trailing line stays
            # in the buffer until the rest of it arrives.
            while ( $self->{buffer} =~ s/(.*?)\n+// ) {

                my $text = $1;

                # A buffer starting with a newline yields an empty line,
                # which is not valid JSON.
                next unless length $text;

                my $data = from_json($text);

                if (defined $data->{type} && $data->{type} eq 'message') {
                    push @{$self->{msgqueue}}, Net::MessageBus::Message->new($data->{payload});
                    $self->logger->debug('Received : '.$text);
                }
                else {
                    $self->{response} = $data;
                }
            }
        }
        else {
            # The socket was reported readable but nothing could be read.
            if ($self->{auto_reconnect}) {
                # NOTE(review): reconnect step reconstructed from context;
                # 'auto_reconnect' is not set by new() in this file - confirm.
                $self->connect_to_server();
            }
            else {
                die "Net::MessageBus server closed the connection";
            }
        }

        # After the first pass only drain what is already available.
        $timeout = 0;
    }
}

=head1 SEE ALSO

Check out L<Net::MessageBus::Server> which implements the server of the MessageBus and
L<Net::MessageBus::Message> which is the OO interface for the messages passed between the client and the server

=head1 AUTHOR

Horea Gligan, C<< <gliganh at> >>

Thanks to Manol Roujinov for helping to improve this module

=head1 LICENSE AND COPYRIGHT

Copyright 2012 Horea Gligan.

This program is free software; you can redistribute it and/or modify it
under the terms of either: the GNU General Public License as published
by the Free Software Foundation; or the Artistic License.

See L<http://dev.perl.org/licenses/> for more information.

=cut

1; # End of Net::MessageBus