use utf8;
use strict;
use warnings;
package DR::Tarantool::LLSyncClient;
use Carp;
use IO::Socket::UNIX;
use IO::Socket::INET;
require DR::Tarantool;
my $LE = $] > 5.01 ? '<' : '';
$Carp::Internal{ (__PACKAGE__) }++;
sub connect {
my ($class, %opts) = @_;
my $host = $opts{host} || 'localhost';
my $port = $opts{port} or croak 'port is undefined';
my $reconnect_period = $opts{reconnect_period} || 0;
my $reconnect_always = $opts{reconnect_always} || 0;
my $raise_error = 1;
if (exists $opts{raise_error}) {
$raise_error = $opts{raise_error} ? 1 : 0;
}
my $self = bless {
host => $host,
port => $port,
raise_error => $raise_error,
reconnect_period => $reconnect_period,
id => 0,
} => ref ($class) || $class;
unless ($self->_connect()) {
unless ($reconnect_always) {
return undef unless $self->{raise_error};
croak "Can't connect to $self->{host}:$self->{port}: $@";
}
unless ($reconnect_period) {
return undef unless $self->{raise_error};
croak "Can't connect to $self->{host}:$self->{port}: $@";
}
}
return $self;
}
sub _connect {
my ($self) = @_;
if ($self->{host} eq 'unix/' or $self->{port} =~ /\D/) {
return $self->{fh} = IO::Socket::UNIX->new(Peer => $self->{port});
} else {
return $self->{fh} = IO::Socket::INET->new(
PeerHost => $self->{host},
PeerPort => $self->{port},
Proto => 'tcp',
);
}
}
sub _req_id {
my ($self) = @_;
return $self->{id}++ if $self->{id} < 0x7FFF_FFFE;
return $self->{id} = 0;
}
sub _request {
my ($self, $id, $pkt ) = @_;
until($self->{fh}) {
unless ($self->{reconnect_period}) {
$self->{last_error_string} = "Connection isn't established";
croak $self->{last_error_string} if $self->{raise_error};
return undef;
}
next if $self->_connect;
sleep $self->{reconnect_period};
}
my $len = length $pkt;
# send request
while($len > 0) {
no warnings; # closed socket
my $slen = syswrite $self->{fh}, $pkt;
goto SOCKET_ERROR unless defined $slen;
$len -= $slen;
substr $pkt, 0, $slen, '';
}
$pkt = '';
while(12 > length $pkt) {
no warnings; # closed socket
my $rl = sysread $self->{fh}, $pkt, 12 - length $pkt, length $pkt;
goto SOCKET_ERROR unless defined $rl;
}
my (undef, $blen) = unpack "L$LE L$LE", $pkt;
while(12 + $blen > length $pkt) {
no warnings; # closed socket
my $rl = sysread $self->{fh},
$pkt, 12 + $blen - length $pkt, length $pkt;
goto SOCKET_ERROR unless defined $rl;
}
my $res = DR::Tarantool::_pkt_parse_response( $pkt );
if ($res->{status} ne 'ok') {
$self->{last_error_string} = $res->{errstr};
$self->{last_code} = $res->{code};
# disconnect
delete $self->{fh} if $res->{status} =~ /^(fatal|buffer)$/;
croak $self->{last_error_string} if $self->{raise_error};
return undef;
}
$self->{last_error_string} = $res->{errstr} || '';
$self->{last_code} = $res->{code};
return $res;
SOCKET_ERROR:
delete $self->{fh};
$self->{last_error_string} = $!;
$self->{last_code} = undef;
croak $self->{last_error_string} if $self->{raise_error};
return undef;
}
sub ping :method {
my ($self) = @_;
unless ($self->{fh}) {
$self->_connect;
$self->{last_code} = -1;
$self->{last_error_string} = "Connection isn't established";
return 0 unless $self->{fh};
}
my $id = $self->_req_id;
my $pkt = DR::Tarantool::_pkt_ping( $id );
my $res = eval { $self->_request( $id, $pkt ); };
return 0 unless $res and $res->{status} eq 'ok';
return 1;
}
sub call_lua :method {
my $self = shift;
my $proc = shift;
my $tuple = shift;
$self->_check_tuple( $tuple );
my $flags = pop || 0;
my $id = $self->_req_id;
my $pkt = DR::Tarantool::_pkt_call_lua($id, $flags, $proc, $tuple);
return $self->_request( $id, $pkt );
}
sub select :method {
my $self = shift;
$self->_check_number( my $ns = shift );
$self->_check_number( my $idx = shift );
$self->_check_tuple_list( my $keys = shift );
$self->_check_number( my $limit = shift || 0x7FFFFFFF );
$self->_check_number( my $offset = shift || 0 );
my $id = $self->_req_id;
my $pkt =
DR::Tarantool::_pkt_select($id, $ns, $idx, $offset, $limit, $keys);
return $self->_request( $id, $pkt );
}
sub insert :method {
my $self = shift;
$self->_check_number( my $space = shift );
$self->_check_tuple( my $tuple = shift );
$self->_check_number( my $flags = pop || 0 );
croak "insert: tuple must be ARRAYREF" unless ref $tuple eq 'ARRAY';
$flags ||= 0;
my $id = $self->_req_id;
my $pkt = DR::Tarantool::_pkt_insert( $id, $space, $flags, $tuple );
return $self->_request( $id, $pkt );
}
sub update :method {
my $self = shift;
$self->_check_number( my $ns = shift );
$self->_check_tuple( my $key = shift );
$self->_check_operations( my $operations = shift );
$self->_check_number( my $flags = pop || 0 );
my $id = $self->_req_id;
my $pkt = DR::Tarantool::_pkt_update($id, $ns, $flags, $key, $operations);
return $self->_request( $id, $pkt );
}
sub delete :method {
my $self = shift;
my $ns = shift;
my $key = shift;
$self->_check_tuple( $key );
my $flags = pop || 0;
my $id = $self->_req_id;
my $pkt = DR::Tarantool::_pkt_delete($id, $ns, $flags, $key);
return $self->_request( $id, $pkt );
}
sub _check_tuple {
my ($self, $tuple) = @_;
croak 'Tuple must be ARRAYREF' unless 'ARRAY' eq ref $tuple;
}
sub _check_tuple_list {
my ($self, $list) = @_;
croak 'Tuplelist must be ARRAYREF of ARRAYREF' unless 'ARRAY' eq ref $list;
croak 'Tuplelist is empty' unless @$list;
$self->_check_tuple($_) for @$list;
}
sub _check_number {
my ($self, $number) = @_;
croak "argument must be number"
unless defined $number and $number =~ /^\d+$/;
}
sub _check_operation {
my ($self, $op) = @_;
croak 'Operation must be ARRAYREF' unless 'ARRAY' eq ref $op;
croak 'Wrong update operation: too short arglist' unless @$op >= 2;
croak "Wrong operation: $op->[1]"
unless $op->[1] and
$op->[1] =~ /^(delete|set|insert|add|and|or|xor|substr)$/;
$self->_check_number($op->[0]);
}
sub _check_operations {
my ($self, $list) = @_;
croak 'Operations list must be ARRAYREF of ARRAYREF'
unless 'ARRAY' eq ref $list;
croak 'Operations list is empty' unless @$list;
$self->_check_operation( $_ ) for @$list;
}
sub last_error_string {
return $_[0]->{last_error_string};
}
sub last_code {
return $_[0]->{last_code};
}
sub raise_error {
return $_[0]->{raise_error};
}
1;