# * Statistics::RserveClient::Connection
# * Supports Rserve protocol 0103 only (used by Rserve 0.5 and higher)
# * Based on rserve-php by Clément Turbelin
#
# * @author Djun Kim
# * Licensed under# GPL v2 or at your option v3
# Handle Connection and communicating with Rserve instance
# @author Djun Kim
#use warnings;
#use diagnostics;
#use autodie;
package Statistics::RserveClient::Connection;
use Statistics::RserveClient;
use Data::Dumper;
use Exporter;
our @EXPORT = qw ( new init close evalString evalStringToFile DT_LARGE );
use Socket;
use Statistics::RserveClient::Funclib qw( _rserve_make_packet );
use Statistics::RserveClient::Parser qw( parse );
use Statistics::RserveClient::Exception;
use Statistics::RserveClient::ParserException;
use constant PARSER_NATIVE => 0;
use constant PARSER_REXP => 1;
use constant PARSER_DEBUG => 2;
use constant PARSER_NATIVE_WRAPPED => 3;
use constant DT_INT => 1;
use constant DT_CHAR => 2;
use constant DT_DOUBLE => 3;
use constant DT_STRING => 4;
use constant DT_BYTESTREAM => 5;
use constant DT_SEXP => 10;
use constant DT_ARRAY => 11;
# this is a flag saying that the contents is large (>0xfffff0) and
# hence uses 56-bit length field
use constant DT_LARGE => 64;
use constant CMD_login => 0x001;
use constant CMD_voidEval => 0x002;
use constant CMD_eval => 0x003;
use constant CMD_shutdown => 0x004;
use constant CMD_openFile => 0x010;
use constant CMD_createFile => 0x011;
use constant CMD_closeFile => 0x012;
use constant CMD_readFile => 0x013;
use constant CMD_writeFile => 0x014;
use constant CMD_removeFile => 0x015;
use constant CMD_setSEXP => 0x020;
use constant CMD_assignSEXP => 0x021;
use constant CMD_setBufferSize => 0x081;
use constant CMD_setEncoding => 0x082;
use constant CMD_detachSession => 0x030;
use constant CMD_detachedVoidEval => 0x031;
use constant CMD_attachSession => 0x032;
# control commands since 0.6-0
use constant CMD_ctrlEval => 0x42;
use constant CMD_ctrlSource => 0x45;
use constant CMD_ctrlShutdown => 0x44;
# errors as returned by Rserve
use constant ERR_auth_failed => 0x41;
use constant ERR_conn_broken => 0x42;
use constant ERR_inv_cmd => 0x43;
use constant ERR_inv_par => 0x44;
use constant ERR_Rerror => 0x45;
use constant ERR_IOerror => 0x46;
use constant ERR_not_open => 0x47;
use constant ERR_access_denied => 0x48;
use constant ERR_unsupported_cmd => 0x49;
use constant ERR_unknown_cmd => 0x4a;
use constant ERR_data_overflow => 0x4b;
use constant ERR_object_too_big => 0x4c;
use constant ERR_out_of_mem => 0x4d;
use constant ERR_ctrl_closed => 0x4e;
use constant ERR_session_busy => 0x50;
use constant ERR_detach_failed => 0x51;
use Config;
my $_initialized = FALSE; #class variable
# get/setter for class variable
sub initialized(@) {
$_initialized = shift if @_;
return $_initialized;
}
my $_machine_is_bigendian = FALSE;
# get/setter for class variable
sub machine_is_bigendian(;$) {
$_machine_is_bigendian = shift if @_;
return $_machine_is_bigendian;
}
#
# initialization of the library
#
sub init {
Statistics::RserveClient::debug( "init()\n" );
my $self = shift;
if ( initialized() ) {
Statistics::RserveClient::debug( "already initialised...\n" );
return;
}
Statistics::RserveClient::debug( "initing...\n" );
Statistics::RserveClient::debug( "setting byte order...\n" );
if ( $Config{byteorder} eq '87654321' ) {
machine_is_bigendian(TRUE);
}
else {
machine_is_bigendian(FALSE);
}
initialized(TRUE);
Statistics::RserveClient::debug( "set initialized to true...\n" );
return $self;
}
#
# if port is 0 then host is interpreted as unix socket, otherwise host
# is the host to connect to (default is local) and port is the TCP
# port number (6311 is the default)
# public
#sub new($host='127.0.0.1', $port = 6311, $debug = FALSE) {
sub new {
Statistics::RserveClient::debug( "new()\n" );
my $class = shift;
my $self = {
socket => undef,
auth_request => FALSE,
auth_method => undef,
auth_key => undef,
};
bless $self, $class;
my $host = '127.0.0.1';
my $port = 6311;
my $debug = FALSE;
if ( @_ == 3 ) {
Statistics::RserveClient::debug "3 args to Statistics::RserveClient::Connection::new()\n";
( $host, $port, $debug ) = shift;
}
elsif ( @_ == 2 ) {
Statistics::RserveClient::debug "2 args to Statistics::RserveClient::Connection::new()\n";
( $host, $port ) = shift;
}
elsif ( @_ == 1 ) {
Statistics::RserveClient::debug "1 args to Statistics::RserveClient::Connection::new()\n";
$host = shift;
}
else {
die("Bad number of arguments in creating connection\n");
}
Statistics::RserveClient::debug( "host: $host\n" );
Statistics::RserveClient::debug( "port: $port\n" );
my $proto = getprotobyname('tcp');
my $inet_addr;
my $paddr;
Statistics::RserveClient::debug( "class = $class\n" );
if ( !$self->initialized() ) {
Statistics::RserveClient::debug( "calling init from new()\n" );
init();
}
eval {
$inet_addr = inet_aton($host)
or die( Statistics::RserveClient::Exception->new("Can't resolve host $host") );
if ( $port == 0 ) {
socket( *SOCKET, Socket::AF_UNIX, Socket::SOCK_STREAM, 0 );
$self->{socket} = *SOCKET;
}
else {
socket( *SOCKET, Socket::AF_INET, Socket::SOCK_STREAM,
Socket::IPPROTO_TCP );
$paddr = sockaddr_in( $port, $inet_addr );
$self->{socket} = *SOCKET;
}
if ( !$self->{socket} ) {
}
};
if ($@) {
die "Unable to create socket: $@";
}
Statistics::RserveClient::debug( "created socket...\n" );
# socket_set_option($self->{socket}, SOL_TCP, SO_DEBUG,2);
setsockopt( $self->{socket}, Socket::IPPROTO_TCP, Socket::SO_DEBUG, 2 );
$paddr = sockaddr_in( $port, $inet_addr );
eval { connect( $self->{socket}, $paddr ); };
if ($@) {
die "Unable to connect:$@";
}
Statistics::RserveClient::debug( "connected...\n" );
# Rserve server ID string has the form (in quads)
# [00] Rsrv
# [04] xxxx - version string, e.g. 0103
# [08] QAP1
# [12] ... (additional quad attributes; /r/n and - are ignored)
my $buf = '';
eval {
( defined( recv( $self->{socket}, $buf, 32, 0 ) )
&& length($buf) >= 32
&& substr( $buf, 0, 4 ) eq 'Rsrv' )
or die "Invalid response from server: $@";
};
if ($@) {
warn $@;
return;
}
else {
# @TODO: need to be less specific here
my $rv = substr( $buf, 4, 4 );
if ( $rv ne '0103' ) {
die Statistics::RserveClient::Exception->new('Unsupported protocol version.');
}
# Parse attributes. From the Rserve documentation
# "R151" - version of R (here 1.5.1)
# "ARpt" - authorization required (here "pt"=plain text, "uc"=unix crypt)
# connection will be closed if the first packet is not CMD_login.
# If more AR.. methods are specified, then client is free to
# use the one he supports (usually the most secure)
# "K***" - key if encoded authentification is challenged (*** is the key)
# for Unix crypt the first two letters of the key are the salt
# required by the server */
# Grab connection attributes (each is a quad)
for ( my $i = 12; $i < 32; $i += 4 ) {
my $attr = substr( $buf, $i, 4 );
Statistics::RserveClient::debug( "attr = $attr\n" );
if ( $attr eq 'ARpt' ) {
$self->{auth_request} = TRUE;
$self->{auth_method} = 'plain';
}
elsif ( $attr eq 'ARuc' ) {
$self->{auth_request} = TRUE;
$self->{auth_method} = 'crypt';
}
if ( substr( $attr, 0, 1 ) eq 'K' ) {
$self->{auth_key} = substr( $attr, 1, 3 );
}
}
return $self;
}
}
sub DESTROY() {
close_connection();
}
# Evaluate a string as an R code and return result
# @param string $string
# @param int $parser
# @param REXP_List $attr
#public function evalString($string, $parser = self::PARSER_NATIVE, $attr=NULL) {
sub evalString() {
my $self = shift;
my $parser = PARSER_NATIVE;
my %attr = ();
my $string = "";
if ( @_ == 3 ) {
Statistics::RserveClient::debug "3 args to evalString\n";
( $string, $parser, %attr ) = shift;
}
elsif ( @_ == 2 ) {
Statistics::RserveClient::debug "2 args to evalString\n";
( $string, $parser ) = shift;
}
elsif ( @_ == 1 ) {
Statistics::RserveClient::debug "1 arg to evalString\n";
$string = shift;
}
Statistics::RserveClient::debug( "parser = $parser\n" );
Statistics::RserveClient::debug( "attr = %attr\n" );
Statistics::RserveClient::debug( "string = $string\n" );
my %r = $self->command( Statistics::RserveClient::Connection::CMD_eval, $string );
Statistics::RserveClient::debug ( Dumper(%r) );
my $i = 20;
if ( !$r{'is_error'} ) {
my $buf = $r{'contents'};
my @res = undef;
if ($parser == PARSER_NATIVE) {
Statistics::RserveClient::debug "calling parser.parse()\n";
Statistics::RserveClient::debug "buf = $buf\n";
Statistics::RserveClient::debug "i = $i\n";
Statistics::RserveClient::debug "attr = \n";
Statistics::RserveClient::debug " " . Dumper %attr . "\n";
Statistics::RserveClient::debug "\n";
@res = Statistics::RserveClient::Parser::parse( $buf, $i, %attr );
}
elsif ($parser == PARSER_REXP) {
@res = Statistics::RserveClient::Parser::parseREXP( $buf, $i, %attr );
}
elsif ($parser == PARSER_DEBUG) {
@res = Statistics::RserveClient::Parser::parseDebug( $buf, $i, %attr );
}
elsif ($parser == PARSER_NATIVE_WRAPPED) {
my $old = Statistics::RserveClient::Parser->use_array_object();
Statistics::RserveClient::Parser->use_array_object(TRUE);
@res = Statistics::RserveClient::Parser->parse( $buf, $i, %attr );
Statistics::RserveClient::Parser->use_array_object($old);
}
else {
die('Unknown parser');
}
return @res;
}
# TODO: contents and code in exception
#die(new Statistics::RserveClient::Exception('unable to evaluate'));
my @loc = caller(1);
warn("Statistics::RserveClient::Connection: Error while evaluating R query string at line $loc[2] of $loc[1].\n");
}
# Evaluate a query string and save the result to temporary file, returning the filepath.
# @param string $string
# @param string $tempDirectory
# @param int $parser
# @param REXP_List $attr
sub evalStringToFile() {
Statistics::RserveClient::debug ("evalStringToFile\n");
my $self = shift;
my $parser = PARSER_NATIVE;
my %attr = ();
my $string = "";
my $filepath = "";
if ( @_ == 4 ) {
Statistics::RserveClient::debug "4 args to evalStringToFile\n";
$string = $_[0];
$filepath = $_[1];
$parser = $_[2];
%attr = $_[3];
}
elsif ( @_ == 3 ) {
Statistics::RserveClient::debug "3 args to evalStringToFile\n";
$string = $_[0];
$filepath = $_[1];
$parser = $_[2];
}
elsif ( @_ == 2 ) {
Statistics::RserveClient::debug "2 args to evalStringToFile\n";
$string = $_[0];
$filepath = $_[1];
}
else {
Statistics::RserveClient::debug "error - 1 arg to evalStringToFile\n";
warn "Too few arguments to evalStringToFile()\n";
}
Statistics::RserveClient::debug "self = $self\n";
Statistics::RserveClient::debug "string = $string;\n";
Statistics::RserveClient::debug "filepath = $filepath\n";
Statistics::RserveClient::debug "parser = $parser\n";
Statistics::RserveClient::debug "attr = $attr\n";
Statistics::RserveClient::debug "string = $string\n";
@stream = $self->evalString($string, $parser, %attr);
open BINARY, ">:raw", $filepath or die "Couldn't open $filepath: $!\n";
foreach (@stream) { print BINARY $_}
close BINARY;
}
#
# * Close the current connection
#
sub close_connection() {
my $self = shift;
if ( $self->{socket} ) {
return CORE::close($self->{socket});
}
return TRUE;
}
#
# send a command to R
# @param int $command command code
# @param string $v command contents
#
sub command() {
my $self = shift;
my $command = shift;
my $v = shift;
#Statistics::RserveClient::debug "v = $v\n";
#Statistics::RserveClient::debug "make pkt..\n";
my $pkt = Statistics::RserveClient::Funclib::_rserve_make_packet( $command, $v );
# Statistics::RserveClient::debug "pkt = $pkt\n";
eval {
#socket_send($self->{socket}, $pkt, length($pkt), 0);
my $n = send( $self->{socket}, $pkt, 0 );
#Statistics::RserveClient::debug "n = $n\n";
die Statistics::RserveClient::Exception->new("Invalid (short) response from server:$!")
if ( $n == 0 );
};
if ($@) {
warn "Error on " . $self->{socket} . ":" . $@ . "\n";
return FALSE;
}
#Statistics::RserveClient::debug "sent pkt..\n";
# get response
return processResponse($self);
}
sub commandRaw() {
my $self = shift;
my $cmd = shift;
my $v = shift;
my $n = length($v);
# Statistics::RserveClient::debug "cmd: $cmd; string: $v, n=$n\n";
# take next largest muliple of 4 to pad out string length
$n = $n + ( ( $n % 4 ) ? ( 4 - $n % 4 ) : 0 );
# [0] (int) command
# [4] (int) length of the message (bits 0-31)
# [8] (int) offset of the data part
# [12] (int) length of the message (bits 32-63)
my $pkt = pack( "V V V V Z$n",
( $cmd, $n, 0, 0, $v ) );
#Statistics::RserveClient::debug "pkt = $pkt\n";
eval {
#socket_send($self->{socket}, $pkt, length($pkt), 0);
my $n = send( $self->{socket}, $pkt, 0 );
#Statistics::RserveClient::debug "n = $n\n";
if ( $n == 0 ) {
die "Invalid (short) response from server:$! \n";
};
};
if ($@) {
warn "Error: on " . $self->{socket} . ":" . $@ . "\n";
return FALSE;
}
#Statistics::RserveClient::debug "sent pkt..\n";
# get response
return processResponse($self);
}
sub processResponse($) {
my $self = shift;
my $n = 0;
my $buf = "";
eval {
#Statistics::RserveClient::debug "receiving pkt..\n";
( defined( recv( $self->{socket}, $buf, 16, 0 ) )
&& length($buf) >= 16 )
or die Statistics::RserveClient::Exception->new(
'Invalid (short) response from server:');
$n = length($buf);
#Statistics::RserveClient::debug "n = $n\n";
};
if ($@) {
warn $@;
}
# Statistics::RserveClient::debug "got response...$buf\n";
my @b = split "", $buf;
#foreach (@b) {print "[" . ord($_) . "]" }; print "\n";
if ( $n != 16 ) {
return FALSE;
}
my $code = Statistics::RserveClient::Funclib::int32( \@b, 0 );
# Statistics::RserveClient::debug "code = $code\n";
my $len = Statistics::RserveClient::Funclib::int32( \@b, 4 );
# Statistics::RserveClient::debug "len = $len\n";
my $ltg = $len;
while ( $ltg > 0 ) {
#Statistics::RserveClient::debug " ltg = $ltg\n";
#Statistics::RserveClient::debug " getting result..\n";
my $buf2 = "";
eval {
# $n = socket_recv($self->{socket}, $buf2, $ltg, 0);
( defined( recv( $self->{socket}, $buf2, $ltg, 0 ) ) )
or die Statistics::RserveClient::Exception->new(
'error getting result from server:');
$n = length($buf2);
# Statistics::RserveClient::debug " n = $n\n";
};
if ($@) {
warn $@;
}
#Statistics::RserveClient::debug "buf = $buf\n";
#Statistics::RserveClient::debug "len(buf) = ". length($buf) . "\n";
#Statistics::RserveClient::debug "buf2 = $buf2\n";
#Statistics::RserveClient::debug "n = $n\n";
if ( $n > 0 ) {
$buf .= $buf2;
undef($buf2);
$ltg -= $n;
}
else {
last;
}
}
# Statistics::RserveClient::debug "code = $code\n";
# Statistics::RserveClient::debug "code & 15 = " . ($code & 15) . "\n";
# Statistics::RserveClient::debug "code error = " . (($code >> 24) & 127) . "\n";
#Statistics::RserveClient::debug "buf = $buf\n";
#foreach (split "", $buf) {print "[" . ord($_)."]"};
#Statistics::RserveClient::debug "\n";
my %r = (
code => $code,
is_error => ( ( $code & 15 ) != 1 ) ? TRUE : FALSE,
'error' => ( $code >> 24 ) & 127,
'contents' => $buf
);
return (%r);
}
#
# Assign a value to a symbol in R
# @param string $symbol name of the variable to set (should be compliant with R syntax !)
# @param Statistics::RserveClient::REXP $value value to set
sub assign($$$) {
my $self = shift;
my $symbol = shift;
my $value = shift;
unless ($symbol->isa('Statistics::RserveClient::REXP::Symbol') ||
$symbol->isa('Statistics::RserveClient::REXP::String')) {
$symbol = '' . $symbol;
my $s = Statistics::RserveClient::REXP::Symbol->new($symbol);
$symbol = $s;
}
unless ($value->isa('Statistics::RserveClient::REXP')) {
die Statistics::RserveClient::Exception->new("value should be REXP object");
}
my $n = length($symbol->getValue());
my $data = join('', Statistics::RserveClient::Parser::createBinary($value));
my $debug_msg = "";
foreach ( split '', $data ) { $debug_msg .= "[" . ord($_) . "]"};
$debug_msg .= "\n";
Statistics::RserveClient::debug $debug_msg;
my $contents = '' .
Statistics::RserveClient::Funclib::mkint8(DT_STRING) .
Statistics::RserveClient::Funclib::mkint24($n+1) . $symbol->getValue() . chr(0) .
Statistics::RserveClient::Funclib::mkint8(DT_SEXP) .
join('', Statistics::RserveClient::Funclib::mkint24(length($data))) . $data;
$debug_msg = "";
foreach (split "", $contents) { $debug_msg .= "[" . ord($_) . "]"};
$debug_msg .= "\n";
Statistics::RserveClient::debug $debug_msg;
my %r = $self->commandRaw(Statistics::RserveClient::Connection::CMD_assignSEXP, $contents);
die if $r{'is_error'};
}
1;