/* vim: set ft=c */
/*
Copyright (C) 2011 Dmitry E. Oboukhov <unera@debian.org>
Copyright (C) 2011 Roman V. Nikolaev <rshadow@rambler.ru>
This program is free software, you can redistribute it and/or
modify it under the terms of the Artistic License.
*/
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
#include "tp.h"
#define PREALLOC_SCALAR_SIZE 0
inline static void hash_ssave(HV *h, const char *k, const char *v) {
hv_store( h, k, strlen(k), newSVpvn( v, strlen(v) ), 0 );
}
inline static void hash_isave(HV *h, const char *k, uint32_t v) {
hv_store( h, k, strlen(k), newSViv( v ), 0 );
}
static char * sv_resizer(struct tp *p, size_t req, size_t *size) {
SV *sv = p->obj;
STRLEN new_len = tp_size(p) + req;
char *new_str = SvGROW(sv, new_len);
if (!new_str)
croak("Cannot allocate memory");
// SvCUR_set(sv, new_len);
*size = new_len;
return new_str;
}
inline static void tp_av_tuple(struct tp *req, AV *tuple) {
int i;
tp_tuple(req);
for (i = 0; i <= av_len(tuple); i++) {
SV *field = *av_fetch(tuple, i, 0);
char *fd;
STRLEN fl;
fd = SvPV(field, fl);
tp_field(req, fd, fl);
}
}
inline static void fetch_tuples( HV * ret, struct tp * rep ) {
AV * tuples = newAV();
hv_store( ret, "tuples", 6, newRV_noinc( ( SV * ) tuples ), 0 );
while ( tp_next(rep) ) {
AV * t = newAV();
av_push( tuples, newRV_noinc( ( SV * ) t ) );
while(tp_nextfield(rep)) {
SV * f = newSVpvn( tp_getfield(rep), tp_getfieldsize(rep) );
av_push( t, f );
}
}
}
#define ALLOC_RET_SV(__name, __ptr, __len, __size) \
SV *__name = newSVpvn("", 0); \
RETVAL = __name; \
if (__size) SvGROW(__name, __size); \
STRLEN __len; \
char *__ptr = SvPV(__name, __len);
MODULE = DR::Tarantool PACKAGE = DR::Tarantool
PROTOTYPES: ENABLE
SV * _pkt_select( req_id, ns, idx, offset, limit, keys )
unsigned req_id
unsigned ns
unsigned idx
unsigned offset
unsigned limit
AV * keys
CODE:
ALLOC_RET_SV(ret, b, len, PREALLOC_SCALAR_SIZE);
int k;
struct tp req;
tp_init(&req, b, PREALLOC_SCALAR_SIZE, sv_resizer, ret);
tp_select(&req, ns, idx, offset, limit);
for (k = 0; k <= av_len(keys); k++) {
SV *t = *av_fetch(keys, k, 0);
if (!SvROK(t) || (SvTYPE(SvRV(t)) != SVt_PVAV))
croak("keys must be ARRAYREF of ARRAYREF");
AV *tuple = (AV *)SvRV(t);
tp_av_tuple(&req, (AV *)SvRV(t));
}
tp_reqid(&req, req_id);
SvCUR_set(ret, tp_used(&req));
OUTPUT:
RETVAL
SV * _pkt_ping( req_id )
unsigned req_id
CODE:
ALLOC_RET_SV(ret, b, len, 0);
struct tp req;
tp_init(&req, b, len, sv_resizer, ret);
tp_ping(&req);
tp_reqid(&req, req_id);
SvCUR_set(ret, tp_used(&req));
OUTPUT:
RETVAL
SV * _pkt_insert( req_id, ns, flags, tuple )
unsigned req_id
unsigned ns
unsigned flags
AV * tuple
CODE:
ALLOC_RET_SV(ret, b, len, PREALLOC_SCALAR_SIZE);
struct tp req;
tp_init(&req, b, PREALLOC_SCALAR_SIZE, sv_resizer, ret);
tp_insert(&req, ns, flags);
tp_av_tuple(&req, tuple);
tp_reqid(&req, req_id);
SvCUR_set(ret, tp_used(&req));
OUTPUT:
RETVAL
SV * _pkt_delete( req_id, ns, flags, tuple )
unsigned req_id
unsigned ns
unsigned flags
AV *tuple
CODE:
ALLOC_RET_SV(ret, b, len, PREALLOC_SCALAR_SIZE);
struct tp req;
tp_init(&req, b, PREALLOC_SCALAR_SIZE, sv_resizer, ret);
tp_delete(&req, ns, flags);
tp_av_tuple(&req, tuple);
tp_reqid(&req, req_id);
SvCUR_set(ret, tp_used(&req));
OUTPUT:
RETVAL
SV * _pkt_call_lua( req_id, flags, proc, tuple )
unsigned req_id
unsigned flags
SV *proc
AV *tuple
CODE:
STRLEN name_len;
char *name = SvPV(proc, name_len);
ALLOC_RET_SV(ret, b, len, PREALLOC_SCALAR_SIZE);
struct tp req;
tp_init(&req, b, PREALLOC_SCALAR_SIZE, sv_resizer, ret);
tp_call(&req, flags, name, name_len);
tp_av_tuple(&req, tuple);
tp_reqid(&req, req_id);
SvCUR_set(ret, tp_used(&req));
OUTPUT:
RETVAL
SV * _pkt_update( req_id, ns, flags, tuple, operations )
unsigned req_id
unsigned ns
unsigned flags
AV *tuple
AV *operations
CODE:
ALLOC_RET_SV(ret, b, len, PREALLOC_SCALAR_SIZE);
struct tp req;
int i;
tp_init(&req, b, PREALLOC_SCALAR_SIZE, sv_resizer, ret);
tp_update(&req, ns, flags);
tp_reqid(&req, req_id);
tp_av_tuple(&req, tuple);
tp_updatebegin(&req);
for (i = 0; i <= av_len( operations ); i++) {
uint8_t opcode;
SV *op = *av_fetch( operations, i, 0 );
if (!SvROK(op) || SvTYPE( SvRV(op) ) != SVt_PVAV)
croak("Wrong update operation format");
AV *aop = (AV *)SvRV(op);
int asize = av_len( aop ) + 1;
if ( asize < 2 )
croak("Too short operation argument list");
unsigned fno = SvIV( *av_fetch( aop, 0, 0 ) );
STRLEN size;
char *opname = SvPV( *av_fetch( aop, 1, 0 ), size );
/* delete */
if ( strcmp(opname, "delete") == 0 ) {
tp_op(&req, fno, TP_OPDELETE, "", 0);
continue;
}
if (asize < 3)
croak("Too short operation argument list");
/* assign */
if ( strcmp(opname, "set") == 0 ) {
char *data =
SvPV( *av_fetch( aop, 2, 0 ), size );
tp_op(&req, fno, TP_OPSET, data, size);
continue;
}
/* insert */
if ( strcmp(opname, "insert") == 0 ) {
char *data =
SvPV( *av_fetch( aop, 2, 0 ), size );
tp_op(&req, fno, TP_OPINSERT, data, size);
continue;
}
/* arithmetic operations */
if ( strcmp(opname, "add") == 0 ) {
opcode = TP_OPADD;
goto ARITH;
}
if ( strcmp(opname, "and") == 0 ) {
opcode = TP_OPAND;
goto ARITH;
}
if ( strcmp(opname, "or") == 0 ) {
opcode = TP_OPOR;
goto ARITH;
}
if ( strcmp(opname, "xor") == 0 ) {
opcode = TP_OPXOR;
goto ARITH;
}
/* substr */
if ( strcmp(opname, "substr") == 0 ) {
if (asize < 4)
croak("Too short argument "
"list for substr");
unsigned offset =
SvIV( *av_fetch( aop, 2, 0 ) );
unsigned length =
SvIV( *av_fetch( aop, 3, 0 ) );
char * data;
if ( asize > 4 && SvOK( *av_fetch( aop, 4, 0 ) ) ) {
data =
SvPV( *av_fetch( aop, 4, 0 ), size );
} else {
data = "";
size = 0;
}
tp_opsplice(&req, fno, offset, length,
data, size);
continue;
}
/* unknown command */
croak("unknown update operation: `%s'", opname);
ARITH: {
char *data =
SvPV( *av_fetch( aop, 2, 0 ), size );
if (sizeof(unsigned long long) < size)
size = sizeof(unsigned long long);
tp_op(&req, fno, opcode, data, size);
continue;
}
}
SvCUR_set(ret, tp_used(&req));
OUTPUT:
RETVAL
HV * _pkt_parse_response( response )
SV *response
INIT:
RETVAL = newHV();
sv_2mortal((SV *)RETVAL);
CODE:
/* asm("break"); */
if ( !SvOK(response) )
croak( "response is undefined" );
STRLEN size;
char *data = SvPV( response, size );
struct tp rep;
tp_init(&rep, data, size, NULL, 0);
// tp_use(&rep, size);
ssize_t code = tp_reply(&rep);
if (code == -1) {
hash_ssave(RETVAL, "status", "buffer");
hash_ssave(RETVAL, "errstr", "Input data too short");
// hash_ssave(RETVAL, "status", "fatal");
// hash_ssave(RETVAL, "errstr",
// "Can't parse server response");
} else if (code >= 0) {
uint32_t type = tp_replyop(&rep);
hash_isave(RETVAL, "code", tp_replycode(&rep) );
hash_isave(RETVAL, "req_id", tp_getreqid(&rep) );
hash_isave(RETVAL, "type", type );
hash_isave(RETVAL, "count", tp_replycount(&rep) );
if (code == 0) {
if (type != TP_PING)
fetch_tuples(RETVAL, &rep);
hash_ssave(RETVAL, "status", "ok");
} else {
hash_ssave(RETVAL, "status", "error");
size_t el = tp_replyerrorlen(&rep);
SV *err;
if (el) {
char *s = tp_replyerror(&rep);
if (s[el - 1] == 0)
el--;
err = newSVpvn(s, el);
} else {
err = newSVpvn("", 0);
}
hv_stores(RETVAL, "errstr", err);
}
}
OUTPUT:
RETVAL
unsigned TNT_PING()
CODE:
RETVAL = TP_PING;
OUTPUT:
RETVAL
unsigned TNT_CALL()
CODE:
RETVAL = TP_CALL;
OUTPUT:
RETVAL
unsigned TNT_INSERT()
CODE:
RETVAL = TP_INSERT;
OUTPUT:
RETVAL
unsigned TNT_UPDATE()
CODE:
RETVAL = TP_UPDATE;
OUTPUT:
RETVAL
unsigned TNT_DELETE()
CODE:
RETVAL = TP_DELETE;
OUTPUT:
RETVAL
unsigned TNT_SELECT()
CODE:
RETVAL = TP_SELECT;
OUTPUT:
RETVAL
unsigned TNT_FLAG_RETURN()
CODE:
RETVAL = TP_FRET;
OUTPUT:
RETVAL
unsigned TNT_FLAG_ADD()
CODE:
RETVAL = TP_FADD;
OUTPUT:
RETVAL
unsigned TNT_FLAG_REPLACE()
CODE:
RETVAL = TP_FREP;
OUTPUT:
RETVAL
unsigned TNT_FLAG_BOX_QUIET()
CODE:
RETVAL = TP_FQUIET;
OUTPUT:
RETVAL