#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
#include "ppport.h"
#include "hiredis.h"
#include "async.h"
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include <stdio.h>
#include <sys/select.h>
#include <sys/socket.h>
#define MAX_ERROR_SIZE 256
#define WAIT_FOR_EVENT_OK 0
#define WAIT_FOR_EVENT_READ_TIMEOUT 1
#define WAIT_FOR_EVENT_WRITE_TIMEOUT 2
#define WAIT_FOR_EVENT_EXCEPTION 3
//#define DEBUG
#if defined(DEBUG)
#define DEBUG_MSG(fmt, ...) \
do { \
fprintf(stderr, "[%s:%d:%s]: ", __FILE__, __LINE__, __func__); \
fprintf(stderr, fmt, __VA_ARGS__); \
fprintf(stderr, "\n"); \
} while(0)
#else
#define DEBUG_MSG(fmt, ...)
#endif
#define EQUALS_COMMAND(len, cmd, expected) ((len) == sizeof(expected) - 1 && memcmp(cmd, expected, sizeof(expected) - 1) == 0)
typedef struct redis_fast_s {
redisAsyncContext* ac;
char* hostname;
int port;
char* path;
char* error;
int reconnect;
int every;
double cnx_timeout;
double read_timeout;
double write_timeout;
int current_database;
int need_recoonect;
int is_connected;
SV* on_connect;
SV* on_build_sock;
SV* data;
int proccess_sub_count;
int is_subscriber;
int expected_subs;
pid_t pid;
enum {
FLAG_INSIDE_TRANSACTION = 0x01,
FLAG_INSIDE_WATCH = 0x02,
} flags;
} redis_fast_t, *Redis__Fast;
typedef struct redis_fast_reply_s {
SV* result;
SV* error;
} redis_fast_reply_t;
typedef redis_fast_reply_t (*CUSTOM_DECODE)(Redis__Fast self, redisReply* reply, int collect_errors);
typedef struct redis_fast_sync_cb_s {
redis_fast_reply_t ret;
int collect_errors;
CUSTOM_DECODE custom_decode;
int on_flags;
int off_flags;
} redis_fast_sync_cb_t;
typedef struct redis_fast_async_cb_s {
SV* cb;
int collect_errors;
CUSTOM_DECODE custom_decode;
int on_flags;
int off_flags;
} redis_fast_async_cb_t;
typedef struct redis_fast_subscribe_cb_s {
Redis__Fast self;
SV* cb;
} redis_fast_subscribe_cb_t;
#define WAIT_FOR_READ 0x01
#define WAIT_FOR_WRITE 0x02
typedef struct redis_fast_event_s {
int flags;
} redis_fast_event_t;
static void AddRead(void *privdata) {
redis_fast_event_t *e = (redis_fast_event_t*)privdata;
e->flags |= WAIT_FOR_READ;
DEBUG_MSG("flags = %x", e->flags);
}
static void DelRead(void *privdata) {
redis_fast_event_t *e = (redis_fast_event_t*)privdata;
e->flags &= ~WAIT_FOR_READ;
DEBUG_MSG("flags = %x", e->flags);
}
static void AddWrite(void *privdata) {
redis_fast_event_t *e = (redis_fast_event_t*)privdata;
e->flags |= WAIT_FOR_WRITE;
DEBUG_MSG("flags = %x", e->flags);
}
static void DelWrite(void *privdata) {
redis_fast_event_t *e = (redis_fast_event_t*)privdata;
e->flags &= ~WAIT_FOR_WRITE;
DEBUG_MSG("flags = %x", e->flags);
}
static void Cleanup(void *privdata) {
free(privdata);
}
static int Attach(redisAsyncContext *ac) {
redis_fast_event_t *e;
/* Nothing should be attached when something is already attached */
if (ac->ev.data != NULL)
return REDIS_ERR;
/* Create container for context and r/w events */
e = (redis_fast_event_t*)malloc(sizeof(*e));
e->flags = 0;
/* Register functions to start/stop listening for events */
ac->ev.addRead = AddRead;
ac->ev.delRead = DelRead;
ac->ev.addWrite = AddWrite;
ac->ev.delWrite = DelWrite;
ac->ev.cleanup = Cleanup;
ac->ev.data = e;
return REDIS_OK;
}
static int wait_for_event(Redis__Fast self, double read_timeout, double write_timeout) {
redisContext *c;
int fd;
redis_fast_event_t *e;
fd_set readfds, writefds, exceptfds;
struct timeval t;
int rc;
double timeout = -1;
int timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
if(self==NULL) return WAIT_FOR_EVENT_EXCEPTION;
if(self->ac==NULL) return WAIT_FOR_EVENT_EXCEPTION;
c = &(self->ac->c);
fd = c->fd;
e = (redis_fast_event_t*)self->ac->ev.data;
if(e==NULL) return 0;
if((e->flags & (WAIT_FOR_READ|WAIT_FOR_WRITE)) == (WAIT_FOR_READ|WAIT_FOR_WRITE)) {
DEBUG_MSG("set READ and WRITE, compare read_timeout = %f and write_timeout = %f",
read_timeout, write_timeout);
if(read_timeout < 0 && write_timeout < 0) {
timeout = -1;
timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
} else if(read_timeout < 0) {
timeout = write_timeout;
timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
} else if(write_timeout < 0) {
timeout = read_timeout;
timeout_mode = WAIT_FOR_EVENT_READ_TIMEOUT;
} else if(read_timeout < write_timeout) {
timeout = read_timeout;
timeout_mode = WAIT_FOR_EVENT_READ_TIMEOUT;
} else {
timeout = write_timeout;
timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
}
} else if(e->flags & WAIT_FOR_READ) {
DEBUG_MSG("set READ, read_timeout = %f", read_timeout);
timeout = read_timeout;
timeout_mode = WAIT_FOR_EVENT_READ_TIMEOUT;
} else if(e->flags & WAIT_FOR_WRITE) {
DEBUG_MSG("set WRITE, write_timeout = %f", write_timeout);
timeout = write_timeout;
timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
}
START_SELECT:
t.tv_sec = (int)timeout;
t.tv_usec = (timeout - (int)timeout) * 1000000;
DEBUG_MSG("select start, timeout is %f", timeout);
FD_ZERO(&readfds); if(e->flags & WAIT_FOR_READ) { FD_SET(fd, &readfds); }
FD_ZERO(&writefds); if(e->flags & WAIT_FOR_WRITE) { FD_SET(fd, &writefds); }
FD_ZERO(&exceptfds); FD_SET(fd, &exceptfds);
rc = select(fd + 1, &readfds, &writefds, &exceptfds, timeout < 0 ? NULL : &t);
DEBUG_MSG("select returns %d", rc);
if(rc == 0) {
DEBUG_MSG("%s", "timeout");
return timeout_mode;
}
if(rc < 0 || FD_ISSET(fd, &exceptfds)) {
DEBUG_MSG("%s", "exception!!");
if( errno == EINTR ) {
PERL_ASYNC_CHECK();
DEBUG_MSG("%s", "recieved interrupt. retry wait_for_event");
goto START_SELECT;
}
return WAIT_FOR_EVENT_EXCEPTION;
}
if(self->ac && FD_ISSET(fd, &readfds)) {
DEBUG_MSG("ready to %s", "read");
redisAsyncHandleRead(self->ac);
}
if(self->ac && FD_ISSET(fd, &writefds)) {
DEBUG_MSG("ready to %s", "write");
redisAsyncHandleWrite(self->ac);
}
DEBUG_MSG("%s", "finish");
return WAIT_FOR_EVENT_OK;
}
static void Redis__Fast_connect_cb(redisAsyncContext* c, int status) {
Redis__Fast self = (Redis__Fast)c->data;
DEBUG_MSG("connected status = %d", status);
if(status != REDIS_OK) {
// Connection Error!!
// Redis context will close automatically
self->ac = NULL;
} else {
self->is_connected = 1;
}
}
static void Redis__Fast_disconnect_cb(redisAsyncContext* c, int status) {
Redis__Fast self = (Redis__Fast)c->data;
PERL_UNUSED_VAR(status);
DEBUG_MSG("disconnected status = %d", status);
self->ac = NULL;
}
static redisAsyncContext* __build_sock(Redis__Fast self)
{
redisAsyncContext *ac;
double timeout;
int res;
DEBUG_MSG("%s", "start");
if(self->on_build_sock) {
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
call_sv(self->on_build_sock, G_DISCARD | G_NOARGS);
FREETMPS;
LEAVE;
}
if(self->path) {
ac = redisAsyncConnectUnix(self->path);
} else {
ac = redisAsyncConnect(self->hostname, self->port);
}
if(ac == NULL) {
DEBUG_MSG("%s", "allocation error");
return NULL;
}
if(ac->err) {
DEBUG_MSG("connection error: %s", ac->errstr);
redisAsyncFree(ac);
return NULL;
}
ac->data = (void*)self;
self->ac = ac;
self->is_connected = 0;
Attach(ac);
redisAsyncSetConnectCallback(ac, (redisConnectCallback*)Redis__Fast_connect_cb);
redisAsyncSetDisconnectCallback(ac, (redisDisconnectCallback*)Redis__Fast_disconnect_cb);
// wait to connect...
timeout = -1;
if(self->cnx_timeout) {
timeout = self->cnx_timeout;
}
while(!self->is_connected) {
res = wait_for_event(self, timeout, timeout);
if(self->ac == NULL) {
return NULL;
}
if(res != WAIT_FOR_EVENT_OK) {
DEBUG_MSG("error: %d", res);
redisAsyncFree(self->ac);
self->ac = NULL;
return NULL;
}
}
if(self->on_connect){
dSP;
PUSHMARK(SP);
call_sv(self->on_connect, G_DISCARD | G_NOARGS);
}
DEBUG_MSG("%s", "finsih");
return self->ac;
}
static int _wait_all_responses(Redis__Fast self) {
DEBUG_MSG("%s", "start");
while(self->ac && self->ac->replies.tail) {
int res = wait_for_event(self, self->read_timeout, self->write_timeout);
if (res != WAIT_FOR_EVENT_OK) {
DEBUG_MSG("error: %d", res);
return res;
}
}
DEBUG_MSG("%s", "finish");
return WAIT_FOR_EVENT_OK;
}
static void Redis__Fast_connect(Redis__Fast self) {
struct timeval start, end;
DEBUG_MSG("%s", "start");
if (self->ac) {
redisAsyncFree(self->ac);
self->ac = NULL;
}
self->flags = 0;
//$self->{queue} = [];
self->pid = getpid();
if(self->reconnect == 0) {
__build_sock(self);
if(!self->ac) {
if(self->path) {
snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s", self->path);
} else {
snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s:%d", self->hostname, self->port);
}
croak("%s", self->error);
}
return ;
}
// Reconnect...
gettimeofday(&start, NULL);
while (1) {
double elapsed_time;
if(__build_sock(self)) {
// Connected!
DEBUG_MSG("%s", "finish");
return;
}
gettimeofday(&end, NULL);
elapsed_time = (end.tv_sec-start.tv_sec) + 1E-6 * (end.tv_usec-start.tv_usec);
DEBUG_MSG("elasped time:%f, reconnect:%d", elapsed_time, self->reconnect);
if( elapsed_time > self->reconnect) {
if(self->path) {
snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s", self->path);
} else {
snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s:%d", self->hostname, self->port);
}
DEBUG_MSG("%s", "timed out");
croak("%s", self->error);
return;
}
DEBUG_MSG("%s", "failed to connect. wait...");
usleep(self->every);
}
DEBUG_MSG("%s", "finish");
}
static void Redis__Fast_reconnect(Redis__Fast self) {
DEBUG_MSG("%s", "start");
if(!self->ac && self->reconnect) {
DEBUG_MSG("%s", "connection not found. reconnect");
Redis__Fast_connect(self);
}
if(!self->ac) {
DEBUG_MSG("%s", "Not connected to any server");
}
DEBUG_MSG("%s", "finish");
}
static redis_fast_reply_t Redis__Fast_decode_reply(Redis__Fast self, redisReply* reply, int collect_errors) {
redis_fast_reply_t res = {NULL, NULL};
switch (reply->type) {
case REDIS_REPLY_ERROR:
res.error = sv_2mortal(newSVpvn(reply->str, reply->len));
break;
case REDIS_REPLY_STRING:
case REDIS_REPLY_STATUS:
res.result = sv_2mortal(newSVpvn(reply->str, reply->len));
break;
case REDIS_REPLY_INTEGER:
res.result = sv_2mortal(newSViv(reply->integer));
break;
case REDIS_REPLY_NIL:
res.result = sv_2mortal(newSV(0));
break;
case REDIS_REPLY_ARRAY: {
AV* av = newAV();
size_t i;
res.result = sv_2mortal(newRV_noinc((SV*)av));
for (i = 0; i < reply->elements; i++) {
redis_fast_reply_t elem = Redis__Fast_decode_reply(self, reply->element[i], collect_errors);
if(collect_errors) {
AV* elem_av = (AV*)sv_2mortal((SV*)newAV());
if(elem.result) {
av_push(elem_av, SvREFCNT_inc(elem.result));
} else {
av_push(elem_av, newSV(0));
}
if(elem.error) {
av_push(elem_av, SvREFCNT_inc(elem.error));
} else {
av_push(elem_av, newSV(0));
}
av_push(av, newRV_inc((SV*)elem_av));
} else {
if(elem.result) {
av_push(av, SvREFCNT_inc(elem.result));
} else {
av_push(av, newSV(0));
}
if(elem.error && !res.error) {
res.error = elem.error;
}
}
}
break;
}
}
return res;
}
static void Redis__Fast_sync_reply_cb(redisAsyncContext* c, void* reply, void* privdata) {
Redis__Fast self = (Redis__Fast)c->data;
redis_fast_sync_cb_t *cbt = (redis_fast_sync_cb_t*)privdata;
DEBUG_MSG("%p", (void*)privdata);
if(reply) {
self->flags = (self->flags | cbt->on_flags) & cbt->off_flags;
if(cbt->custom_decode) {
cbt->ret = (cbt->custom_decode)(self, (redisReply*)reply, cbt->collect_errors);
} else {
cbt->ret = Redis__Fast_decode_reply(self, (redisReply*)reply, cbt->collect_errors);
}
} else if(c->c.flags & REDIS_FREEING) {
DEBUG_MSG("%s", "redis feeing");
Safefree(cbt);
} else {
DEBUG_MSG("connect error: %s", c->errstr);
self->need_recoonect = 1;
cbt->ret.result = NULL;
cbt->ret.error = sv_2mortal( newSVpvn(c->errstr, strlen(c->errstr)) );
}
DEBUG_MSG("%s", "finish");
}
static void Redis__Fast_async_reply_cb(redisAsyncContext* c, void* reply, void* privdata) {
Redis__Fast self = (Redis__Fast)c->data;
redis_fast_async_cb_t *cbt = (redis_fast_async_cb_t*)privdata;
redis_fast_reply_t result;
SV* sv_undef;
if (reply) {
self->flags = (self->flags | cbt->on_flags) & cbt->off_flags;
dSP;
ENTER;
SAVETMPS;
if(cbt->custom_decode) {
result = (cbt->custom_decode)(self, (redisReply*)reply, cbt->collect_errors);
} else {
result = Redis__Fast_decode_reply(self, (redisReply*)reply, cbt->collect_errors);
}
sv_undef = sv_2mortal(newSV(0));
if(result.result == NULL) result.result = sv_undef;
if(result.error == NULL) result.error = sv_undef;
PUSHMARK(SP);
XPUSHs(result.result);
XPUSHs(result.error);
PUTBACK;
call_sv(cbt->cb, G_DISCARD);
FREETMPS;
LEAVE;
}
SvREFCNT_dec(cbt->cb);
Safefree(cbt);
}
static void Redis__Fast_subscribe_cb(redisAsyncContext* c, void* reply, void* privdata) {
int is_need_free = 0;
Redis__Fast self = (Redis__Fast)c->data;
redis_fast_subscribe_cb_t *cbt = (redis_fast_subscribe_cb_t*)privdata;
redisReply* r = (redisReply*)reply;
SV* sv_undef;
DEBUG_MSG("%s", "start");
if(!cbt) {
DEBUG_MSG("%s", "cbt is empty finished");
return ;
}
if (r) {
char* stype = r->element[0]->str;
int pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
redis_fast_reply_t res;
dSP;
ENTER;
SAVETMPS;
res = Redis__Fast_decode_reply(self, r, 0);
if (strcasecmp(stype+pvariant,"subscribe") == 0) {
DEBUG_MSG("%s %s %lld", r->element[0]->str, r->element[1]->str, r->element[2]->integer);
self->is_subscriber = r->element[2]->integer;
self->expected_subs--;
} else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
DEBUG_MSG("%s %s %lld", r->element[0]->str, r->element[1]->str, r->element[2]->integer);
self->is_subscriber = r->element[2]->integer;
is_need_free = 1;
self->expected_subs--;
} else {
DEBUG_MSG("%s %s", r->element[0]->str, r->element[1]->str);
self->proccess_sub_count++;
}
sv_undef = sv_2mortal(newSV(0));
if(res.result == NULL) res.result = sv_undef;
if(res.error == NULL) res.error = sv_undef;
PUSHMARK(SP);
XPUSHs(res.result);
XPUSHs(res.error);
PUTBACK;
call_sv(cbt->cb, G_DISCARD);
FREETMPS;
LEAVE;
} else {
DEBUG_MSG("connect error: %s", c->errstr);
is_need_free = 1;
}
if(is_need_free) {
// destroy private data
DEBUG_MSG("destroy %p", cbt);
if(cbt->cb) {
SvREFCNT_dec(cbt->cb);
cbt->cb = NULL;
}
Safefree(cbt);
}
DEBUG_MSG("%s", "finish");
}
static redis_fast_reply_t Redis__Fast_run_cmd(Redis__Fast self, int collect_errors, CUSTOM_DECODE custom_decode, SV* cb, int argc, const char** argv, size_t* argvlen) {
redis_fast_reply_t ret = {NULL, NULL};
int on_flags = 0, off_flags = ~0;
DEBUG_MSG("start %s", argv[0]);
DEBUG_MSG("pid check: previous pid is %d, now %d", self->pid, getpid());
if(self->pid != getpid()) {
DEBUG_MSG("%s", "pid changed. create new connection..");
Redis__Fast_connect(self);
}
if(EQUALS_COMMAND(argvlen[0], argv[0], "MULTI")) {
on_flags = FLAG_INSIDE_TRANSACTION;
} else if(EQUALS_COMMAND(argvlen[0], argv[0], "EXEC") ||
EQUALS_COMMAND(argvlen[0], argv[0], "DISCARD")) {
off_flags = ~(FLAG_INSIDE_TRANSACTION | FLAG_INSIDE_WATCH);
} else if(EQUALS_COMMAND(argvlen[0], argv[0], "WATCH")) {
on_flags = FLAG_INSIDE_WATCH;
} else if(EQUALS_COMMAND(argvlen[0], argv[0], "UNWATCH")) {
off_flags = ~FLAG_INSIDE_WATCH;
}
if(cb) {
redis_fast_async_cb_t *cbt;
Newx(cbt, sizeof(redis_fast_async_cb_t), redis_fast_async_cb_t);
cbt->cb = SvREFCNT_inc(cb);
cbt->custom_decode = custom_decode;
cbt->collect_errors = collect_errors;
cbt->on_flags = on_flags;
cbt->off_flags = off_flags;
redisAsyncCommandArgv(
self->ac, Redis__Fast_async_reply_cb, cbt,
argc, argv, argvlen
);
ret.result = sv_2mortal(newSViv(1));
} else {
redis_fast_sync_cb_t *cbt;
int i, cnt = (self->reconnect == 0 ? 1 : 2);
int res = WAIT_FOR_EVENT_OK;
for(i = 0; i < cnt; i++) {
Newx(cbt, sizeof(redis_fast_sync_cb_t), redis_fast_sync_cb_t);
self->need_recoonect = 0;
cbt->ret.result = NULL;
cbt->ret.error = NULL;
cbt->custom_decode = custom_decode;
cbt->collect_errors = collect_errors;
cbt->on_flags = on_flags;
cbt->off_flags = off_flags;
DEBUG_MSG("%s", "send command in sync mode");
redisAsyncCommandArgv(
self->ac, Redis__Fast_sync_reply_cb, cbt,
argc, argv, argvlen
);
DEBUG_MSG("%s", "waiting response");
res = _wait_all_responses(self);
if(res == WAIT_FOR_EVENT_OK && !self->need_recoonect) {
ret = cbt->ret;
if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
DEBUG_MSG("finish %s", argv[0]);
return ret;
}
if( res == WAIT_FOR_EVENT_READ_TIMEOUT ) break;
if(self->flags & (FLAG_INSIDE_TRANSACTION | FLAG_INSIDE_WATCH)) {
croak("reconnect disabled inside transaction or watch");
}
Redis__Fast_reconnect(self);
}
if( res == WAIT_FOR_EVENT_OK && (cbt->ret.result || cbt->ret.error) ) Safefree(cbt);
// else destructor will release cbt
if(res == WAIT_FOR_EVENT_READ_TIMEOUT || res == WAIT_FOR_EVENT_WRITE_TIMEOUT) {
snprintf(self->error, MAX_ERROR_SIZE, "Error while reading from Redis server: %s", strerror(EAGAIN));
croak("%s", self->error);
}
if(!self->ac) {
croak("Not connected to any server");
}
}
DEBUG_MSG("Finish %s", argv[0]);
return ret;
}
static redis_fast_reply_t Redis__Fast_keys_custom_decode(Redis__Fast self, redisReply* reply, int collect_errors) {
// TODO: Support redis <= 1.2.6
return Redis__Fast_decode_reply(self, reply, collect_errors);
}
static redis_fast_reply_t Redis__Fast_info_custom_decode(Redis__Fast self, redisReply* reply, int collect_errors) {
redis_fast_reply_t res = {NULL, NULL};
if(reply->type == REDIS_REPLY_STRING ||
reply->type == REDIS_REPLY_STATUS) {
HV* hv = (HV*)sv_2mortal((SV*)newHV());
char* str = reply->str;
size_t len = reply->len;
res.result = newRV_inc((SV*)hv);
while(len != 0) {
const char* line = (char*)memchr(str, '\r', len);
const char* sep;
size_t linelen;
if(line == NULL) {
linelen = len;
} else {
linelen = line - str;
}
sep = (char*)memchr(str, ':', linelen);
if(str[0] != '#' && sep != NULL) {
SV* val;
size_t keylen;
keylen = sep - str;
val = newSVpvn(sep + 1, linelen - keylen - 1);
hv_store(hv, str, keylen, val, 0);
}
if(line == NULL) {
break;
} else {
len -= linelen + 2;
str += linelen + 2;
}
}
} else {
res = Redis__Fast_decode_reply(self, reply, collect_errors);
}
return res;
}
MODULE = Redis::Fast PACKAGE = Redis::Fast
SV*
_new(char* cls);
PREINIT:
redis_fast_t* self;
CODE:
{
DEBUG_MSG("%s", "start");
Newxz(self, sizeof(redis_fast_t), redis_fast_t);
self->error = (char*)malloc(MAX_ERROR_SIZE);
ST(0) = sv_newmortal();
sv_setref_pv(ST(0), cls, (void*)self);
DEBUG_MSG("return %p", ST(0));
XSRETURN(1);
}
OUTPUT:
RETVAL
int
__set_reconnect(Redis::Fast self, int val)
CODE:
{
RETVAL = self->reconnect = val;
}
OUTPUT:
RETVAL
int
__get_reconnect(Redis::Fast self)
CODE:
{
RETVAL = self->reconnect;
}
OUTPUT:
RETVAL
int
__set_every(Redis::Fast self, int val)
CODE:
{
RETVAL = self->every = val;
}
OUTPUT:
RETVAL
int
__get_every(Redis::Fast self)
CODE:
{
RETVAL = self->every;
}
OUTPUT:
RETVAL
double
__set_cnx_timeout(Redis::Fast self, double val)
CODE:
{
RETVAL = self->cnx_timeout = val;
}
OUTPUT:
RETVAL
double
__get_cnx_timeout(Redis::Fast self)
CODE:
{
RETVAL = self->cnx_timeout;
}
OUTPUT:
RETVAL
double
__set_read_timeout(Redis::Fast self, double val)
CODE:
{
RETVAL = self->read_timeout = val;
}
OUTPUT:
RETVAL
double
__get_read_timeout(Redis::Fast self)
CODE:
{
RETVAL = self->read_timeout;
}
OUTPUT:
RETVAL
double
__set_write_timeout(Redis::Fast self, double val)
CODE:
{
RETVAL = self->write_timeout = val;
}
OUTPUT:
RETVAL
double
__get_write_timeout(Redis::Fast self)
CODE:
{
RETVAL = self->write_timeout;
}
OUTPUT:
RETVAL
int
__set_current_database(Redis::Fast self, int val)
CODE:
{
RETVAL = self->current_database = val;
}
OUTPUT:
RETVAL
int
__get_current_database(Redis::Fast self)
CODE:
{
RETVAL = self->current_database;
}
OUTPUT:
RETVAL
int
__sock(Redis::Fast self)
CODE:
{
RETVAL = self->ac ? self->ac->c.fd : 0;
}
OUTPUT:
RETVAL
int
__get_port(Redis::Fast self)
CODE:
{
struct sockaddr_in addr;
socklen_t len;
len = sizeof( addr );
getsockname( self->ac->c.fd, ( struct sockaddr *)&addr, &len );
RETVAL = addr.sin_port;
}
OUTPUT:
RETVAL
void
__set_on_connect(Redis::Fast self, SV* func)
CODE:
{
self->on_connect = SvREFCNT_inc(func);
}
void
__set_on_build_sock(Redis::Fast self, SV* func)
CODE:
{
self->on_build_sock = SvREFCNT_inc(func);
}
void
__set_data(Redis::Fast self, SV* data)
CODE:
{
self->data = SvREFCNT_inc(data);
}
void
__get_data(Redis::Fast self)
CODE:
{
ST(0) = self->data;
XSRETURN(1);
}
void
is_subscriber(Redis::Fast self)
CODE:
{
ST(0) = sv_2mortal(newSViv(self->is_subscriber));
XSRETURN(1);
}
void
DESTROY(Redis::Fast self);
CODE:
{
DEBUG_MSG("%s", "start");
if (self->ac) {
DEBUG_MSG("%s", "free ac");
redisAsyncFree(self->ac);
self->ac = NULL;
}
if(self->hostname) {
DEBUG_MSG("%s", "free hostname");
free(self->hostname);
self->hostname = NULL;
}
if(self->path) {
DEBUG_MSG("%s", "free path");
free(self->path);
self->path = NULL;
}
if(self->error) {
DEBUG_MSG("%s", "free error");
free(self->error);
self->error = NULL;
}
if(self->on_connect) {
DEBUG_MSG("%s", "free on_connect");
SvREFCNT_dec(self->on_connect);
self->on_connect = NULL;
}
if(self->on_build_sock) {
DEBUG_MSG("%s", "free on_build_sock");
SvREFCNT_dec(self->on_build_sock);
self->on_build_sock = NULL;
}
if(self->data) {
DEBUG_MSG("%s", "free data");
SvREFCNT_dec(self->data);
self->data = NULL;
}
Safefree(self);
DEBUG_MSG("%s", "finish");
}
void
__connection_info(Redis::Fast self, char* hostname, int port = 6379)
CODE:
{
if(self->hostname) {
free(self->hostname);
self->hostname = NULL;
}
if(self->path) {
free(self->path);
self->path = NULL;
}
if(hostname) {
self->hostname = (char*)malloc(strlen(hostname) + 1);
strcpy(self->hostname, hostname);
}
self->port = port;
}
void
__connection_info_unix(Redis::Fast self, char* path)
CODE:
{
if(self->hostname) {
free(self->hostname);
self->hostname = NULL;
}
if(self->path) {
free(self->path);
self->path = NULL;
}
if(path) {
self->path = (char*)malloc(strlen(path) + 1);
strcpy(self->path, path);
}
}
void
connect(Redis::Fast self)
CODE:
{
Redis__Fast_connect(self);
}
void
wait_all_responses(Redis::Fast self)
CODE:
{
int res = _wait_all_responses(self);
if(res != WAIT_FOR_EVENT_OK) {
croak("Error while reading from Redis server");
}
}
void
wait_one_response(Redis::Fast self)
CODE:
{
int res = _wait_all_responses(self);
if(res != WAIT_FOR_EVENT_OK) {
croak("Error while reading from Redis server");
}
}
void
__std_cmd(Redis::Fast self, ...)
PREINIT:
redis_fast_reply_t ret;
SV* cb;
char** argv;
size_t* argvlen;
STRLEN len;
int argc, i, collect_errors;
CODE:
{
Redis__Fast_reconnect(self);
if(!self->ac) {
croak("Not connected to any server");
}
cb = ST(items - 1);
if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
argc = items - 2;
} else {
cb = NULL;
argc = items - 1;
}
Newx(argv, sizeof(char*) * argc, char*);
Newx(argvlen, sizeof(size_t) * argc, size_t);
for (i = 0; i < argc; i++) {
if(!sv_utf8_downgrade(ST(i + 1), 1)) {
croak("command sent is not an octet sequence in the native encoding (Latin-1). Consider using debug mode to see the command itself.");
}
argv[i] = SvPV(ST(i + 1), len);
argvlen[i] = len;
}
collect_errors = 0;
if(cb && EQUALS_COMMAND(argvlen[0], argv[0], "EXEC"))
collect_errors = 1;
ret = Redis__Fast_run_cmd(self, collect_errors, NULL, cb, argc, (const char**)argv, argvlen);
Safefree(argv);
Safefree(argvlen);
ST(0) = ret.result ? ret.result : sv_2mortal(newSV(0));
ST(1) = ret.error ? ret.error : sv_2mortal(newSV(0));
XSRETURN(2);
}
void
__quit(Redis::Fast self)
PREINIT:
redis_fast_sync_cb_t *cbt;
CODE:
{
DEBUG_MSG("%s", "start");
if(self->ac) {
Newx(cbt, sizeof(redis_fast_sync_cb_t), redis_fast_sync_cb_t);
cbt->ret.result = NULL;
cbt->ret.error = NULL;
cbt->custom_decode = NULL;
redisAsyncCommand(
self->ac, Redis__Fast_sync_reply_cb, cbt, "QUIT"
);
redisAsyncDisconnect(self->ac);
if(_wait_all_responses(self) == WAIT_FOR_EVENT_OK) {
DEBUG_MSG("%s", "wait_all_responses ok");
if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
} else {
DEBUG_MSG("%s", "wait_all_responses not ok");
if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
}
DEBUG_MSG("%s", "finish");
ST(0) = sv_2mortal(newSViv(1));
XSRETURN(1);
} else {
DEBUG_MSG("%s", "finish. there is no connection.");
XSRETURN(0);
}
}
void
__shutdown(Redis::Fast self)
CODE:
{
if(self->ac) {
redisAsyncCommand(
self->ac, NULL, NULL, "SHUTDOWN"
);
redisAsyncDisconnect(self->ac);
_wait_all_responses(self);
ST(0) = sv_2mortal(newSViv(1));
XSRETURN(1);
} else {
XSRETURN(0);
}
}
void
__keys(Redis::Fast self, ...)
PREINIT:
redis_fast_reply_t ret;
SV* cb;
char** argv;
size_t* argvlen;
STRLEN len;
int argc, i;
CODE:
{
Redis__Fast_reconnect(self);
cb = ST(items - 1);
if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
argc = items - 1;
} else {
cb = NULL;
argc = items;
}
Newx(argv, sizeof(char*) * argc, char*);
Newx(argvlen, sizeof(size_t) * argc, size_t);
argv[0] = "KEYS";
argvlen[0] = 4;
for (i = 1; i < argc; i++) {
argv[i] = SvPV(ST(i), len);
argvlen[i] = len;
}
ret = Redis__Fast_run_cmd(self, 0, Redis__Fast_keys_custom_decode, cb, argc, (const char**)argv, argvlen);
Safefree(argv);
Safefree(argvlen);
ST(0) = ret.result ? ret.result : sv_2mortal(newSV(0));
ST(1) = ret.error ? ret.error : sv_2mortal(newSV(0));
XSRETURN(2);
}
void
__info(Redis::Fast self, ...)
PREINIT:
redis_fast_reply_t ret;
SV* cb;
char** argv;
size_t* argvlen;
STRLEN len;
int argc, i;
CODE:
{
Redis__Fast_reconnect(self);
cb = ST(items - 1);
if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
argc = items - 1;
} else {
cb = NULL;
argc = items;
}
Newx(argv, sizeof(char*) * argc, char*);
Newx(argvlen, sizeof(size_t) * argc, size_t);
argv[0] = "INFO";
argvlen[0] = 4;
for (i = 1; i < argc; i++) {
argv[i] = SvPV(ST(i), len);
argvlen[i] = len;
}
ret = Redis__Fast_run_cmd(self, 0, Redis__Fast_info_custom_decode, cb, argc, (const char**)argv, argvlen);
Safefree(argv);
Safefree(argvlen);
ST(0) = ret.result ? ret.result : sv_2mortal(newSV(0));
ST(1) = ret.error ? ret.error : sv_2mortal(newSV(0));
XSRETURN(2);
}
void
__send_subscription_cmd(Redis::Fast self, ...)
PREINIT:
SV* cb;
char** argv;
size_t* argvlen;
STRLEN len;
int argc, i;
redis_fast_subscribe_cb_t* cbt;
int pvariant;
CODE:
{
int cnt = (self->reconnect == 0 ? 1 : 2);
DEBUG_MSG("%s", "start");
Redis__Fast_reconnect(self);
if(!self->is_subscriber) {
_wait_all_responses(self);
}
cb = ST(items - 1);
if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
argc = items - 2;
} else {
cb = NULL;
argc = items - 1;
}
Newx(argv, sizeof(char*) * argc, char*);
Newx(argvlen, sizeof(size_t) * argc, size_t);
for (i = 0; i < argc; i++) {
argv[i] = SvPV(ST(i+1), len);
argvlen[i] = len;
DEBUG_MSG("argv[%d] = %s", i, argv[i]);
}
for(i = 0; i < cnt; i++) {
pvariant = tolower(argv[0][0]) == 'p';
if (strcasecmp(argv[0]+pvariant,"unsubscribe") != 0) {
DEBUG_MSG("%s", "command is not unsubscribe");
Newx(cbt, sizeof(redis_fast_subscribe_cb_t), redis_fast_subscribe_cb_t);
cbt->self = self;
cbt->cb = SvREFCNT_inc(cb);
} else {
DEBUG_MSG("%s", "command is unsubscribe");
cbt = NULL;
}
redisAsyncCommandArgv(
self->ac, cbt ? Redis__Fast_subscribe_cb : NULL, cbt,
argc, (const char**)argv, argvlen
);
self->expected_subs = argc - 1;
while(self->expected_subs > 0 && wait_for_event(self, self->read_timeout, self->write_timeout) == WAIT_FOR_EVENT_OK) ;
if(self->expected_subs == 0) break;
Redis__Fast_reconnect(self);
}
Safefree(argv);
Safefree(argvlen);
DEBUG_MSG("%s", "finish");
XSRETURN(0);
}
void
wait_for_messages(Redis::Fast self, double timeout = -1)
CODE:
{
int i, cnt = (self->reconnect == 0 ? 1 : 2);
int res = WAIT_FOR_EVENT_OK;
DEBUG_MSG("%s", "start");
self->proccess_sub_count = 0;
for(i = 0; i < cnt; i++) {
while((res = wait_for_event(self, timeout, timeout)) == WAIT_FOR_EVENT_OK) ;
if(res == WAIT_FOR_EVENT_READ_TIMEOUT || res == WAIT_FOR_EVENT_WRITE_TIMEOUT) break;
Redis__Fast_reconnect(self);
}
if(res == WAIT_FOR_EVENT_EXCEPTION) {
if(!self->ac) {
DEBUG_MSG("%s", "Connection not found");
croak("EOF from server");
} else if(self->ac->c.err == REDIS_ERR_EOF) {
DEBUG_MSG("hiredis returns error: %s", self->ac->c.errstr);
croak("EOF from server");
} else {
DEBUG_MSG("hiredis returns error: %s", self->ac->c.errstr);
snprintf(self->error, MAX_ERROR_SIZE, "[WAIT_FOR_MESSAGES] %s", self->ac->c.errstr);
croak("%s", self->error);
}
}
ST(0) = sv_2mortal(newSViv(self->proccess_sub_count));
DEBUG_MSG("finish with %d", res);
XSRETURN(1);
}
void
__wait_for_event(Redis::Fast self, double timeout = -1)
CODE:
{
DEBUG_MSG("%s", "start");
wait_for_event(self, timeout, timeout);
DEBUG_MSG("%s", "finish");
XSRETURN(0);
}