The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
/* vim: set ft=c */
/*

 Copyright (C) 2011 Dmitry E. Oboukhov <unera@debian.org>
 Copyright (C) 2011 Roman V. Nikolaev <rshadow@rambler.ru>

 This program is free software, you can redistribute it and/or
 modify it under the terms of the Artistic License.

*/
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"

#include <tarantool/tnt.h>
#include <string.h>


static struct tnt_tuple* tmake_tuple( AV *t ) {
	int i;

	struct tnt_tuple *r = tnt_mem_alloc( sizeof( struct tnt_tuple ) );
	if ( !r )
		croak("Can not allocate memory");
	tnt_tuple_init( r );
	r->alloc = 1;

	for (i = 0; i <= av_len( t ); i++) {
		STRLEN size;
		char *data = SvPV( *av_fetch( t, i, 0 ), size );
		tnt_tuple_add( r, data, size );
	}
	return r;
}

static struct tnt_stream * tmake_buf(void) {
	struct tnt_stream *b = tnt_buf( NULL );
	if ( !b )
		croak("Can not allocate memory");

	return b;
}


static struct tnt_stream *tmake_oplist( AV *ops ) {
	int i;
	struct tnt_stream *b = tmake_buf();

	for (i = 0; i <= av_len( ops ); i++) {
		uint8_t opcode;

		SV *op = *av_fetch( ops, i, 0 );
		if (!SvROK(op) || SvTYPE( SvRV(op) ) != SVt_PVAV)
			croak("Wrong update operation format");
		AV *aop = (AV *)SvRV(op);

		int asize = av_len( aop ) + 1;
		if ( asize < 2 )
			croak("Too short operation argument list");

		unsigned fno = SvIV( *av_fetch( aop, 0, 0 ) );
		STRLEN size;
		char *opname = SvPV( *av_fetch( aop, 1, 0 ), size );


		/* delete */
		if ( strcmp(opname, "delete") == 0 ) {
			tnt_update_delete( b, fno );
			continue;
		}


		if (asize < 3)
			croak("Too short operation argument list");

		/* assign */
		if ( strcmp(opname, "set") == 0 ) {

			char *data = SvPV( *av_fetch( aop, 2, 0 ), size );
			tnt_update_assign( b, fno, data, size );
			continue;
		}

		/* insert */
		if ( strcmp(opname, "insert") == 0 ) {
			char *data = SvPV( *av_fetch( aop, 2, 0 ), size );
			tnt_update_insert( b, fno, data, size );
			continue;
		}


		/* arithmetic operations */
		if ( strcmp(opname, "add") == 0 ) {
			opcode = TNT_UPDATE_ADD;
			goto ARITH;
		}
		if ( strcmp(opname, "and") == 0 ) {
			opcode = TNT_UPDATE_AND;
			goto ARITH;
		}
		if ( strcmp(opname, "or") == 0 ) {
			opcode = TNT_UPDATE_OR;
			goto ARITH;
		}
		if ( strcmp(opname, "xor") == 0 ) {
			opcode = TNT_UPDATE_XOR;
			goto ARITH;
		}


		/* substr */
		if ( strcmp(opname, "substr") == 0 ) {
			if (asize < 4)
				croak("Too short argument list for substr");
			unsigned offset = SvIV( *av_fetch( aop, 2, 0 ) );
			unsigned length = SvIV( *av_fetch( aop, 3, 0 ) );
			char * data;
			if ( asize > 4 && SvOK( *av_fetch( aop, 4, 0 ) ) ) {
			    data = SvPV( *av_fetch( aop, 4, 0 ), size );
			} else {
			    data = "";
			    size = 0;
                        }
			tnt_update_splice( b, fno, offset, length, data, size );
			continue;
		}

		{ /* unknown command */
			char err[512];
			snprintf(err, 512,
				"unknown update operation: `%s'",
				opname
			);
			croak(err);
		}

		ARITH: {
			tnt_update_arith(
				b, fno, opcode, SvIV( *av_fetch( aop, 2, 0 ) )
			);
			continue;
		}

	}

	return b;

}

static void hash_ssave(HV *h, const char *k, const char *v) {
	hv_store( h, k, strlen(k), newSVpvn( v, strlen(v) ), 0 );
}

static void hash_isave(HV *h, const char *k, uint32_t v) {
	hv_store( h, k, strlen(k), newSViv( v ), 0 );
}

static AV * extract_tuples(struct tnt_reply *r) {
	struct tnt_iter it;
	tnt_iter_list(&it, TNT_REPLY_LIST(r));
	AV *res = newAV();
	while (tnt_next(&it)) {
		struct tnt_iter ifl;
		struct tnt_tuple *tu = TNT_ILIST_TUPLE(&it);
		tnt_iter(&ifl, tu);
		AV *t = newAV();
		while (tnt_next(&ifl)) {
			char *data = TNT_IFIELD_DATA(&ifl);
			uint32_t size = TNT_IFIELD_SIZE(&ifl);
			av_push(t, newSVpvn(data, size));
		}
		av_push(res, newRV_noinc((SV *) t));
	}
	return res;
}

MODULE = DR::Tarantool		PACKAGE = DR::Tarantool
PROTOTYPES: ENABLE

SV * _pkt_select( req_id, ns, idx, offset, limit, keys )
	unsigned req_id
	unsigned ns
	unsigned idx
	unsigned offset
	unsigned limit
	AV * keys


	CODE:
		int i;
		struct tnt_list list;
		tnt_list_init( &list );

		if ( ( list.count = av_len ( keys ) + 1 ) ) {
			list.list = tnt_mem_alloc(
				sizeof( struct tnt_list_ptr ) * list.count
			);


			if ( !list.list )
				return;

			for (i = 0; i < list.count; i++) {
				SV *t = *av_fetch( keys, i, 0 );
				if (!SvROK(t) || (SvTYPE(SvRV(t)) != SVt_PVAV))
					croak("keys must be ARRAYREF"
						" of ARRAYREF"
					);

				list.list[i].ptr = tmake_tuple( (AV *)SvRV(t) );
			}
		}

		struct tnt_stream *s = tmake_buf();
		tnt_stream_reqid( s, req_id );
		tnt_select( s, ns, idx, offset, limit, &list );
		tnt_list_free( &list );


		RETVAL = newSVpvn( TNT_SBUF_DATA(s), TNT_SBUF_SIZE(s) );
		tnt_stream_free( s );

	OUTPUT:
		RETVAL


SV * _pkt_ping( req_id )
	unsigned req_id

	CODE:
		struct tnt_stream *s = tmake_buf();
		tnt_stream_reqid( s, req_id );
		tnt_ping( s );
		RETVAL = newSVpvn( TNT_SBUF_DATA(s), TNT_SBUF_SIZE(s) );
		tnt_stream_free( s );
	OUTPUT:
		RETVAL

SV * _pkt_insert( req_id, ns, flags, tuple )
	unsigned req_id
	unsigned ns
	unsigned flags
	AV * tuple

	CODE:
		struct tnt_tuple *t = tmake_tuple( tuple );
		struct tnt_stream *s = tmake_buf();
		tnt_stream_reqid( s, req_id );
		tnt_insert( s, ns, flags, t );
		tnt_tuple_free( t );
		RETVAL = newSVpvn( TNT_SBUF_DATA( s ), TNT_SBUF_SIZE( s ) );
		tnt_stream_free( s );

	OUTPUT:
		RETVAL

SV * _pkt_update( req_id, ns, flags, tuple, operations )
	unsigned req_id
	unsigned ns
	unsigned flags
	AV *tuple
	AV *operations

	CODE:
		struct tnt_tuple *t = tmake_tuple( tuple );
		struct tnt_stream *s = tmake_buf();
		struct tnt_stream *ops = tmake_oplist( operations );

		tnt_stream_reqid( s, req_id );
		tnt_update( s, ns, flags, t, ops );
		tnt_tuple_free( t );

		RETVAL = newSVpvn( TNT_SBUF_DATA( s ), TNT_SBUF_SIZE( s ) );

		tnt_stream_free( ops );
		tnt_stream_free( s );


	OUTPUT:
		RETVAL

SV * _pkt_delete( req_id, ns, flags, tuple )
	unsigned req_id
	unsigned ns
	unsigned flags
	AV *tuple

	CODE:
		struct tnt_tuple *t = tmake_tuple( tuple );
		struct tnt_stream *s = tmake_buf();
		tnt_stream_reqid( s, req_id );
		tnt_delete( s, ns, flags, t );
		tnt_tuple_free( t );
		RETVAL = newSVpvn( TNT_SBUF_DATA( s ), TNT_SBUF_SIZE( s ) );
		tnt_stream_free( s );
	OUTPUT:
		RETVAL

SV * _pkt_call_lua( req_id, flags, proc, tuple )
	unsigned req_id
	unsigned flags
	char *proc
	AV *tuple

	CODE:
		struct tnt_tuple *t = tmake_tuple( tuple );
		struct tnt_stream *s = tmake_buf();
		tnt_stream_reqid( s, req_id );
		tnt_call( s, flags, proc, t );
		tnt_tuple_free( t );
		RETVAL = newSVpvn( TNT_SBUF_DATA( s ), TNT_SBUF_SIZE( s ) );
		tnt_stream_free( s );
	OUTPUT:
		RETVAL



HV * _pkt_parse_response( response )
	SV *response

	CODE:
		if ( !SvOK(response) )
			croak( "response is undefined" );
		STRLEN size;
		char *data = SvPV( response, size );
		struct tnt_reply reply;
		tnt_reply_init( &reply );
		size_t offset = 0;
		int cnt = tnt_reply( &reply, data, size, &offset );
		int i, j;

		HV *res = newHV();
		if ( cnt < 0 ) {
			hash_ssave(res, "status", "fatal");
			hash_ssave(res,
			    "errstr", "Can't parse server response");
		} else if ( cnt > 0 ) {
			hash_ssave(res, "status", "buffer");
			hash_ssave(res, "errstr", "Input data too short");
		} else {
			hash_isave(res, "code", reply.code );
			hash_isave(res, "req_id", reply.reqid );
        		hash_isave(res, "type", reply.op );
        		hash_isave(res, "count", reply.count);
        		if (reply.code) {
        		    hash_ssave( res, "errstr", reply.error );
			    hash_ssave(res, "status", "error");

                        } else {
			    hash_ssave(res, "status", "ok");
                            AV *tuples = extract_tuples( &reply );
                            hv_store(
                                res, "tuples", 6, newRV_noinc((SV *)tuples), 0
                            );
                        }
		}
                sv_2mortal( (SV *) res );
		RETVAL = res;
		tnt_reply_free( &reply );

	OUTPUT:
		RETVAL


unsigned _op_insert()
	CODE:
		RETVAL = TNT_OP_INSERT;
	OUTPUT:
		RETVAL

unsigned _op_select()
	CODE:
		RETVAL = TNT_OP_SELECT;
	OUTPUT:
		RETVAL

unsigned _op_update()
	CODE:
		RETVAL = TNT_OP_UPDATE;
	OUTPUT:
		RETVAL

unsigned _op_delete()
	CODE:
		RETVAL = TNT_OP_DELETE;
	OUTPUT:
		RETVAL

unsigned _op_call()
	CODE:
		RETVAL = TNT_OP_CALL;
	OUTPUT:
		RETVAL

unsigned _op_ping()
	CODE:
		RETVAL = TNT_OP_PING;
	OUTPUT:
		RETVAL


unsigned _flag_return()
	CODE:
		RETVAL = TNT_FLAG_RETURN;
	OUTPUT:
		RETVAL

unsigned _flag_add()
	CODE:
		RETVAL = TNT_FLAG_ADD;
	OUTPUT:
		RETVAL

unsigned _flag_replace()
	CODE:
		RETVAL = TNT_FLAG_REPLACE;
	OUTPUT:
		RETVAL

unsigned _flag_box_quiet()
	CODE:
		RETVAL = TNT_FLAG_BOX_QUIET;
	OUTPUT:
		RETVAL

unsigned _flag_not_store()
	CODE:
		RETVAL = TNT_FLAG_NOT_STORE;
	OUTPUT:
		RETVAL