The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#include "perl_libzmq3.h"

STATIC_INLINE
void
P5ZMQ3_set_bang(pTHX_ int err) {
    SV *errsv = get_sv("!", GV_ADD);
    P5ZMQ3_TRACE(" + Set ERRSV ($!) to %d", err);
    sv_setiv(errsv, err);
    sv_setpv(errsv, zmq_strerror(err));
    errno = err;
}

STATIC_INLINE
SV *
P5ZMQ3_zmq_getsockopt_int(P5ZMQ3_Socket *sock, int option) {
    size_t len;
    int    status;
    I32    i32;
    SV     *sv = newSV(0);

    len = sizeof(i32);
    status = zmq_getsockopt(sock->socket, option, &i32, &len);
    if(status == 0) {
        sv_setiv(sv, i32);
    } else {
        SET_BANG;
    }
    return sv;
}

STATIC_INLINE
SV *
P5ZMQ3_zmq_getsockopt_int64(P5ZMQ3_Socket *sock, int option) {
    size_t  len;
    int     status;
    int64_t i64;
    SV      *sv = newSV(0);

    len = sizeof(i64);
    status = zmq_getsockopt(sock->socket, option, &i64, &len);
    if(status == 0) {
        sv_setiv(sv, i64);
    } else {
        SET_BANG;
    }
    return sv;
}

STATIC_INLINE
SV *
P5ZMQ3_zmq_getsockopt_uint64(P5ZMQ3_Socket *sock, int option) {
    size_t len;
    int    status;
    uint64_t u64;
    SV *sv = newSV(0);

    len = sizeof(u64);
    status = zmq_getsockopt(sock->socket, option, &u64, &len);
    if(status == 0) {
        sv_setuv(sv, u64);
    } else {
        SET_BANG;
    }
    return sv;
}

STATIC_INLINE
SV *
P5ZMQ3_zmq_getsockopt_string(P5ZMQ3_Socket *sock, int option, size_t len) {
    int    status;
    char   *string;
    SV     *sv = newSV(0);

    Newxz(string, len, char);
    status = zmq_getsockopt(sock->socket, option, string, &len);
    if(status == 0) {
        sv_setpvn(sv, string, len);
    } else {
        SET_BANG;
    }
    Safefree(string);

    return sv;
}


STATIC_INLINE
int
P5ZMQ3_zmq_setsockopt_int( P5ZMQ3_Socket *sock, int option, int val) {
    int status;
    status = zmq_setsockopt(sock->socket, option, &val, sizeof(int));
    if (status != 0) {
        SET_BANG;
    }
    return status;
}

STATIC_INLINE
int
P5ZMQ3_zmq_setsockopt_int64( P5ZMQ3_Socket *sock, int option, int64_t val) {
    int status;
    status = zmq_setsockopt(sock->socket, option, &val, sizeof(int64_t));
    if (status != 0) {
        SET_BANG;
    }
    return status;
}

STATIC_INLINE
int
P5ZMQ3_zmq_setsockopt_uint64(P5ZMQ3_Socket *sock, int option, uint64_t val) {
    int status;
    status = zmq_setsockopt(sock->socket, option, &val, sizeof(uint64_t));
    if (status != 0) {
        SET_BANG;
    }
    return status;
}
    
STATIC_INLINE
int
P5ZMQ3_zmq_setsockopt_string(P5ZMQ3_Socket *sock, int option, const char *ptr, size_t len) {
    int status;
    status = zmq_setsockopt(sock->socket, option, ptr, len);
    if (status != 0) {
        SET_BANG;
    }
    return status;
}

STATIC_INLINE
int
P5ZMQ3_Message_mg_dup(pTHX_ MAGIC* const mg, CLONE_PARAMS* const param) {
    P5ZMQ3_Message *const src = (P5ZMQ3_Message *) mg->mg_ptr;
    P5ZMQ3_Message *dest;

    P5ZMQ3_TRACE("Message -> dup");
    PERL_UNUSED_VAR( param );
 
    Newxz( dest, 1, P5ZMQ3_Message );
    zmq_msg_init( dest );
    zmq_msg_copy ( dest, src );
    mg->mg_ptr = (char *) dest;
    return 0;
}

STATIC_INLINE
int
P5ZMQ3_Message_mg_free( pTHX_ SV * const sv, MAGIC *const mg ) {
    P5ZMQ3_Message* const msg = (P5ZMQ3_Message *) mg->mg_ptr;

    PERL_UNUSED_VAR(sv);
    P5ZMQ3_TRACE( "START mg_free (Message)" );
    if ( msg != NULL ) {
        P5ZMQ3_TRACE( " + zmq message %p", msg );
        zmq_msg_close( msg );
        Safefree( msg );
    }
    P5ZMQ3_TRACE( "END mg_free (Message)" );
    return 1;
}

STATIC_INLINE
MAGIC*
P5ZMQ3_Message_mg_find(pTHX_ SV* const sv, const MGVTBL* const vtbl){
    MAGIC* mg;

    assert(sv   != NULL);
    assert(vtbl != NULL);

    for(mg = SvMAGIC(sv); mg; mg = mg->mg_moremagic){
        if(mg->mg_virtual == vtbl){
            assert(mg->mg_type == PERL_MAGIC_ext);
            return mg;
        }
    }

    P5ZMQ3_TRACE( "mg_find (Message)" );
    P5ZMQ3_TRACE( " + SV %p", sv )
    croak("ZMQ::LibZMQ3::Message: Invalid ZMQ::LibZMQ3::Message object was passed to mg_find");
    return NULL; /* not reached */
}

STATIC_INLINE
int
P5ZMQ3_Context_invalidate( P5ZMQ3_Context *ctxt ) {
    int rv = -1;
    int close = 1;
    if (ctxt->ctxt == NULL) {
        close = 0;
        P5ZMQ3_TRACE( " + context already seems to be freed");
    }

    if (ctxt->pid != getpid()) {
        close = 0;
        P5ZMQ3_TRACE( " + context was not created in this process");
    }

#ifdef USE_ITHREADS
    if (ctxt->interp != aTHX) {
        close = 0;
        P5ZMQ3_TRACE( " + context was not created in this thread");
    }
#endif
    if (close) {
#ifdef HAS_ZMQ_CTX_DESTROY
        P5ZMQ3_TRACE( " + calling actual zmq_ctx_destroy()");
        rv = zmq_ctx_destroy( ctxt->ctxt );
#else
        P5ZMQ3_TRACE( " + calling actual zmq_term()");
        rv = zmq_term( ctxt->ctxt );
#endif
        if ( rv != 0 ) {
            SET_BANG;
        } else {
#ifdef USE_ITHREADS
            ctxt->interp = NULL;
#endif
            ctxt->ctxt   = NULL;
            ctxt->pid    = 0;
            Safefree(ctxt);
        }
    }
    return rv;
}

STATIC_INLINE
int
P5ZMQ3_Context_mg_free( pTHX_ SV * const sv, MAGIC *const mg ) {
    P5ZMQ3_Context* const ctxt = (P5ZMQ3_Context *) mg->mg_ptr;
    PERL_UNUSED_VAR(sv);

    P5ZMQ3_TRACE("START mg_free (Context)");
    if (ctxt != NULL) {
        P5ZMQ3_Context_invalidate( ctxt );
        mg->mg_ptr = NULL;
    }
    P5ZMQ3_TRACE("END mg_free (Context)");
    return 1;
}

STATIC_INLINE
MAGIC*
P5ZMQ3_Context_mg_find(pTHX_ SV* const sv, const MGVTBL* const vtbl){
    MAGIC* mg;

    assert(sv   != NULL);
    assert(vtbl != NULL);

    for(mg = SvMAGIC(sv); mg; mg = mg->mg_moremagic){
        if(mg->mg_virtual == vtbl){
            assert(mg->mg_type == PERL_MAGIC_ext);
            return mg;
        }
    }

    croak("ZMQ::LibZMQ3::Context: Invalid ZMQ::LibZMQ3::Context object was passed to mg_find");
    return NULL; /* not reached */
}

STATIC_INLINE
int
P5ZMQ3_Context_mg_dup(pTHX_ MAGIC* const mg, CLONE_PARAMS* const param){
    PERL_UNUSED_VAR(mg);
    PERL_UNUSED_VAR(param);
    return 0;
}

STATIC_INLINE
int
P5ZMQ3_Socket_invalidate( P5ZMQ3_Socket *sock )
{
    SV *ctxt_sv = sock->assoc_ctxt;
    int rv;

    P5ZMQ3_TRACE("START socket_invalidate");
    if (sock->pid != getpid()) {
        return 0;
    }

    P5ZMQ3_TRACE(" + zmq socket %p", sock->socket);
    rv = zmq_close( sock->socket );

    if ( SvOK(ctxt_sv) ) {
        P5ZMQ3_TRACE(" + associated context: %p", ctxt_sv);
        SvREFCNT_dec(ctxt_sv);
        sock->assoc_ctxt = NULL;
    }

    Safefree(sock);

    P5ZMQ3_TRACE("END socket_invalidate");
    return rv;
}

STATIC_INLINE
int
P5ZMQ3_Socket_mg_free(pTHX_ SV* const sv, MAGIC* const mg)
{
    P5ZMQ3_Socket* const sock = (P5ZMQ3_Socket *) mg->mg_ptr;
    PERL_UNUSED_VAR(sv);
    P5ZMQ3_TRACE("START mg_free (Socket)");
    if (sock) {
        P5ZMQ3_Socket_invalidate( sock );
        mg->mg_ptr = NULL;
    }
    P5ZMQ3_TRACE("END mg_free (Socket)");
    return 1;
}

STATIC_INLINE
int
P5ZMQ3_Socket_mg_dup(pTHX_ MAGIC* const mg, CLONE_PARAMS* const param){
    P5ZMQ3_TRACE("START mg_dup (Socket)");
#ifdef USE_ITHREADS /* single threaded perl has no "xxx_dup()" APIs */
    mg->mg_ptr = NULL;
    PERL_UNUSED_VAR(param);
#else
    PERL_UNUSED_VAR(mg);
    PERL_UNUSED_VAR(param);
#endif
    P5ZMQ3_TRACE("END mg_dup (Socket)");
    return 0;
}

STATIC_INLINE
MAGIC*
P5ZMQ3_Socket_mg_find(pTHX_ SV* const sv, const MGVTBL* const vtbl){
    MAGIC* mg;

    assert(sv   != NULL);
    assert(vtbl != NULL);

    for(mg = SvMAGIC(sv); mg; mg = mg->mg_moremagic){
        if(mg->mg_virtual == vtbl){
            assert(mg->mg_type == PERL_MAGIC_ext);
            return mg;
        }
    }

    croak("ZMQ::Socket: Invalid ZMQ::Socket object was passed to mg_find");
    return NULL; /* not reached */
}

STATIC_INLINE
void 
PerlZMQ_free_string(void *data, void *hint) {
    PERL_SET_CONTEXT(hint);
    Safefree( (char *) data );
}

#include "mg-xs.inc"

MODULE = ZMQ::LibZMQ3    PACKAGE = ZMQ::LibZMQ3   PREFIX = P5ZMQ3_

PROTOTYPES: DISABLED

BOOT:
    {
        P5ZMQ3_TRACE( "Booting ZMQ::LibZMQ3" );
#include "constants-xs.inc"
    }


int
zmq_errno()

const char *
zmq_strerror(num)
        int num;

void
P5ZMQ3_zmq_version()
    PREINIT:
        int major, minor, patch;
        I32 gimme;
    PPCODE:
        gimme = GIMME_V;
        if (gimme == G_VOID) {
            /* WTF? you don't want a return value?! */
            XSRETURN(0);
        }

        zmq_version(&major, &minor, &patch);
        if (gimme == G_SCALAR) {
            XPUSHs( sv_2mortal( newSVpvf( "%d.%d.%d", major, minor, patch ) ) );
            XSRETURN(1);
        } else {
            mXPUSHi( major );
            mXPUSHi( minor );
            mXPUSHi( patch );
            XSRETURN(3);
        }

P5ZMQ3_Context *
P5ZMQ3_zmq_init( nthreads = 5 )
        int nthreads;
    PREINIT:
        SV *class_sv = sv_2mortal(newSVpvn( "ZMQ::LibZMQ3::Context", 20 ));
        void *cxt;
    CODE:
#ifdef HAS_ZMQ_INIT
        P5ZMQ3_TRACE( "START zmq_init" );
        cxt = zmq_init( nthreads );
        if (cxt == NULL) {
            SET_BANG;
            RETVAL = NULL;
        } else {
            Newxz( RETVAL, 1, P5ZMQ3_Context );
            P5ZMQ3_TRACE( " + created context wrapper %p", RETVAL );
            RETVAL->pid    = getpid();
            RETVAL->ctxt   = cxt;
#ifdef USE_ITHREADS
            P5ZMQ3_TRACE( " + threads enabled, aTHX %p", aTHX );
            RETVAL->interp = aTHX;
#endif
            P5ZMQ3_TRACE( " + zmq context %p", RETVAL->ctxt );
        }
        P5ZMQ3_TRACE( "END zmq_init");
#else /* HAS_ZMQ_INIT */
        PERL_UNUSED_VAR(cxt);
        PERL_UNUSED_VAR(nthreads);
        P5ZMQ3_FUNCTION_UNAVAILABLE("zmq_init");
#endif
    OUTPUT:
        RETVAL

P5ZMQ3_Context *
P5ZMQ3_zmq_ctx_new( nthreads = 5 )
        int nthreads;
    PREINIT:
        SV *class_sv = sv_2mortal(newSVpvn( "ZMQ::LibZMQ3::Context", 20 ));
        void *cxt;
    CODE:
#ifdef HAS_ZMQ_CTX_NEW
        P5ZMQ3_TRACE( "START zmq_ctx_new" );
        cxt = zmq_init( nthreads );
        if (cxt == NULL) {
            SET_BANG;
            RETVAL = NULL;
        } else {
            Newxz( RETVAL, 1, P5ZMQ3_Context );
            P5ZMQ3_TRACE( " + created context wrapper %p", RETVAL );
            RETVAL->pid    = getpid();
            RETVAL->ctxt   = cxt;
#ifdef USE_ITHREADS
            P5ZMQ3_TRACE( " + threads enabled, aTHX %p", aTHX );
            RETVAL->interp = aTHX;
#endif
            P5ZMQ3_TRACE( " + zmq context %p", RETVAL->ctxt );
        }
        P5ZMQ3_TRACE( "END zmq_ctx_new");
#else /* HAS_ZMQ_CTX_NEW */
        PERL_UNUSED_VAR(cxt);
        PERL_UNUSED_VAR(nthreads);
        P5ZMQ3_FUNCTION_UNAVAILABLE("zmq_ctx_new");
#endif
    OUTPUT:
        RETVAL

int
P5ZMQ3_zmq_term( ctxt )
        P5ZMQ3_Context *ctxt;
    CODE:
#ifdef HAS_ZMQ_TERM
        RETVAL = P5ZMQ3_Context_invalidate( ctxt );

        if (RETVAL == 0) {
            /* Cancel the SV's mg attr so to not call zmq_term automatically */
            MAGIC *mg =
                P5ZMQ3_Context_mg_find( aTHX_ SvRV(ST(0)), &P5ZMQ3_Context_vtbl );
            mg->mg_ptr = NULL;
            /* mark the original SV's _closed flag as true */
            {
                SV *svr = SvRV(ST(0));
                if (hv_stores( (HV *) svr, "_closed", &PL_sv_yes ) == NULL) {
                    croak("PANIC: Failed to store closed flag on blessed reference");
                }
            }
        }
#else /* HAS_ZMQ_TERM */
        PERL_UNUSED_VAR(ctxt);
        P5ZMQ3_FUNCTION_UNAVAILABLE("zmq_term");
#endif
    OUTPUT:
        RETVAL

int
P5ZMQ3_zmq_ctx_destroy( ctxt )
        P5ZMQ3_Context *ctxt;
    CODE:
#ifdef HAS_ZMQ_CTX_DESTROY
        RETVAL = P5ZMQ3_Context_invalidate( ctxt );

        if (RETVAL == 0) {
            /* Cancel the SV's mg attr so to not call zmq_ctx_destroy automatically */
            MAGIC *mg =
                P5ZMQ3_Context_mg_find( aTHX_ SvRV(ST(0)), &P5ZMQ3_Context_vtbl );
            mg->mg_ptr = NULL;
            /* mark the original SV's _closed flag as true */
            {
                SV *svr = SvRV(ST(0));
                if (hv_stores( (HV *) svr, "_closed", &PL_sv_yes ) == NULL) {
                    croak("PANIC: Failed to store closed flag on blessed reference");
                }
            }
        }
#else /* HAS_ZMQ_CTX_DESTROY */
        PERL_UNUSED_VAR(ctxt);
        P5ZMQ3_FUNCTION_UNAVAILABLE("zmq_ctx_destroy");
#endif
    OUTPUT:
        RETVAL

int
P5ZMQ3_zmq_ctx_get(ctxt, option_name)
        P5ZMQ3_Context *ctxt;
        int option_name;
    CODE:
#ifdef HAS_ZMQ_CTX_GET
        RETVAL = zmq_ctx_get(ctxt->ctxt, option_name);
        if (RETVAL == -1) {
            SET_BANG;
        }
#else
        PERL_UNUSED_VAR(ctxt);
        PERL_UNUSED_VAR(option_name);
        P5ZMQ3_FUNCTION_UNAVAILABLE("zmq_ctx_get");
#endif
    OUTPUT:
        RETVAL

int
P5ZMQ3_zmq_ctx_set(ctxt, option_name, option_value)
        P5ZMQ3_Context *ctxt;
        int option_name;
        int option_value;
    CODE:
#ifdef HAS_ZMQ_CTX_SET
        RETVAL = zmq_ctx_set(ctxt->ctxt, option_name, option_value);
        if (RETVAL == -1) {
            SET_BANG;
        }
#else
        PERL_UNUSED_VAR(ctxt);
        PERL_UNUSED_VAR(option_name);
        PERL_UNUSED_VAR(option_value);
        P5ZMQ3_FUNCTION_UNAVAILABLE("zmq_ctx_set");
#endif
    OUTPUT:
        RETVAL

P5ZMQ3_Message *
P5ZMQ3_zmq_msg_init()
    PREINIT:
        SV *class_sv = sv_2mortal(newSVpvn( "ZMQ::LibZMQ3::Message", 20 ));
        int rc;
    CODE:
        Newxz( RETVAL, 1, P5ZMQ3_Message );
        rc = zmq_msg_init( RETVAL );
        if ( rc != 0 ) {
            SET_BANG;
            zmq_msg_close( RETVAL );
            RETVAL = NULL;
        }
    OUTPUT:
        RETVAL

P5ZMQ3_Message *
P5ZMQ3_zmq_msg_init_size( size )
        IV size;
    PREINIT:
        SV *class_sv = sv_2mortal(newSVpvn( "ZMQ::LibZMQ3::Message", 20 ));
        int rc;
    CODE: 
        Newxz( RETVAL, 1, P5ZMQ3_Message );
        rc = zmq_msg_init_size(RETVAL, size);
        if ( rc != 0 ) {
            SET_BANG;
            zmq_msg_close( RETVAL );
            RETVAL = NULL;
        }
    OUTPUT:
        RETVAL

P5ZMQ3_Message *
P5ZMQ3_zmq_msg_init_data( data, size = -1)
        SV *data;
        IV size;
    PREINIT:
        SV *class_sv = sv_2mortal(newSVpvn( "ZMQ::LibZMQ3::Message", 20 ));
        STRLEN x_data_len;
        char *sv_data = SvPV(data, x_data_len);
        char *x_data;
        int rc;
    CODE: 
        P5ZMQ3_TRACE("START zmq_msg_init_data");
        if (size >= 0) {
            x_data_len = size;
        }
        Newxz( RETVAL, 1, P5ZMQ3_Message );
        Newxz( x_data, x_data_len, char );
        Copy( sv_data, x_data, x_data_len, char );
        rc = zmq_msg_init_data(RETVAL, x_data, x_data_len, PerlZMQ_free_string, Perl_get_context());
        if ( rc != 0 ) {
            SET_BANG;
            zmq_msg_close( RETVAL );
            RETVAL = NULL;
        }
        else {
            P5ZMQ3_TRACE(" + zmq_msg_init_data created message %p", RETVAL);
        }
        P5ZMQ3_TRACE("END zmq_msg_init_data");
    OUTPUT:
        RETVAL

SV *
P5ZMQ3_zmq_msg_data(message)
        P5ZMQ3_Message *message;
    CODE:
        P5ZMQ3_TRACE( "START zmq_msg_data" );
        P5ZMQ3_TRACE( " + message content '%s'", (char *) zmq_msg_data(message) );
        P5ZMQ3_TRACE( " + message size '%d'", (int) zmq_msg_size(message) );
        RETVAL = newSV(0);
        sv_setpvn( RETVAL, (char *) zmq_msg_data(message), (STRLEN) zmq_msg_size(message) );
        P5ZMQ3_TRACE( "END zmq_msg_data" );
    OUTPUT:
        RETVAL

size_t
P5ZMQ3_zmq_msg_size(message)
        P5ZMQ3_Message *message;
    CODE:
        RETVAL = zmq_msg_size(message);
    OUTPUT:
        RETVAL

int
P5ZMQ3_zmq_msg_close(message)
        P5ZMQ3_Message *message;
    CODE:
        P5ZMQ3_TRACE("START zmq_msg_close");
        RETVAL = zmq_msg_close(message);
        Safefree(message);
        if (RETVAL != 0) {
            SET_BANG;
        }

        {
            MAGIC *mg =
                 P5ZMQ3_Message_mg_find( aTHX_ SvRV(ST(0)), &P5ZMQ3_Message_vtbl );
             mg->mg_ptr = NULL;
        }
        /* mark the original SV's _closed flag as true */
        {
            SV *svr = SvRV(ST(0));
            if (hv_stores( (HV *) svr, "_closed", &PL_sv_yes ) == NULL) {
                croak("PANIC: Failed to store closed flag on blessed reference");
            }
        }
        P5ZMQ3_TRACE("END zmq_msg_close");
    OUTPUT:
        RETVAL

int
P5ZMQ3_zmq_msg_move(dest, src)
        P5ZMQ3_Message *dest;
        P5ZMQ3_Message *src;
    CODE:
        RETVAL = zmq_msg_move( dest, src );
        if (RETVAL != 0) {
            SET_BANG;
        }
    OUTPUT:
        RETVAL

int
P5ZMQ3_zmq_msg_copy (dest, src);
        P5ZMQ3_Message *dest;
        P5ZMQ3_Message *src;
    CODE:
        RETVAL = zmq_msg_copy( dest, src );
        if (RETVAL != 0) {
            SET_BANG;
        }
    OUTPUT:
        RETVAL

P5ZMQ3_Socket *
P5ZMQ3_zmq_socket (ctxt, type)
        P5ZMQ3_Context *ctxt;
        IV type;
    PREINIT:
        SV *class_sv = sv_2mortal(newSVpvn( "ZMQ::LibZMQ3::Socket", 19 ));
        void *sock = NULL;
    CODE:
        P5ZMQ3_TRACE( "START zmq_socket" );
        sock = zmq_socket( ctxt->ctxt, type );
        if (sock == NULL) {
            RETVAL = NULL;
            SET_BANG;
        } else {
            Newxz( RETVAL, 1, P5ZMQ3_Socket );
            RETVAL->assoc_ctxt = ST(0);
            RETVAL->socket = sock;
            RETVAL->pid = getpid();
            (void) SvREFCNT_inc(RETVAL->assoc_ctxt);
            P5ZMQ3_TRACE( " + created socket %p", RETVAL );
        }
        P5ZMQ3_TRACE( "END zmq_socket" );
    OUTPUT:
        RETVAL

int
P5ZMQ3_zmq_close(socket)
        P5ZMQ3_Socket *socket;
    CODE:
        RETVAL = P5ZMQ3_Socket_invalidate( socket );
        /* Cancel the SV's mg attr so to not call socket_invalidate again
           during Socket_mg_free
        */
        {
            MAGIC *mg =
                 P5ZMQ3_Socket_mg_find( aTHX_ SvRV(ST(0)), &P5ZMQ3_Socket_vtbl );
             mg->mg_ptr = NULL;
        }

        /* mark the original SV's _closed flag as true */
        {
            SV *svr = SvRV(ST(0));
            if (hv_stores( (HV *) svr, "_closed", &PL_sv_yes ) == NULL) {
                croak("PANIC: Failed to store closed flag on blessed reference");
            }
        }
    OUTPUT:
        RETVAL

int
P5ZMQ3_zmq_connect(socket, addr)
        P5ZMQ3_Socket *socket;
        char *addr;
    CODE:
        P5ZMQ3_TRACE( "START zmq_connect" );
        P5ZMQ3_TRACE( " + socket %p", socket );
        RETVAL = zmq_connect( socket->socket, addr );
        P5ZMQ3_TRACE(" + zmq_connect returned with rv '%d'", RETVAL);
        if (RETVAL != 0) {
            SET_BANG;
        }
        P5ZMQ3_TRACE( "END zmq_connect" );
    OUTPUT:
        RETVAL

int
P5ZMQ3_zmq_bind(socket, addr)
        P5ZMQ3_Socket *socket;
        char *addr;
    CODE:
        P5ZMQ3_TRACE( "START zmq_bind" );
        P5ZMQ3_TRACE( " + socket %p", socket );
        RETVAL = zmq_bind( socket->socket, addr );
        P5ZMQ3_TRACE(" + zmq_bind returned with rv '%d'", RETVAL);
        if (RETVAL != 0) {
            SET_BANG;
        }
        P5ZMQ3_TRACE( "END zmq_bind" );
    OUTPUT:
        RETVAL

int
P5ZMQ3_zmq_unbind(socket, addr)
        P5ZMQ3_Socket *socket;
        const char *addr;
    CODE:
#ifdef HAS_ZMQ_UNBIND
        RETVAL = zmq_unbind(socket, addr);
        if (RETVAL == -1) {
            SET_BANG;
        }
#else
        PERL_UNUSED_VAR(socket);
        PERL_UNUSED_VAR(addr);
        P5ZMQ3_FUNCTION_UNAVAILABLE("zmq_unbind");
#endif
    OUTPUT:
        RETVAL

int
P5ZMQ3_zmq_recv(socket, buf_sv, len, flags = 0)
        P5ZMQ3_Socket *socket;
        SV *buf_sv;
        size_t len;
        int flags;
    PREINIT:
        char *buf;
    CODE:
        P5ZMQ3_TRACE( "START zmq_recv" );
        Newxz( buf, len, char );

        RETVAL = zmq_recv( socket->socket, buf, len, flags );
        P5ZMQ3_TRACE(" + zmq_recv returned with rv '%d'", RETVAL);
        if ( RETVAL == -1 ) {
            SET_BANG;
        } else {
            sv_setpvn( buf_sv, buf, len );
        }
        Safefree(buf);
        P5ZMQ3_TRACE( "END zmq_recv" );
    OUTPUT:
        RETVAL
        
int
P5ZMQ3_zmq_msg_recv(msg, socket, flags = 0)
        P5ZMQ3_Message *msg;
        P5ZMQ3_Socket *socket;
        int flags;
    CODE:
#ifndef HAS_ZMQ_MSG_RECV
        P5ZMQ3_FUNCTION_UNAVAILABLE("zmq_msg_recv");
#else
        P5ZMQ3_TRACE( "START zmq_msg_recv" );
        RETVAL = zmq_msg_recv(msg, socket->socket, flags);
        P5ZMQ3_TRACE(" + zmq_msg_recv returned with rv '%d'", RETVAL);
        if (RETVAL == -1) {
            SET_BANG;
            P5ZMQ3_TRACE(" + zmq_msg_recv got bad status");
        }
        P5ZMQ3_TRACE( "END zmq_msg_recv" );
#endif /* HAS_ZMQ_RCVMSG */
    OUTPUT:
        RETVAL

P5ZMQ3_Message *
P5ZMQ3_zmq_recvmsg(socket, flags = 0)
        P5ZMQ3_Socket *socket;
        int flags;
    PREINIT:
        SV *class_sv = sv_2mortal(newSVpvn( "ZMQ::LibZMQ3::Message", 20 ));
        int rv;
    CODE:
#ifndef HAS_ZMQ_RECVMSG
        P5ZMQ3_FUNCTION_UNAVAILABLE("zmq_recvmsg");
#else
        P5ZMQ3_TRACE( "START zmq_recvmsg" );
        Newxz(RETVAL, 1, P5ZMQ3_Message);
        rv = zmq_msg_init(RETVAL);
        if (rv != 0) {
            SET_BANG;
            P5ZMQ3_TRACE("zmq_msg_init failed (%d)", rv);
            XSRETURN_EMPTY;
        }
        rv = zmq_recvmsg(socket->socket, RETVAL, flags);
        P5ZMQ3_TRACE(" + zmq_recvmsg with flags %d", flags);
        P5ZMQ3_TRACE(" + zmq_recvmsg returned with rv '%d'", rv);
        if (rv == -1) {
            SET_BANG;
            P5ZMQ3_TRACE(" + zmq_recvmsg got bad status, closing temporary message");
            zmq_msg_close(RETVAL);
            Safefree(RETVAL);
            XSRETURN_EMPTY;
        }
        P5ZMQ3_TRACE( "END zmq_recvmsg" );
#endif /* HAS_ZMQ_RCVMSG */
    OUTPUT:
        RETVAL

int
P5ZMQ3_zmq_send(socket, message, size = -1, flags = 0)
        P5ZMQ3_Socket *socket;
        SV *message;
        int size;
        int flags;
    PREINIT:
        char *message_buf;
        STRLEN usize;
    CODE:
        P5ZMQ3_TRACE( "START zmq_send" );
        if (! SvOK(message))
            croak("ZMQ::LibZMQ3::zmq_send(): NULL message passed");

        message_buf = SvPV( message, usize );
        if ( size != -1 && (STRLEN)size < usize )
            usize = (STRLEN)size;

        P5ZMQ3_TRACE( " + buffer '%s' (%zu)", message_buf, usize );
        P5ZMQ3_TRACE( " + flags %d", flags);
        RETVAL = zmq_send( socket->socket, message_buf, usize, flags );
        P5ZMQ3_TRACE( " + zmq_send returned with rv '%d'", RETVAL );
        if ( RETVAL == -1 ) {
            P5ZMQ3_TRACE( " ! zmq_send error %s", zmq_strerror( zmq_errno() ) );
            SET_BANG;
        }
        P5ZMQ3_TRACE( "END zmq_send" );
    OUTPUT:
        RETVAL

int
P5ZMQ3__zmq_msg_send(message, socket, flags = 0)
        P5ZMQ3_Message *message;
        P5ZMQ3_Socket *socket;
        int flags;
    CODE:
#ifndef HAS_ZMQ_MSG_SEND
        P5ZMQ3_FUNCTION_UNAVAILABLE("zmq_msg_send");
#else
        P5ZMQ3_TRACE( "START zmq_msg_send" );
        RETVAL = zmq_msg_send(message, socket->socket, flags);
        P5ZMQ3_TRACE( " + zmq_msg_send returned with rv '%d'", RETVAL );
        if ( RETVAL == -1 ) {
            P5ZMQ3_TRACE( " ! zmq_msg_send error %s", zmq_strerror( zmq_errno() ) );
            SET_BANG;
        }
        P5ZMQ3_TRACE( "END zmq_msg_send" );
#endif
    OUTPUT:
        RETVAL

int
P5ZMQ3__zmq_sendmsg(socket, message, flags = 0)
        P5ZMQ3_Socket *socket;
        P5ZMQ3_Message *message;
        int flags;
    CODE:
#ifndef HAS_ZMQ_SENDMSG
        P5ZMQ3_FUNCTION_UNAVAILABLE("zmq_sendmsg");
#else
        P5ZMQ3_TRACE( "START zmq_sendmsg" );
        RETVAL = zmq_sendmsg(socket->socket, message, flags);
        P5ZMQ3_TRACE( " + zmq_sendmsg returned with rv '%d'", RETVAL );
        if ( RETVAL == -1 ) {
            P5ZMQ3_TRACE( " ! zmq_sendmsg error %s", zmq_strerror( zmq_errno() ) );
            SET_BANG;
        }
        P5ZMQ3_TRACE( "END zmq_sendmsg" );
#endif
    OUTPUT:
        RETVAL

SV *
P5ZMQ3_zmq_getsockopt_int(sock, option)
        P5ZMQ3_Socket *sock;
        int option;

SV *
P5ZMQ3_zmq_getsockopt_int64(sock, option)
        P5ZMQ3_Socket *sock;
        int option;

SV *
P5ZMQ3_zmq_getsockopt_uint64(sock, option)
        P5ZMQ3_Socket *sock;
        int option;

SV *
P5ZMQ3_zmq_getsockopt_string(sock, option, len = 1024)
        P5ZMQ3_Socket *sock;
        int option;
        size_t len;

int
P5ZMQ3_zmq_setsockopt_int(sock, option, val)
        P5ZMQ3_Socket *sock;
        int option;
        int val;

int
P5ZMQ3_zmq_setsockopt_int64(sock, option, val)
        P5ZMQ3_Socket *sock;
        int option;
        int64_t val;

int
P5ZMQ3_zmq_setsockopt_uint64(sock, option, val)
        P5ZMQ3_Socket *sock;
        int option;
        uint64_t val;

int
P5ZMQ3_zmq_setsockopt_string(sock, option, value)
        P5ZMQ3_Socket *sock;
        int option;
        SV *value;
    PREINIT:
        size_t len;
        const char *string;
    CODE:
        string = SvPV( value, len );
        RETVAL = P5ZMQ3_zmq_setsockopt_string(sock, option, string, len);
    OUTPUT:
        RETVAL

void
P5ZMQ3_zmq_poll( list, timeout = 0 )
        AV *list;
        long timeout;
    PREINIT:
        I32 list_len;
        zmq_pollitem_t *pollitems;
        CV **callbacks;
        int i;
        int rv;
        int eventfired;
    PPCODE:
        P5ZMQ3_TRACE( "START zmq_poll" );

        list_len = av_len( list ) + 1;
        if (list_len <= 0) {
            XSRETURN(0);
        }

        Newxz( pollitems, list_len, zmq_pollitem_t);
        Newxz( callbacks, list_len, CV *);

        /* list should be a list of hashrefs fd, events, and callbacks */
        for (i = 0; i < list_len; i++) {
            SV **svr = av_fetch( list, i, 0 );
            HV  *elm;

            P5ZMQ3_TRACE( " + processing element %d", i );
            if (svr == NULL || ! SvOK(*svr) || ! SvROK(*svr) || SvTYPE(SvRV(*svr)) != SVt_PVHV) {
                Safefree( pollitems );
                Safefree( callbacks );
                croak("Invalid value on index %d", i);
            }
            elm = (HV *) SvRV(*svr);

            callbacks[i] = NULL;
            pollitems[i].revents = 0;
            pollitems[i].events  = 0;
            pollitems[i].fd      = 0;
            pollitems[i].socket  = NULL;

            svr = hv_fetch( elm, "socket", 6, NULL );
            if (svr != NULL) {
                MAGIC *mg;
                if (! SvOK(*svr) || !sv_isobject( *svr) || ! sv_isa(*svr, "ZMQ::LibZMQ3::Socket")) {
                    Safefree( pollitems );
                    Safefree( callbacks );
                    croak("Invalid 'socket' given for index %d", i);
                }
                mg = P5ZMQ3_Socket_mg_find( aTHX_ SvRV(*svr), &P5ZMQ3_Socket_vtbl );
                pollitems[i].socket = ((P5ZMQ3_Socket *) mg->mg_ptr)->socket;
                P5ZMQ3_TRACE( " + via pollitem[%d].socket = %p", i, pollitems[i].socket );
            } else {
                svr = hv_fetch( elm, "fd", 2, NULL );
                if (svr == NULL || ! SvOK(*svr) || SvTYPE(*svr) != SVt_IV) {
                    Safefree( pollitems );
                    Safefree( callbacks );
                    croak("Invalid 'fd' given for index %d", i);
                }
                pollitems[i].fd = SvIV( *svr );
                P5ZMQ3_TRACE( " + via pollitem[%d].fd = %d", i, pollitems[i].fd );
            }

            svr = hv_fetch( elm, "events", 6, NULL );
            if (svr == NULL || ! SvOK(*svr) || SvTYPE(*svr) != SVt_IV) {
                Safefree( pollitems );
                Safefree( callbacks );
                croak("Invalid 'events' given for index %d", i);
            }
            pollitems[i].events = SvIV( *svr );
            P5ZMQ3_TRACE( " + going to poll events %d", pollitems[i].events );

            svr = hv_fetch( elm, "callback", 8, NULL );
            if (svr == NULL || ! SvOK(*svr) || ! SvROK(*svr) || SvTYPE(SvRV(*svr)) != SVt_PVCV) {
                Safefree( pollitems );
                Safefree( callbacks );
                croak("Invalid 'callback' given for index %d", i);
            }
            callbacks[i] = (CV *) SvRV( *svr );
        }

        /* now call zmq_poll */
        rv = zmq_poll( pollitems, list_len, timeout );
        SET_BANG;
        P5ZMQ3_TRACE( " + zmq_poll returned with rv '%d'", RETVAL );

        if (rv != -1 ) {
            for ( i = 0; i < list_len; i++ ) {
                P5ZMQ3_TRACE( " + checking events for %d", i );
                eventfired = 
                    (pollitems[i].revents & pollitems[i].events) ? 1 : 0;
                if (GIMME_V == G_ARRAY) {
                    mXPUSHi(eventfired);
                }

                if (eventfired) {
                    dSP;
                    ENTER;
                    SAVETMPS;
                    PUSHMARK(SP);
                    PUTBACK;

                    call_sv( (SV*)callbacks[i], G_SCALAR );
                    SPAGAIN;

                    PUTBACK;
                    FREETMPS;
                    LEAVE;
                }
            }
        }

        if (GIMME_V == G_SCALAR) {
            mXPUSHi(rv);
        }
        Safefree(pollitems);
        Safefree(callbacks);
        P5ZMQ3_TRACE( "END zmq_poll" );

int
P5ZMQ3_zmq_device( device, insocket, outsocket )
        int device;
        P5ZMQ3_Socket *insocket;
        P5ZMQ3_Socket *outsocket;
    CODE:
#ifdef HAS_ZMQ_DEVICE
        RETVAL = zmq_device( device, insocket->socket, outsocket->socket );
        if (RETVAL != 0) {
            SET_BANG;
        }
#else
        PERL_UNUSED_VAR(device);
        P5ZMQ3_FUNCTION_UNAVAILABLE("zmq_device");
#endif
    OUTPUT:
        RETVAL

int
P5ZMQ3_zmq_proxy(frontend, backend, capture = NULL)
        P5ZMQ3_Socket *frontend;
        P5ZMQ3_Socket *backend;
        P5ZMQ3_Socket *capture;
    CODE:
#ifdef HAS_ZMQ_PROXY
        if (capture) {
          capture = capture->socket;
        }
        RETVAL = zmq_proxy(frontend->socket, backend->socket, capture);
        if (RETVAL != 0) {
            SET_BANG;
        }
#else
        PERL_UNUSED_VAR(frontend);
        PERL_UNUSED_VAR(backend);
        PERL_UNUSED_VAR(capture);
        P5ZMQ3_FUNCTION_UNAVAILABLE("zmq_proxy");
#endif
    OUTPUT:
        RETVAL

int
P5ZMQ3_zmq_socket_monitor(socket, addr, events)
        P5ZMQ3_Socket *socket;
        char *addr;
        int events;
    CODE:
#ifdef HAS_ZMQ_SOCKET_MONITOR
        RETVAL = zmq_socket_monitor(socket, addr, events);
        if (RETVAL != 0) {
            SET_BANG;
        }
#else
        PERL_UNUSED_VAR(socket);
        PERL_UNUSED_VAR(addr);
        PERL_UNUSED_VAR(events);
        P5ZMQ3_FUNCTION_UNAVAILABLE("zmq_socket_monitor");
#endif
    OUTPUT:
        RETVAL