The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package AnyEvent::MPRPC::Client;
use strict;
use warnings;
use Any::Moose;

use Carp;
use Scalar::Util 'weaken';

use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Handle;
use AnyEvent::MessagePack;
use AnyEvent::MPRPC::Constant;

has host => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
);

has port => (
    is       => 'ro',
    isa      => 'Int|Str',
    required => 1,
);

has connect_timeout => (
    is       => 'ro',
    isa      => 'Int|Str',
);

has handler => (
    is  => 'rw',
    isa => 'AnyEvent::Handle',
);

has on_error => (
    is      => 'rw',
    isa     => 'CodeRef',
    lazy    => 1,
    default => sub {
        return sub {
            my ($handle, $fatal, $message) = @_;
            croak sprintf "Client got error: %s", $message;
        };
    },
);

has handler_options => (
    is      => 'ro',
    isa     => 'HashRef',
    default => sub { {} },
);

has _request_pool => (
    is      => 'ro',
    isa     => 'ArrayRef',
    lazy    => 1,
    default => sub { [] },
);

has _next_id => (
    is      => 'ro',
    isa     => 'CodeRef',
    lazy    => 1,
    default => sub {
        my $id = 0;
        sub { ++$id };
    },
);

has _callbacks => (
    is      => 'ro',
    isa     => 'HashRef',
    lazy    => 1,
    default => sub { +{} },
);

has _connection_guard => (
    is  => 'rw',
    isa => 'Object',
);

has 'before_connect' => (
    is  => 'ro',
    isa => 'CodeRef',
);

has 'after_connect' => (
    is  => 'ro',
    isa => 'CodeRef',
);

# depreciated!
has 'on_connect' => (
    is  => 'ro',
    isa => 'CodeRef',
);

no Any::Moose;

sub BUILD {
    my $self = shift;

    my $after_connect = $self->after_connect ? sub { $self->after_connect->($self, @_) } : undef;
    my $guard = tcp_connect $self->host, $self->port, sub {
        my $fh = shift
            or return
                $self->on_error->(
                    undef, 1,
                    "Failed to connect $self->{host}:$self->{port}: $!",
                );
        my($host, $port, $retry) = @_;
        $self->after_connect
            and $self->after_connect->($self, $fh, $host, $port, $retry);

        my $handle = AnyEvent::Handle->new(
            on_error => sub {
                my ($h, $fatal, $msg) = @_;
                $self->on_error->(@_);
                $h->destroy;
            },
            %{ $self->handler_options },
            fh => $fh,
        );

        $handle->unshift_read(msgpack => $self->_handle_response_cb);

        while (my $pooled = shift @{ $self->_request_pool }) {
            $handle->push_write( msgpack => $pooled );
        }

        $self->handler( $handle );
    }, sub {
        my $connect_timeout;

        $self->before_connect
            and $self->before_connect->($self, @_);

        # on_conect is depreciated!
        $self->on_connect
            and $connect_timeout = $self->on_connect->($self, @_);

        # For backward compatibility, if connect_timeout option isn't specifed
        # use return value of on_connect callback as connect timeout seconds.
        $self->connect_timeout
            and $connect_timeout = $self->connect_timeout;

        return $connect_timeout;
    };
    weaken $self;

    $self->_connection_guard($guard);
}

sub call {
    my ($self, $method) = (shift, shift);
    my $param = (@_ == 1 && ref $_[0] eq "ARRAY") ? $_[0] : [@_];

    my $msgid = $self->_next_id->();

    my $request = [
        MP_TYPE_REQUEST,
        int($msgid), # should be IV
        $method,
        $param,
    ];

    if ($self->handler) {
        $self->handler->push_write( msgpack => $request );
    }
    else {
        push @{ $self->_request_pool }, $request;
    }

    # $msgid is stringified, but $request->{MP_RES_MSGID] is still IV
    $self->_callbacks->{ $msgid } = AnyEvent->condvar;
}

sub _handle_response_cb {
    my $self = shift;

    weaken $self;

    return sub {
        $self || return;

        my ($handle, $res) = @_;

        my $d = delete $self->_callbacks->{ $res->[MP_RES_MSGID] };

        if (my $error = $res->[MP_RES_ERROR]) {
            if ($d) {
                $d->croak($error);
            } else {
                Carp::croak($error);
            }
        }

        $handle->unshift_read(msgpack => $self->_handle_response_cb);

        if ($d) {
            $d->send($res->[MP_RES_RESULT]);
        } else {
            warn q/Invalid response from server/;
            return;
        }
    };
}

__PACKAGE__->meta->make_immutable;

__END__

=head1 NAME

AnyEvent::MPRPC::Client - Simple TCP-based MessagePack RPC client

=head1 SYNOPSIS

    use AnyEvent::MPRPC::Client;

    my $client = AnyEvent::MPRPC::Client->new(
        host => '127.0.0.1',
        port => 4423,
    );

    # blocking interface
    my $res = $client->call( echo => 'foo bar' )->recv; # => 'foo bar';

    # non-blocking interface
    $client->call( echo => 'foo bar' )->cb(sub {
        my $res = $_[0]->recv;  # => 'foo bar';
    });

=head1 DESCRIPTION

This module is client part of L<AnyEvent::MPRPC>.

=head2 AnyEvent condvars

The main thing you have to remember is that all the data retrieval methods
return an AnyEvent condvar, C<$cv>.  If you want the actual data from the
request, there are a few things you can do.

You may have noticed that many of the examples in the SYNOPSIS call C<recv>
on the condvar.  You're allowed to do this under 2 circumstances:

=over 4

=item Either you're in a main program,

Main programs are "allowed to call C<recv> blockingly", according to the
author of L<AnyEvent>.

=item or you're in a Coro + AnyEvent environment.

When you call C<recv> inside a coroutine, only that coroutine is blocked
while other coroutines remain active.  Thus, the program as a whole is
still responsive.

=back

If you're not using Coro, and you don't want your whole program to block,
what you should do is call C<cb> on the condvar, and give it a coderef to
execute when the results come back.  The coderef will be given a condvar
as a parameter, and it can call C<recv> on it to get the data.  The final
example in the SYNOPSIS gives a brief example of this.

Also note that C<recv> will throw an exception if the request fails, so be
prepared to catch exceptions where appropriate.

Please read the L<AnyEvent> documentation for more information on the proper
use of condvars.

=head1 METHODS

=head2 new (%options)

Create new client object and return it.

    my $client = AnyEvent::MRPPC::Client->new(
        host => '127.0.0.1',
        port => 4423,
        %options,
    );

Available options are:

=over 4

=item host => 'Str'

Hostname to connect. (Required)

You should set this option to "unix/" if you will set unix socket to port option.

=item port => 'Int | Str'

Port number or unix socket path to connect. (Required)

=item on_error => $cb->($handle, $fatal, $message)

Error callback code reference, which is called when some error occured.
This has same arguments as L<AnyEvent::Handle>, and also act as handler's on_error callback.

Default is just croak.

=item before_connect => $cb->($self, $filehandle)

It will be called with the file handle in not-yet-connected state as only argument.

=item after_connect => $cb->($self, $filehandle, $host, $port, $retry)

After the connection is established, then this callback will be invoked.

If the connect is unsuccessful, then the on_error callback will be invoked.

=item on_connect => $cb->($self, $filehandle)

It will be called with the file handle in not-yet-connected state as only argument.

    *******************************************************************
     The on_connect callback is deprecated! Please use before_connect
     (same as $prepare_cb of AnyEvent::Socket#tcp_connect) or
     after_connect (which call in $connect_cb of
     AnyEvent::Socket#tcp_connect).
    *******************************************************************

=item handler_options => 'HashRef'

This is passed to constructor of L<AnyEvent::Handle> that is used manage connection.

Default is empty.

=back

=head2 call ($method, (@params | \@params))

Call remote method named C<$method> with parameters C<@params>. And return condvar object for response.

    my $cv = $client->call( echo => 'Hello!' );
    my $res = $cv->recv;

If server returns an error, C<<$cv->recv>> causes croak by using C<<$cv->croak>>. So you can handle this like following:

    my $res;
    eval { $res = $cv->recv };

    if (my $error = $@) {
        # ...
    }

=head1 AUTHOR

Tokuhiro Matsuno <tokuhirom@cpan.org>

=head1 COPYRIGHT AND LICENSE

Copyright (c) 2009 by tokuhirom.

This program is free software; you can redistribute
it and/or modify it under the same terms as Perl itself.

The full text of the license can be found in the
LICENSE file included with this module.

=cut