The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
use utf8;
use strict;
use warnings;

=head1 NAME

DR::Tarantool::LLClient - a low level async client
for L<Tarantool|http://tarantool.org>

=head1 SYNOPSIS

    DR::Tarantool::LLClient->connect(
        host => '127.0.0.1',
        port => '33033',
        cb   => {
            my ($tnt) = @_;
            ...
        }
    );

    $tnt->ping( sub { .. } );
    $tnt->insert(0, [ 1, 2, 3 ], sub { ... });
    $tnt->select(1, 0, [ [ 1, 2 ], [ 3, 4 ] ], sub { ... });
    $tnt->update(0, [ 1 ], [ [ 1 => add pack 'L<', 1 ] ], sub { ... });
    $tnt->call_lua( 'box.select', [ 0, 1, 2 ], sub { ... });


=head1 DESCRIPTION

This module provides a low-level interface to
L<Tarantool|http://tarantool.org>.

=head1 METHODS

All methods receive B<callback> as the last argument. The callback receives
B<HASHREF> value with the following fields:

=over

=item status

Done status:

=over

=item fatal

A fatal error occurred. The server closed the connection or returned a
broken package.

=item buffer

An internal driver error.

=item error

The request wasn't executed: the server returned an error.

=item ok

Request was executed OK.

=back

=item errstr

If an error occurred, contains error description.

=item code

Contains reply code.

=item req_id

Contains request id.
(see
L<protocol documentation|https://github.com/mailru/tarantool/blob/master/doc/box-protocol.txt>)

=item type

Contains request type
(see
L<protocol documentation|https://github.com/mailru/tarantool/blob/master/doc/box-protocol.txt>)

=item count

Contains the count of returned tuples.

=item tuples

Returned tuples (B<ARRAYREF> of B<ARRAYREF>).

=back

If you use B<NUM> or B<NUM64> field types, values
for these fields need to be packed before they are sent to the
server, and unpacked when received in a response.
This is a low-level driver :)

=cut


package DR::Tarantool::LLClient;
use AnyEvent;
use AnyEvent::Socket;
use Carp;
use Devel::GlobalDestruction;
use File::Spec::Functions 'catfile';
$Carp::Internal{ (__PACKAGE__) }++;

use Scalar::Util 'weaken';
require DR::Tarantool;
use Data::Dumper;
use Time::HiRes ();

my $LE = $] > 5.01 ? '<' : '';


=head2 connect

Creates a connection to L<Tarantool/Box| http://tarantool.org>

    DR::Tarantool::LLClient->connect(
        host => '127.0.0.1',
        port => '33033',
        cb   => {
            my ($tnt) = @_;
            ...
        }
    );

=head3 Arguments

=over

=item host & port

Host and port to connect to.

=item reconnect_period

An interval to wait before trying to reconnect after a fatal error or
unsuccessful connect. If the field is defined and is greater than 0, the
driver tries to reconnect to the server after this interval.

B<Important>: the driver does not reconnect after B<the first>
unsuccessful connection. It calls B<callback> instead.

=item reconnect_always

Try to reconnect even after the first unsuccessful connection.

=item cb

Done callback. The callback receives a connection handle
connected to the server or an error string.

=back

=cut

sub connect {
    my $class = shift;

    my (%opts, $cb);

    if (@_ % 2) {
        $cb = pop;
        %opts = @_;
    } else {
        %opts = @_;
        $cb = delete $opts{cb};
    }

    $class->_check_cb( $cb || sub {  });

    my $host = $opts{host} || 'localhost';
    my $port = $opts{port} or croak "port is undefined";

    my $reconnect_period    = $opts{reconnect_period} || 0;
    my $reconnect_always    = $opts{reconnect_always} || 0;

    my $self = bless {
        host                => $host,
        port                => $port,
        reconnect_period    => $reconnect_period,
        reconnect_always    => $reconnect_always,
        first_connect       => 1,
        connection_status   => 'not_connected',
        wbuf                => '',
        rbuf                => '',
    } => ref($class) || $class;

    $self->_connect_reconnect( $cb );

    return $self;
}

sub disconnect {
    my ($self, $cb) = @_;
    $cb ||= sub {  };
    $self->_check_cb( $cb );

    delete $self->{reconnect_timer};
    delete $self->{connecting};
    if ($self->is_connected) {
        delete $self->{rhandle};
        delete $self->{whandle};
        delete $self->{fh};
    }
    $cb->( 'ok' );
}

sub DESTROY {
    return if in_global_destruction;
    my ($self) = @_;
    if ($self->is_connected) {
        delete $self->{rhandle};
        delete $self->{whandle};
        delete $self->{fh};
    }
}

=head2 is_connected

B<True> if this connection is established.

=cut

sub is_connected {
    my ($self) = @_;
    return $self->fh ? 1 : 0;
}

=head2 connection_status

Contains a string with the status of connection. Return value can be:

=over

=item ok

Connection is established.

=item not_connected

Connection isn't established yet, or was lost.

=item connecting

The driver is connecting to the server.

=item fatal

An attempt to connect was made, but ended up with an error. 
If the event loop is running, and B<reconnect_period> option
is set, the driver continues to try to reconnect and update its status.

=back

=cut

sub connection_status {
    my ($self) = @_;
    $self->{connection_status} || 'unknown';
}


=head2 ping

Ping the server.

    $tnt->ping( sub { .. } );

=head3 Arguments

=over

=item a callback

=back

=cut

sub ping :method {
    my ($self, $cb) = @_;
    $self->_check_cb( $cb );
    my $id = $self->_req_id;
    my $pkt = DR::Tarantool::_pkt_ping( $id );
    $self->_request( $id, $pkt, $cb );
    return;
}


=head2 insert

Insert a tuple.

    $tnt->insert(0, [ 1, 2, 3 ], sub { ... });
    $tnt->insert(0, [ 4, 5, 6 ], $flags, sub { .. });

=head3 Arguments

=over

=item space

=item tuple

=item flags (optional)

=item callback

=back

=cut

sub insert :method {

    my $self = shift;
    $self->_check_number(   my $space = shift       );
    $self->_check_tuple(    my $tuple = shift       );
    $self->_check_cb(       my $cb = pop            );
    $self->_check_number(   my $flags = pop || 0    );
    croak "insert: tuple must be ARRAYREF" unless ref $tuple eq 'ARRAY';
    $flags ||= 0;

    my $id = $self->_req_id;
    my $pkt = DR::Tarantool::_pkt_insert( $id, $space, $flags, $tuple );
    $self->_request( $id, $pkt, $cb );
    return;
}

=head2 select

Select a tuple or tuples.

    $tnt->select(1, 0, [ [ 1, 2 ], [ 3, 4 ] ], sub { ... });
    $tnt->select(1, 0, [ [ 1, 2 ], [ 3, 4 ] ], 1, sub { ... });
    $tnt->select(1, 0, [ [ 1, 2 ], [ 3, 4 ] ], 1, 2, sub { ... });

=head3 Arguments

=over

=item space

=item index

=item tuple_keys

=item limit (optional)

If the limit isn't set or is zero, select extracts all records without
a limit.

=item offset (optional)

Default value is B<0>.

=item callback for results

=back

=cut

sub select :method {

    my $self = shift;
    $self->_check_number(       my $ns = shift                  );
    $self->_check_number(       my $idx = shift                 );
    $self->_check_tuple_list(   my $keys = shift                );
    $self->_check_cb(           my $cb = pop                    );
    $self->_check_number(       my $limit = shift || 0x7FFFFFFF );
    $self->_check_number(       my $offset = shift || 0         );

    my $id = $self->_req_id;
    my $pkt =
        DR::Tarantool::_pkt_select($id, $ns, $idx, $offset, $limit, $keys);
    $self->_request( $id, $pkt, $cb );
    return;
}

=head2 update

Update a tuple.

    $tnt->update(0, [ 1 ], [ [ 1 => add 1 ] ], sub { ... });
    $tnt->update(
        0,                                      # space
        [ 1 ],                                  # key
        [ [ 1 => add 1 ], [ 2 => add => 1 ],    # operations
        $flags,                                 # flags
        sub { ... }                             # callback
    );
    $tnt->update(0, [ 1 ], [ [ 1 => add 1 ] ], $flags, sub { ... });

=head3 Arguments

=over

=item space

=item tuple_key

=item operations list

=item flags (optional)

=item callback for results

=back

=cut

sub update :method {

    my $self = shift;
    $self->_check_number(           my $ns = shift          );
    $self->_check_tuple(            my $key = shift         );
    $self->_check_operations(       my $operations = shift  );
    $self->_check_cb(               my $cb = pop            );
    $self->_check_number(           my $flags = pop || 0    );

    my $id = $self->_req_id;
    my $pkt = DR::Tarantool::_pkt_update($id, $ns, $flags, $key, $operations);
    $self->_request( $id, $pkt, $cb );
    return;

}

=head2 delete

Delete a tuple.

    $tnt->delete( 0, [ 1 ], sub { ... });
    $tnt->delete( 0, [ 1 ], $flags, sub { ... });

=head3 Arguments

=over

=item space

=item tuple_key

=item flags (optional)

=item callback for results

=back

=cut

sub delete :method {
    my $self = shift;
    my $ns = shift;
    my $key = shift;
    $self->_check_tuple( $key );
    my $cb = pop;
    $self->_check_cb( $cb );
    my $flags = pop || 0;

    my $id = $self->_req_id;
    my $pkt = DR::Tarantool::_pkt_delete($id, $ns, $flags, $key);
    $self->_request( $id, $pkt, $cb );
    return;
}


=head2 call_lua

Calls a lua procedure.

    $tnt->call_lua( 'box.select', [ 0, 1, 2 ], sub { ... });
    $tnt->call_lua( 'box.select', [ 0, 1, 2 ], $flags, sub { ... });

=head3 Arguments

=over

=item name of the procedure

=item tuple_key

=item flags (optional)

=item callback to call when the request is ready

=back

=cut

sub call_lua :method {

    my $self = shift;
    my $proc = shift;
    my $tuple = shift;
    $self->_check_tuple( $tuple );
    my $cb = pop;
    $self->_check_cb( $cb );
    my $flags = pop || 0;

    my $id = $self->_req_id;
    my $pkt = DR::Tarantool::_pkt_call_lua($id, $flags, $proc, $tuple);
    $self->_request( $id, $pkt, $cb );
    return;
}


=head2 last_code

Return code of the last request or B<undef> if there was no
request. 

=cut

sub last_code {
    my ($self) = @_;
    return $self->{last_code} if exists $self->{last_code};
    return undef;
}


=head2 last_error_string

An error string if the last request ended up with an 
error, or B<undef> otherwise.

=cut

sub last_error_string {
    my ($self) = @_;
    return $self->{last_error_string} if exists $self->{last_error_string};
    return undef;
}

=head1 Logging

The module can log requests/responses. Logging can be turned ON by 
setting these environment variables:

=over

=item TNT_LOG_DIR

Instructs LLClient to record all requests/responses into this directory.

=item TNT_LOG_ERRDIR

Instructs LLClient to record all requests/responses which
ended up with an error into this directory.

=back

=cut


sub _log_transaction {
    my ($self, $id, $pkt, $response, $res_pkt) = @_;

    my $logdir = $ENV{TNT_LOG_DIR};
    goto DOLOG if $logdir;
    $logdir = $ENV{TNT_LOG_ERRDIR};
    goto DOLOG if $logdir and $response->{status} ne 'ok';
    return;

    DOLOG:
    eval {
        die "Directory $logdir was not found, transaction wasn't logged\n"
            unless -d $logdir;

        my $now = Time::HiRes::time;

        my $logdirname = catfile $logdir,
            sprintf '%s-%s', $now, $response->{status};

        die "Object $logdirname is already exists, transaction wasn't logged\n"
            if -e $logdirname or -d $logdirname;
        
        die $! unless mkdir $logdirname;
       
        my $rrname = catfile $logdirname, 
            sprintf 'rawrequest-%04d.bin', $id;
        open my $fh, '>:raw', $rrname or die "Can't open $rrname: $!\n";
        print $fh $pkt;
        close $fh;

        my $respname = catfile $logdirname,
            sprintf 'dumpresponse-%04d.txt', $id;

        open $fh, '>:raw', $respname or die "Can't open $respname: $!\n";
        
        local $Data::Dumper::Indent = 1;
        local $Data::Dumper::Terse = 1;
        local $Data::Dumper::Useqq = 1;
        local $Data::Dumper::Deepcopy = 1;
        local $Data::Dumper::Maxdepth = 0;
        print $fh Dumper($response);
        close $fh;

        if (defined $res_pkt) {
            $respname = catfile $logdirname,
                sprintf 'rawresponse-%04d.bin', $id;
            open $fh, '>:raw', $respname or die "Can't open $respname: $!\n";
            print $fh $res_pkt;
            close $fh;
        }
    };
    warn $@ if $@;
}


sub _request {
    my ($self, $id, $pkt, $cb ) = @_;
    
    my $cbres = $cb;
    $cbres = sub { $self->_log_transaction($id, $pkt, @_); &$cb }
        if $ENV{TNT_LOG_ERRDIR} or $ENV{TNT_LOG_DIR};

    $self->{ wait }{ $id } = $cbres;
    # TODO: use watcher
    $self->{wbuf} .= $pkt;

    if ($self->fh) {
        return if $self->{whandle};
        $self->{whandle} = AE::io $self->fh, 1, $self->_on_write;
    }
}

sub _on_write {
    my ($self) = @_;
    sub {
        my $wb = syswrite $self->fh, $self->{wbuf};
        unless(defined $wb) {
            $self->_socket_error->($self->fh, 1, $!);
            return;
        }
        return unless $wb;
        substr $self->{wbuf}, 0, $wb, '';
        delete $self->{whandle} unless length $self->{wbuf};
    }
}
sub _req_id {
    my ($self) = @_;
    for (my $id = $self->{req_id} || 0;; $id++) {
        $id = 0 unless $id < 0x7FFF_FFFF;
        next if exists $self->{wait}{$id};
        $self->{req_id} = $id + 1;
        return $id;
    }
}

sub _fatal_error {
    my ($self, $msg, $raw) = @_;
    $self->{last_code} ||= -1;
    $self->{last_error_string} ||= $msg;

    delete $self->{rhandle};
    delete $self->{whandle};
    delete $self->{fh};
    $self->{wbuf} = '';
    $self->{connection_status} = 'not_connected',

    my $wait = delete $self->{wait};
    $self->{wait} = {};
    for (keys %$wait) {
        my $cb = delete $wait->{$_};
        $cb->({ status  => 'fatal',  errstr  => $msg, req_id => $_ }, $raw);
    }

    $self->_connect_reconnect;
}

sub _connect_reconnect {

    my ($self, $cb) = @_;

    $self->_check_cb( $cb ) if $cb;
    return if $self->{reconnect_timer};
    return if $self->{connecting};
    return unless $self->{first_connect} or $self->{reconnect_period};

    $self->{reconnect_timer} = AE::timer
        $self->{first_connect} ? 0 : $self->{reconnect_period},
        $self->{reconnect_period} || 0,
        sub {
            return if $self->{connecting};
            $self->{connecting} = 1;

            $self->{connection_status} = 'connecting';

            tcp_connect $self->{host}, $self->{port}, sub {
                my ($fh) = @_;
                delete $self->{connecting};
                if ($fh) {
                    $self->{fh} = $fh;
                    $self->{rbuf} = '';
                    $self->{rhandle} = AE::io $self->fh, 0, $self->on_read;
                    $self->{whandle} = AE::io $self->fh, 1, $self->_on_write
                        if length $self->{wbuf};


                    delete $self->{reconnect_timer};
                    delete $self->{first_connect};
                    $self->{connection_status} = 'ok';

                    $cb->( $self ) if $cb;
                    return;
                }

                my $emsg = $!;
                if ($self->{first_connect} and not $self->{reconnect_always}) {
                    $cb->( $! ) if $cb;
                    delete $self->{reconnect_timer};
                    delete $self->{connecting};
                }
                $self->{connection_status} = "Couldn't connect to server: $!";
            };
        }
    ;
}

sub _check_rbuf {{
    my ($self) = @_;
    return unless length $self->{rbuf} >= 12;
    my (undef, $blen) = unpack "L$LE L$LE", $self->{rbuf};
    return unless length $self->{rbuf} >= 12 + $blen;
    

    my $pkt = substr $self->{rbuf}, 0, 12 + $blen, '';

    my $res = DR::Tarantool::_pkt_parse_response( $pkt );

    $self->{last_code} = $res->{code};
    if (exists $res->{errstr}) {
        $self->{last_error_string} = $res->{errstr};
    } else {
        delete $self->{last_error_string};
    }

    if ($res->{status} =~ /^(fatal|buffer)$/) {
        $self->_fatal_error( $res->{errstr}, $pkt );
        return;
    }

    my $id = $res->{req_id};
    my $cb = delete $self->{ wait }{ $id };
    if ('CODE' eq ref $cb) {
        $cb->( $res, $pkt );
    } else {
        warn "Unexpected reply from tarantool with id = $id";
    }
    redo;
}}


sub on_read {
    my ($self) = @_;
    sub {
        my $rd = sysread $self->fh, my $buf, 4096;
        unless(defined $rd) {
            $self->_socket_error->($self->fh, 1, $!);
            return;
        }

        unless($rd) {
            $self->_socket_eof->($self->fh, 1);
            return;
        }
        $self->{rbuf} .= $buf;
        $self->_check_rbuf;
    }
        # write responses as binfile for tests
#         {
#             my ($type, $blen, $id, $code, $body) =
#                 unpack 'L< L< L< L< A*', $hdr . $data;

#             my $sname = sprintf 't/test-data/%05d-%03d-%s.bin',
#                 $type || 0, $code, $code ? 'fail' : 'ok';
#             open my $fh, '>:raw', $sname;
#             print $fh $hdr;
#             print $fh $data;
#             warn "$sname saved (body length: $blen)";
#         }
}


sub fh {
    my ($self) = @_;
    return $self->{fh};
}

sub _socket_error {
    my ($self) = @_;
    return sub {
        my (undef, $fatal, $msg) = @_;
        $self->_fatal_error("Socket error: $msg");
    }
}

sub _socket_eof {
    my ($self) = @_;
    return sub {
        $self->_fatal_error("Socket error: Server closed connection");
    }
}


sub _check_cb {
    my ($self, $cb) = @_;
    croak 'Callback must be CODEREF' unless 'CODE' eq ref $cb;
}

sub _check_tuple {
    my ($self, $tuple) = @_;
    croak 'Tuple must be ARRAYREF' unless 'ARRAY' eq ref $tuple;
}

sub _check_tuple_list {
    my ($self, $list) = @_;
    croak 'Tuplelist must be ARRAYREF of ARRAYREF' unless 'ARRAY' eq ref $list;
    croak 'Tuplelist is empty' unless @$list;
    $self->_check_tuple($_) for @$list;
}

sub _check_number {
    my ($self, $number) = @_;
    croak "argument must be number"
        unless defined $number and $number =~ /^\d+$/;
}


sub _check_operation {
    my ($self, $op) = @_;
    croak 'Operation must be ARRAYREF' unless 'ARRAY' eq ref $op;
    croak 'Wrong update operation: too short arglist' unless @$op >= 2;
    croak "Wrong operation: $op->[1]"
        unless $op->[1] and
            $op->[1] =~ /^(delete|set|insert|add|and|or|xor|substr)$/;
    $self->_check_number($op->[0]);
}

sub _check_operations {
    my ($self, $list) = @_;
    croak 'Operations list must be ARRAYREF of ARRAYREF'
        unless 'ARRAY' eq ref $list;
    croak 'Operations list is empty' unless @$list;
    $self->_check_operation( $_ ) for @$list;
}

=head1 COPYRIGHT AND LICENSE

 Copyright (C) 2011 Dmitry E. Oboukhov <unera@debian.org>
 Copyright (C) 2011 Roman V. Nikolaev <rshadow@rambler.ru>

 This program is free software, you can redistribute it and/or
 modify it under the terms of the Artistic License.

=head1 VCS

The project is placed git repo on github:
L<https://github.com/unera/dr-tarantool/>.

=cut

1;