The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
#include <errno.h>
#include "ppport.h"
#include <xs_object_magic.h>
#include <zmq.h>
#include "zmqxs.h"

MODULE = ZeroMQ::Raw	PACKAGE = ZeroMQ::Raw   PREFIX = zmq_
PROTOTYPES: DISABLE

void
zmq_version()
    PREINIT:
        int major, minor, patch = 0;
    PPCODE:
        zmq_version(&major, &minor, &patch);
        EXTEND(SP, 3);
        PUSHs(sv_2mortal(newSViv(major)));
        PUSHs(sv_2mortal(newSViv(minor)));
        PUSHs(sv_2mortal(newSViv(patch)));

MODULE = ZeroMQ::Raw	PACKAGE = ZeroMQ::Raw::Context	PREFIX = zmq_

void
zmq_init(SV *self, int threads)
    PREINIT:
        zmq_ctx_t *ctx;

    CODE:
        zmqxs_ensure_unallocated(self);
        ctx = zmq_init(threads);
        if(ctx == NULL){
            SET_BANG;
            if(_ERRNO == EINVAL)
                croak("Invalid number of threads (%d) passed to zmq_init", threads);

            croak("Unknown error allocating ZMQ context");
        }
        xs_object_magic_attach_struct(aTHX_ SvRV(self), ctx);

void
zmq_term(zmq_ctx_t *ctx);
    PREINIT:
        int status = 0;
    CODE:
        status = zmq_term(ctx);
        if(status < 0){
            SET_BANG;
            if(_ERRNO == EFAULT)
                croak("Invalid context (%p) passed to zmq_term", ctx);

            croak("Unknown error terminating ZMQ context");
        }

bool
zmq_has_valid_context(SV *self)
    CODE:
        RETVAL = zmqxs_has_object(self);
    OUTPUT:
        RETVAL

MODULE = ZeroMQ::Raw	PACKAGE = ZeroMQ::Raw::Message	PREFIX = zmq_msg_

void
zmq_msg_init(SV *self)
    PREINIT:
        zmq_msg_t *msg;
    CODE:
        ZMQ_MSG_ALLOCATE(zmq_msg_init(msg));

void
zmq_msg_init_size(SV *self, size_t size)
    PREINIT:
        zmq_msg_t *msg;
    CODE:
        ZMQ_MSG_ALLOCATE(zmq_msg_init_size(msg, size));

void
zmq_msg_init_data(SV *self, SV *data)
    PREINIT:
        zmq_msg_t *msg;
        STRLEN len;
        char *buf;
        char *copy;
    CODE:
        if(!SvPOK(data))
            croak("You must pass init_data an SvPV and 0x%p is not one", data);
        if(SvUTF8(data))
            croak("Wide character in init_data, you must encode characters");

        buf = SvPV(data, len);
        copy = savepvn(buf, len);
        ZMQ_MSG_ALLOCATE(zmq_msg_init_data(msg, copy, len, &zmqxs_free_data, NULL));

int
zmq_msg_size(zmq_msg_t *msg)

SV *
zmq_msg_data(zmq_msg_t *msg)
    PREINIT:
        char *buf;
        size_t len;
    CODE:
        len = zmq_msg_size(msg);
        if(len < 1)
            XSRETURN_UNDEF;

        buf = zmq_msg_data(msg);
        RETVAL = newSVpv(buf, len);
    OUTPUT:
        RETVAL

void
zmq_msg_close(zmq_msg_t *msg)
    PREINIT:
        int status = 0;
    CODE:
        status = zmq_msg_close(msg);
        Safefree(msg);
        if(status < 0){
            SET_BANG;
            croak("Error closing message %p", msg);
        }

bool
zmq_msg_is_allocated(SV *self)
    CODE:
        RETVAL = zmqxs_has_object(self);
    OUTPUT:
        RETVAL

MODULE = ZeroMQ::Raw	PACKAGE = ZeroMQ::Raw::Socket	PREFIX = zmq_

zmq_sock_err
zmq_init_socket(SV *self, zmq_ctx_t *context, int type)
    PREINIT:
        zmq_sock_t *sock;
    CODE:
        sock = zmq_socket(context, type);
        if(sock == NULL){
            RETVAL = 1;
        }
        else {
            xs_object_magic_attach_struct(aTHX_ SvRV(self), sock);
            RETVAL = 0;
        }
    OUTPUT:
        RETVAL

zmq_sock_err
zmq_close(zmq_sock_t *sock)

zmq_sock_err
zmq_bind(zmq_sock_t *sock, const char *endpoint)

zmq_sock_err
zmq_connect(zmq_sock_t *sock, const char *endpoint)

zmq_sock_err
zmq_send(zmq_sock_t *sock, zmq_msg_t *msg, int flags)

zmq_sock_err
zmq_recv(zmq_sock_t *sock, zmq_msg_t *msg, int flags)

zmq_sock_err
zmq_setsockopt(zmq_sock_t *sock, int option, SV *value)
    PREINIT:
        STRLEN len;
        const char *ptr;
        uint64_t u64;
        int64_t  i64;
        int        i;
    CODE:
        switch(option){
            case ZMQ_LINGER:
            case ZMQ_RECONNECT_IVL:
            case ZMQ_BACKLOG:
                i = SvIV(value);
                RETVAL = zmq_setsockopt(sock, option, &i, sizeof(int));
                break;

            case ZMQ_IDENTITY:
            case ZMQ_SUBSCRIBE:
            case ZMQ_UNSUBSCRIBE:
                ptr = SvPV(value, len);
                RETVAL = zmq_setsockopt(sock, option, ptr, len);
                break;

            case ZMQ_SWAP:
            case ZMQ_RATE:
            case ZMQ_RECOVERY_IVL:
            case ZMQ_RECOVERY_IVL_MSEC:
            case ZMQ_MCAST_LOOP:
                i64 = SvIV(value);
                RETVAL = zmq_setsockopt(sock, option, &i64, sizeof(int64_t));
                break;

            case ZMQ_HWM:
            case ZMQ_AFFINITY:
            case ZMQ_SNDBUF:
            case ZMQ_RCVBUF:
                u64 = SvUV(value);
                RETVAL = zmq_setsockopt(sock, option, &u64, sizeof(uint64_t));
                break;

            default:
                warn("Unknown sockopt type %d, assuming string.  Send patch", option);
                ptr = SvPV(value, len);
                RETVAL = zmq_setsockopt(sock, option, ptr, len);
        }
    OUTPUT:
        RETVAL

SV *
zmq_getsockopt(zmq_sock_t *sock, int option)
    PREINIT:
        char     buf[256];
        int      i;
        uint64_t u64;
        int64_t  i64;
        uint32_t i32;
        size_t   len;
        int      status = -1;
    CODE:
        switch(option){
            case ZMQ_TYPE:
            case ZMQ_LINGER:
            case ZMQ_RECONNECT_IVL:
            case ZMQ_BACKLOG:
            case ZMQ_FD:
                len = sizeof(i);
                status = zmq_getsockopt(sock, option, &i, &len);
                if(status == 0)
                    RETVAL = newSViv(i);
                break;

            case ZMQ_RCVMORE:
            case ZMQ_SWAP:
            case ZMQ_RATE:
            case ZMQ_RECOVERY_IVL:
            case ZMQ_MCAST_LOOP:
                len = sizeof(i64);
                status = zmq_getsockopt(sock, option, &i64, &len);
                if(status == 0)
                    RETVAL = newSViv(i64);
                break;

            case ZMQ_HWM:
            case ZMQ_AFFINITY:
            case ZMQ_SNDBUF:
            case ZMQ_RCVBUF:
                len = sizeof(u64);
                status = zmq_getsockopt(sock, option, &u64, &len);
                if(status == 0)
                    RETVAL = newSVuv(u64);
                break;

            case ZMQ_EVENTS:
                len = sizeof(i32);
                status = zmq_getsockopt(sock, option, &i32, &len);
                if(status == 0)
                    RETVAL = newSViv(i32);
                break;

            case ZMQ_IDENTITY:
                len = sizeof(buf);
                status = zmq_getsockopt(sock, option, &buf, &len);
                if(status == 0)
                    RETVAL = newSVpvn(buf, len);
                break;
        }
        if(status != 0){
	    SET_BANG;
	    switch(_ERRNO) {
	        case EINTR:
                    croak("The operation was interrupted by delivery of a signal");
	        case ETERM:
	            croak("The 0MQ context accociated with the specified socket was terminated");
	        case EFAULT:
	            croak("The provided socket was not valid");
                case EINVAL:
                    croak("Invalid argument");
	        default:
	            croak("Unknown error reading socket option");
	    }
	}
    OUTPUT:
        RETVAL

bool
zmq_is_allocated(SV *self)
    CODE:
        RETVAL = zmqxs_has_object(self);
    OUTPUT:
        RETVAL