# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
use utf8;
use strict;
use warnings;

package DR::Tarantool::MsgPack::Proto;
use DR::Tarantool::MsgPack qw(msgpack msgunpack msgcheck);
use base qw(Exporter);
our @EXPORT_OK = qw(call_lua response insert replace del update select auth handshake ping);
use Carp;
use Scalar::Util 'looks_like_number';
use Digest::SHA 'sha1';
use MIME::Base64;

# Handed to msgunpack as its second argument whenever a response is decoded
# (presumably switches UTF-8 decoding of strings on -- confirm against
# DR::Tarantool::MsgPack).
our $DECODE_UTF8 = 1;

# Numeric code => short name tables, populated at compile time by the BEGIN
# block: %resolve covers header/body keys, %tresolve covers request types.
my %resolve;
my %tresolve;

# Iterator name => numeric iterator code accepted by select().
my %iter = (
    EQ               => 0,
    REQ              => 1,
    ALL              => 2,
    LT               => 3,
    LE               => 4,
    GE               => 5,
    GT               => 6,
    BITS_ALL_SET     => 7,
    BITS_ANY_SET     => 8,
    BITS_ALL_NOT_SET => 9,
);

# Numeric iterator code => iterator name (inverse of %iter).
my %riter = map { $iter{$_} => $_ } keys %iter;

BEGIN {
    # Request type codes (values of the IPROTO_CODE header key).
    my %types = (
        IPROTO_SELECT              => 1,
        IPROTO_INSERT              => 2,
        IPROTO_REPLACE             => 3,
        IPROTO_UPDATE              => 4,
        IPROTO_DELETE              => 5,
        IPROTO_CALL                => 6,
        IPROTO_AUTH                => 7,
        IPROTO_DML_REQUEST_MAX     => 8,
        IPROTO_PING                => 64,
        IPROTO_SUBSCRIBE           => 66,
    );

    # Keys used inside the header and body maps of a packet.
    my %attrs = (
        IPROTO_CODE                => 0x00,
        IPROTO_SYNC                => 0x01,
        IPROTO_SERVER_ID           => 0x02,
        IPROTO_LSN                 => 0x03,
        IPROTO_TIMESTAMP           => 0x04,
        IPROTO_SPACE_ID            => 0x10,
        IPROTO_INDEX_ID            => 0x11,
        IPROTO_LIMIT               => 0x12,
        IPROTO_OFFSET              => 0x13,
        IPROTO_ITERATOR            => 0x14,
        IPROTO_KEY                 => 0x20,
        IPROTO_TUPLE               => 0x21,
        IPROTO_FUNCTION_NAME       => 0x22,
        IPROTO_USER_NAME           => 0x23,
        IPROTO_DATA                => 0x30,
        IPROTO_ERROR               => 0x31,
    );

    use constant;

    # Every entry becomes a compile-time constant in this package, and its
    # value is recorded in the matching reverse table under the name with
    # the IPROTO_ prefix stripped.
    for my $group ([ \%types, \%tresolve ], [ \%attrs, \%resolve ]) {
        my ($codes, $names) = @$group;
        for my $const (keys %$codes) {
            my $value = $codes->{$const};
            constant->import($const => $value);
            (my $short = $const) =~ s/^IPROTO_//;
            $names->{$value} = $short;
        }
    }
}



# Split one complete packet off the front of a byte buffer.
#
# The buffer is expected to start with a msgpack-encoded length prefix,
# followed by a msgpack header map and, optionally, a msgpack body map.
#
#   $response - raw bytes received so far (may hold several packets)
#
# Returns an empty list when $response is undefined or does not yet contain
# a whole packet. Otherwise returns ([ $header, $body ], $tail) where $tail
# is everything that follows the parsed packet. A packet without a body
# yields an empty hashref as $body.
#
# Croaks when the length prefix decodes to a reference, when an item cannot
# be checked or runs past the end of the packet, or when the header or the
# body is not a map.
sub raw_response($) {
    my ($response) = @_;
    return unless defined $response;

    my $len;
    {
        # Only the first 10 bytes are inspected for the length prefix.
        my $lenheader = length($response) > 10
            ? substr($response, 0, 10)
            : $response;
        return unless my $lenlen = msgcheck($lenheader);

        $len = msgunpack $lenheader, $DECODE_UTF8;
        croak 'Unexpected msgpack object ' . ref($len) if ref $len;

        # From here on $len is the size of the whole packet, prefix included.
        $len += $lenlen;
    }

    # Packet is not complete yet.
    return if length($response) < $len;

    my @r;
    my $off = 0;

    # Items in order: 1 = length prefix, 2 = header, 3 = body.
    for my $item (1 .. 3) {
        my $sp = $off ? substr($response, $off) : $response;
        my $len_item = msgcheck($sp);

        # An item has to end inside THIS packet. Bounding by the buffer
        # length instead would let it spill into the next packet.
        croak 'Broken response'
            unless $len_item and $len_item + $off <= $len;
        push @r => msgunpack($sp, $DECODE_UTF8);
        $off += $len_item;

        # Header consumed and the packet is exhausted: there is no body.
        # Compare against the packet length, not the buffer length, so that
        # a body-less packet followed by further packets is still recognised.
        if ($item == 2 and $off == $len) {
            push @r => {};
            last;
        }
    }

    croak 'Broken response header' unless 'HASH' eq ref $r[1];
    croak 'Broken response body' unless 'HASH' eq ref $r[2];

    return [ $r[1], $r[2] ], substr($response, $off);
}

# Decode the first packet of a byte buffer into one flat hashref.
#
# Header and body maps returned by raw_response() are merged; numeric keys
# known to %resolve are replaced by their short names, unknown keys are kept
# unchanged. A CODE value known to %tresolve and an ITERATOR value known to
# %riter are replaced by their symbolic names.
#
# Returns an empty list when raw_response() yields no packet, otherwise
# ($hashref, $tail).
sub response($) {
    my ($packet, $tail) = raw_response($_[0]);
    return unless $packet;

    my %res;

    # @$packet is (header, body): header keys are stored first, so a body
    # key that resolves to the same name replaces the header value.
    for my $part (@$packet) {
        for my $key (keys %$part) {
            my $name = $resolve{$key};
            $res{ defined $name ? $name : $key } = $part->{$key};
        }
    }

    # Swap numeric codes for symbolic names wherever a name is known.
    for my $rule ([ CODE => \%tresolve ], [ ITERATOR => \%riter ]) {
        my ($field, $table) = @$rule;
        next unless defined $res{$field};
        my $symbolic = $table->{ $res{$field} };
        $res{$field} = $symbolic if defined $symbolic;
    }

    return \%res, $tail;
}

# Serialize a request packet.
#
#   $header - hashref for the packet header
#   $body   - hashref for the packet body
#
# Returns the msgpack-encoded header and body, preceded by their combined
# length encoded with msgpack.
sub request($$) {
    my ($header, $body) = @_;

    my $payload = msgpack($header);
    $payload .= msgpack($body);

    return msgpack(length($payload)) . $payload;
}

# Build a CALL request packet.
#
#   $sync  - request id stored under IPROTO_SYNC
#   $proc  - name of the function to call
#   $tuple - value stored under IPROTO_TUPLE (callers in this file pass an
#            arrayref)
#
# Returns the encoded packet produced by request().
sub _call_lua($$$) {
    my ($sync, $proc, $tuple) = @_;

    my %header = (
        IPROTO_SYNC()           => $sync,
        IPROTO_CODE()           => IPROTO_CALL(),
    );
    my %body = (
        IPROTO_FUNCTION_NAME()  => $proc,
        IPROTO_TUPLE()          => $tuple,
    );

    return request(\%header, \%body);
}

# Public wrapper around _call_lua(): every argument after the function name
# is collected into the arrayref sent with the call.
#
#   $sync - request id
#   $proc - name of the function to call
#   @args - arguments for the function
#
# Returns the encoded packet.
sub call_lua($$@) {
    my $sync = shift;
    my $proc = shift;
    return _call_lua($sync, $proc, [ @_ ]);
}

# Build an INSERT request packet.
#
#   $sync  - request id stored under IPROTO_SYNC
#   $space - space number, or a space name
#   $tuple - arrayref of fields; a non-reference value is wrapped into a
#            one-field tuple
#
# Returns the encoded packet. Croaks when $tuple is a hashref.
#
# A numeric $space produces a native IPROTO_INSERT request. Any other value
# is interpolated as a space name into a Lua call of box.space.<name>:insert.
sub insert($$$) {
    my ($sync, $space, $tuple) = @_;

    $tuple = [ $tuple ] unless ref $tuple;
    croak "Cant convert HashRef to tuple" if 'HASH' eq ref $tuple;

    if (looks_like_number $space) {
        return request
            {
                IPROTO_SYNC,        $sync,
                IPROTO_CODE,        IPROTO_INSERT,
            },
            {
                IPROTO_SPACE_ID,    $space,
                IPROTO_TUPLE,       $tuple,
            }
        ;
    }

    # HACK: named spaces go through a Lua call. The third argument of
    # _call_lua() is the list of call arguments, so the tuple is wrapped to
    # arrive as ONE argument (the convention update() and select() follow)
    # instead of being spread field by field.
    return _call_lua($sync, "box.space.$space:insert", [ $tuple ]);
}

# Build a REPLACE request packet.
#
#   $sync  - request id stored under IPROTO_SYNC
#   $space - space number, or a space name
#   $tuple - arrayref of fields; a non-reference value is wrapped into a
#            one-field tuple
#
# Returns the encoded packet. Croaks when $tuple is a hashref.
#
# A numeric $space produces a native IPROTO_REPLACE request. Any other value
# is interpolated as a space name into a Lua call of
# box.space.<name>:replace.
sub replace($$$) {
    my ($sync, $space, $tuple) = @_;

    $tuple = [ $tuple ] unless ref $tuple;
    croak "Cant convert HashRef to tuple" if 'HASH' eq ref $tuple;

    if (looks_like_number $space) {
        return request
            {
                IPROTO_SYNC,        $sync,
                IPROTO_CODE,        IPROTO_REPLACE,
            },
            {
                IPROTO_SPACE_ID,    $space,
                IPROTO_TUPLE,       $tuple,
            }
        ;
    }

    # HACK: named spaces go through a Lua call. The third argument of
    # _call_lua() is the list of call arguments, so the tuple is wrapped to
    # arrive as ONE argument (the convention update() and select() follow)
    # instead of being spread field by field.
    return _call_lua($sync, "box.space.$space:replace", [ $tuple ]);
}
# Build a DELETE request packet.
#
#   $sync  - request id stored under IPROTO_SYNC
#   $space - space number, or a space name
#   $key   - arrayref of key parts; a non-reference value is wrapped into a
#            one-part key
#
# Returns the encoded packet. Croaks when $key is a hashref.
#
# A numeric $space produces a native IPROTO_DELETE request. Any other value
# is interpolated as a space name into a Lua call of box.space.<name>:delete.
sub del($$$) {
    my ($sync, $space, $key) = @_;

    $key = [ $key ] unless ref $key;
    croak "Cant convert HashRef to key" if 'HASH' eq ref $key;

    if (looks_like_number $space) {
        return request
            {
                IPROTO_SYNC,        $sync,
                IPROTO_CODE,        IPROTO_DELETE,
            },
            {
                IPROTO_SPACE_ID,    $space,
                IPROTO_KEY,         $key,
            }
        ;
    }

    # HACK: named spaces go through a Lua call. The third argument of
    # _call_lua() is the list of call arguments, so the key is wrapped to
    # arrive as ONE argument (the convention update() and select() follow);
    # otherwise the parts of a multi-part key would be spread over separate
    # arguments.
    return _call_lua($sync, "box.space.$space:delete", [ $key ]);
}


# Build an UPDATE request packet.
#
#   $sync  - request id stored under IPROTO_SYNC
#   $space - space number, or a space name
#   $key   - arrayref of key parts; a non-reference value is wrapped into a
#            one-part key
#   $ops   - arrayref with the list of update operations
#
# Returns the encoded packet. Croaks when $ops is not an arrayref or when
# $key is a hashref.
sub update($$$$) {
    my ($sync, $space, $key, $ops) = @_;

    croak 'Oplist must be Arrayref' if ref($ops) ne 'ARRAY';
    $key = [ $key ] if !ref $key;
    croak "Cant convert HashRef to key" if ref($key) eq 'HASH';

    # HACK: a non-numeric space is taken as a name and updated via Lua.
    return _call_lua($sync, "box.space.$space:update", [ $key, $ops ])
        unless looks_like_number($space);

    my %header = (
        IPROTO_SYNC()       => $sync,
        IPROTO_CODE()       => IPROTO_UPDATE(),
    );
    my %body = (
        IPROTO_SPACE_ID()   => $space,
        IPROTO_KEY()        => $key,
        IPROTO_TUPLE()      => $ops,
    );

    return request(\%header, \%body);
}

# Build a SELECT request packet.
#
#   $sync     - request id stored under IPROTO_SYNC
#   $space    - space number, or a space name
#   $index    - index number, or an index name
#   $key      - arrayref of key parts; a non-reference value is wrapped
#               into a one-part key
#   $limit    - optional, defaults to 0xFFFF_FFFF
#   $offset   - optional, defaults to 0
#   $iterator - optional iterator name (a key of %iter) or numeric code,
#               defaults to 'EQ'
#
# Returns the encoded packet. Croaks when $key is a hashref or when a
# non-numeric $iterator is not a known iterator name.
#
# The native IPROTO_SELECT request is used only when both $space and $index
# are numeric; otherwise the select is issued as a Lua call of
# box.space.<space>.index.<index>:select.
sub select($$$$;$$$) {
    my ($sync, $space, $index, $key, $limit, $offset, $iterator) = @_;

    $iterator = 'EQ'        if !defined $iterator;
    $offset   = 0           if !$offset;
    $limit    = 0xFFFF_FFFF if !defined $limit;

    $key = [ $key ] if !ref $key;
    croak "Cant convert HashRef to key" if ref($key) eq 'HASH';

    # Translate a symbolic iterator name into its numeric code.
    if (!looks_like_number($iterator)) {
        my $code = $iter{$iterator};
        croak "Wrong iterator type: $iterator" if !defined $code;
        $iterator = $code;
    }

    if (!looks_like_number($space) || !looks_like_number($index)) {
        # HACK
        my %opts = (
            offset   => $offset,
            limit    => $limit,
            iterator => $iterator,
        );
        return _call_lua(
            $sync,
            "box.space.$space.index.$index:select",
            [ $key, \%opts ]
        );
    }

    my %header = (
        IPROTO_SYNC()       => $sync,
        IPROTO_CODE()       => IPROTO_SELECT(),
    );
    my %body = (
        IPROTO_KEY()        => $key,
        IPROTO_SPACE_ID()   => $space,
        IPROTO_OFFSET()     => $offset,
        IPROTO_INDEX_ID()   => $index,
        IPROTO_LIMIT()      => $limit,
        IPROTO_ITERATOR()   => $iterator,
    );

    return request(\%header, \%body);
}

# Build a PING request packet: a header carrying $sync and the PING code,
# plus an empty body.
#
#   $sync - request id stored under IPROTO_SYNC
#
# Returns the encoded packet.
sub ping($) {
    my ($sync) = @_;

    my %header = (
        IPROTO_SYNC()   => $sync,
        IPROTO_CODE()   => IPROTO_PING(),
    );

    return request(\%header, {});
}


# XOR two byte strings position by position.
#
#   $x, $y - byte strings
#
# Returns a string with as many bytes as $x; byte N of the result is byte N
# of $x XOR byte N of $y. Bytes of $y beyond the length of $x are ignored.
sub strxor($$) {
    my ($x, $y) = @_;

    my @left  = unpack 'C*', $x;
    my @right = unpack 'C*', $y;

    return pack 'C*', map { $left[$_] ^ $right[$_] } 0 .. $#left;
}

# Build an AUTH request packet using the 'chap-sha1' method.
#
#   $sync     - request id stored under IPROTO_SYNC
#   $user     - user name stored under IPROTO_USER_NAME
#   $password - plain-text password
#   $salt     - salt, e.g. the one returned by handshake()
#
# The value sent is sha1($password) XORed with
# sha1($salt . sha1(sha1($password))).
#
# Returns the encoded packet.
sub auth($$$$) {
    my ($sync, $user, $password, $salt) = @_;

    my $once     = sha1($password);
    my $twice    = sha1($once);
    my $scramble = sha1($salt . $twice);
    my $proof    = strxor($once, $scramble);

    my %header = (
        IPROTO_SYNC() => $sync,
        IPROTO_CODE() => IPROTO_AUTH(),
    );
    my %body = (
        IPROTO_USER_NAME()  => $user,
        IPROTO_TUPLE()      => [ 'chap-sha1', $proof ],
    );

    return request(\%header, \%body);
}

# Parse the 128-byte greeting into a version string and a salt.
#
#   $h - greeting; must be exactly 128 characters long
#
# The first 64 characters hold the version text: everything from the first
# NUL onwards (up to a newline) is dropped, then a leading
# "tarantool" (any case, optional colon, optional whitespace) is removed.
# The remaining characters are base64-decoded and the first 20 bytes of the
# result are the salt.
#
# Returns ($version, $salt). Croaks when the length is not 128.
sub handshake($) {
    my ($greeting) = @_;
    croak 'Wrong handshake length' if length($greeting) != 128;

    my $version = substr($greeting, 0, 64);
    $version =~ s/\0.*//;
    $version =~ s/^tarantool:?\s*//i;

    my $decoded = MIME::Base64::decode_base64(substr($greeting, 64));

    return $version, substr($decoded, 0, 20);
}

1;