The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
/*

	WebSphere MQ Telemetry Transport
	Perl Interface to IA93

	Nicholas Humfrey
	University of Southampton
	njh@ecs.soton.ac.uk
	
*/

#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"

/* WMQTT include */
#include "MQIsdp.h"
#include "ppersist.h"



/*
 * Get the connection handle from hashref.
 *
 * Looks up the "handle" key of `hash` and returns the MQISDPCH value held by
 * the blessed reference stored there.
 *
 * Returns NULL, after a warning, when the key is missing or its value is
 * undefined.  Returns NULL silently when the value is not a reference to an
 * object derived from "MQISDPCH".
 */
MQISDPCH
get_handle_from_hv( HV* hash ) {
	SV** svp = NULL;
	IV pointer;
	
	svp = hv_fetch( hash, "handle", 6, 0 );
	if (svp == NULL || !SvOK(*svp)) {
		warn("Connection handle is missing from hash");
		return NULL;
	}
	
	/* sv_derived_from() also accepts a plain class-name string, so insist on
	   a real reference before SvRV() is applied below. */
	if (!SvROK(*svp) || !sv_derived_from(*svp, "MQISDPCH")) {
		return NULL;
	}

	/* De-reference and extract the pointer */
	pointer = SvIV((SV*)SvRV(*svp));
	return INT2PTR(MQISDPCH,pointer);
}


/*
 * Get debug settings from hashref.
 *
 * Returns the truth value of the "debug" entry of `hash`.  When the key is
 * absent a warning is issued and 0 is returned.
 */
int
get_debug_from_hv( HV* hash ) {
	SV** entry = hv_fetch( hash, "debug", 5, 0 );

	if (entry != NULL) {
		return SvTRUE( *entry );
	}

	warn("Debug setting is missing from hash");
	return 0;
}


/*
 * Get task info from hashref.
 *
 * Looks up the key `name` in `hash` and returns the MQISDPTI pointer held by
 * the blessed reference stored there.
 *
 * Returns NULL (without any warning) when the key is missing, its value is
 * undefined, or the value is not a reference to an object derived from
 * "MQISDPTIPtr".
 */
MQISDPTI*
get_task_info_from_hv( HV* hash, char* name ) {
	SV** svp = NULL;
	IV pointer;
	
	svp = hv_fetch( hash, name, strlen(name), 0 );
	if (svp == NULL || !SvOK(*svp)) return NULL;

	/* sv_derived_from() also accepts a plain class-name string, so insist on
	   a real reference before SvRV() is applied below. */
	if (!SvROK(*svp) || !sv_derived_from(*svp, "MQISDPTIPtr")) return NULL;

	/* De-reference and extract the pointer */
	pointer = SvIV((SV*)SvRV(*svp));
	return INT2PTR(MQISDPTI*,pointer);
}


/*
 * Undefine the value of a key.
 *
 * If `key` exists in `hash`, the stored scalar is overwritten with undef.
 * A missing key is ignored without any diagnostic.
 */
void
hv_key_undef( HV* hash, char* key ) {
	SV** slot = hv_fetch( hash, key, strlen(key), 0 );

	if (slot == NULL) {
		return;
	}

	sv_setsv(*slot, &PL_sv_undef);
}


/*
 * Make an array from pubOptions.
 *
 * Returns a newly created AV holding one string per flag that is set in
 * `pubOptions`.  Names are appended in the fixed order of the table below.
 */
AV*
options_to_av( long pubOptions ) {
	/* Flag/name pairs, listed in the order in which names are appended */
	const struct {
		long        flag;
		const char *name;
	} known[] = {
		{ MQISDP_WILL,        "WILL"        },
		{ MQISDP_RETAIN,      "RETAIN"      },
		{ MQISDP_QOS_0,       "QOS_0"       },
		{ MQISDP_QOS_1,       "QOS_1"       },
		{ MQISDP_QOS_2,       "QOS_2"       },
		{ MQISDP_CLEAN_START, "CLEAN_START" },
		{ MQISDP_WILL_RETAIN, "WILL_RETAIN" },
		{ MQISDP_DUPLICATE,   "DUPLICATE"   }
	};
	const size_t count = sizeof(known) / sizeof(known[0]);
	AV* result = newAV();
	size_t i;

	for (i = 0; i < count; i++) {
		if (pubOptions & known[i].flag) {
			av_push( result, newSVpv( known[i].name, 0 ) );
		}
	}

	return result;
}



/*
 * Expands to a `case x:` label whose body returns the spelling of the macro
 * argument as a string literal (via the `#` stringizing operator).
 * The trailing `break` can never execute because it follows the `return`.
 * NOTE(review): no use of this macro is visible in this part of the file;
 * confirm whether it is still needed.
 */
#define STATUS_CASE_RET( x ) \
  case x:           \
    return #x;    \
    break;


/*
 * Translate an MQISDP_* status/return code into its short name, i.e. the
 * constant's name without the "MQISDP_" prefix.  Any code that is not listed
 * below yields "UNKNOWN".  The returned pointer refers to a string literal
 * and must not be freed or modified.
 */
const char*
get_status_string( int statusCode ) {
	const char *name = "UNKNOWN";

	switch (statusCode) {
		case MQISDP_OK:                     name = "OK";                     break;
		case MQISDP_PROTOCOL_VERSION_ERROR: name = "PROTOCOL_VERSION_ERROR"; break;
		case MQISDP_HOSTNAME_NOT_FOUND:     name = "HOSTNAME_NOT_FOUND";     break;
		case MQISDP_Q_FULL:                 name = "Q_FULL";                 break;
		case MQISDP_FAILED:                 name = "FAILED";                 break;
		case MQISDP_PUBS_AVAILABLE:         name = "PUBS_AVAILABLE";         break;
		case MQISDP_NO_PUBS_AVAILABLE:      name = "NO_PUBS_AVAILABLE";      break;
		case MQISDP_PERSISTENCE_FAILED:     name = "PERSISTENCE_FAILED";     break;
		case MQISDP_CONN_HANDLE_ERROR:      name = "CONN_HANDLE_ERROR";      break;
		case MQISDP_NO_WILL_TOPIC:          name = "NO_WILL_TOPIC";          break;
		case MQISDP_INVALID_STRUC_LENGTH:   name = "INVALID_STRUC_LENGTH";   break;
		case MQISDP_DATA_LENGTH_ERROR:      name = "DATA_LENGTH_ERROR";      break;
		case MQISDP_DATA_TOO_BIG:           name = "DATA_TOO_BIG";           break;
		case MQISDP_ALREADY_CONNECTED:      name = "ALREADY_CONNECTED";      break;
		case MQISDP_CONNECTION_BROKEN:      name = "CONNECTION_BROKEN";      break;
		case MQISDP_DATA_TRUNCATED:         name = "DATA_TRUNCATED";         break;
		case MQISDP_CLIENT_ID_ERROR:        name = "CLIENT_ID_ERROR";        break;
		case MQISDP_BROKER_UNAVAILABLE:     name = "BROKER_UNAVAILABLE";     break;
		case MQISDP_SOCKET_CLOSED:          name = "SOCKET_CLOSED";          break;
		case MQISDP_OUT_OF_MEMORY:          name = "OUT_OF_MEMORY";          break;

		case MQISDP_DELIVERED:              name = "DELIVERED";              break;
		case MQISDP_RETRYING:               name = "RETRYING";               break;
		case MQISDP_IN_PROGRESS:            name = "IN_PROGRESS";            break;
		case MQISDP_MSG_HANDLE_ERROR:       name = "MSG_HANDLE_ERROR";       break;

		case MQISDP_CONNECTING:             name = "CONNECTING";             break;
		case MQISDP_CONNECTED:              name = "CONNECTED";              break;
		case MQISDP_DISCONNECTED:           name = "DISCONNECTED";           break;
	}

	return name;
}


MODULE = WebSphere::MQTT::Client	PACKAGE = WebSphere::MQTT::Client


##
## Prints library version to STDOUT
##
## Takes no arguments and returns nothing to Perl; it only calls the
## library's MQIsdp_version() and ignores any value that call may return.
## NOTE(review): the output destination is taken from the existing comment
## and is not visible from this file.
##
void
xs_version()
  CODE:
   MQIsdp_version();


##
## Allocate memory for TaskInfo and start the protocol tasks
##
## Reads 'clientid' from self (croaks unless it is 1 to 23 characters long),
## allocates three zero-filled MQISDPTI structures with logging set to
## LOGNONE, hands them to MQIsdp_StartTasks() and stores them in self under
## 'send_task_info', 'recv_task_info' and 'api_task_info', each blessed
## into MQISDPTIPtr.
##
## Returns 1 on success.  Croaks on a bad client id, on allocation failure,
## when the tasks cannot be started, or when a hash store fails.
##
int
xs_start_tasks( self )
	HV* self
  PREINIT:
  	MQISDPTI* pSendTaskInfo = NULL;
  	MQISDPTI* pRcvTaskInfo = NULL;
  	MQISDPTI* pApiTaskInfo = NULL;
  	char *clientid = NULL;
  	size_t clientidLen = 0;
  	SV** svp = NULL;
 	SV* sv = NULL;

  CODE:
  	/* Get the client ID (validated before anything is allocated) */
  	svp = hv_fetch( self, "clientid", 8, 0 );
  	if (svp == NULL) {
  		croak("clientid is not defined");
  	}
  	clientid = SvPV_nolen( *svp );
  	clientidLen = strlen(clientid);
  	if (clientidLen < 1 || clientidLen > 23) {
  		croak("clientid is not valid");
  	}
  	
  	
	/* Allocate the WMQTT thread parameter structures, already zeroed */
	pSendTaskInfo = (MQISDPTI*)calloc( 1, sizeof(MQISDPTI) );
	pRcvTaskInfo = (MQISDPTI*)calloc( 1, sizeof(MQISDPTI) );
	pApiTaskInfo = (MQISDPTI*)calloc( 1, sizeof(MQISDPTI) );
	
	/* Nothing has been handed to the library yet, so a partial
	   allocation can be released safely before bailing out */
	if (pSendTaskInfo == NULL || pRcvTaskInfo == NULL || pApiTaskInfo == NULL) {
		free( pSendTaskInfo );
		free( pRcvTaskInfo );
		free( pApiTaskInfo );
		croak("Failed to allocate memory for MQIsdp task info");
	}
	
	/* Turn thread tracing off */
	pSendTaskInfo->logLevel = LOGNONE;
	pRcvTaskInfo->logLevel = LOGNONE;
	pApiTaskInfo->logLevel = LOGNONE;


	/* Start the threads (if enabled).
	   NOTE(review): on failure the three structures are intentionally not
	   freed here, because it cannot be seen from this file whether a
	   partially started task may still reference them. */
	if ( MQIsdp_StartTasks( pApiTaskInfo, pSendTaskInfo,
                            pRcvTaskInfo, clientid ) != 0 ) {
        croak("Failed to start MQIsdp protocol threads");
    }
    
    /* Store thread parameter pointers */
	sv=sv_setref_pv(newSV(0), "MQISDPTIPtr", (void *)pSendTaskInfo);
	if (hv_store(self, "send_task_info", 14, sv, 0) == NULL) {
		croak("send_task_info not stored");
	}
	
	sv=sv_setref_pv(newSV(0), "MQISDPTIPtr", (void *)pRcvTaskInfo);
	if (hv_store(self, "recv_task_info", 14, sv, 0) == NULL) {
		croak("recv_task_info not stored");
	}
	
	sv=sv_setref_pv(newSV(0), "MQISDPTIPtr", (void *)pApiTaskInfo);
	if (hv_store(self, "api_task_info", 13, sv, 0) == NULL) {
		croak("api_task_info not stored");
	}

    
    /* Successful */
    RETVAL = 1;
    
  OUTPUT:
	self
	RETVAL

 
	
##
## Get connection status
##
## Returns the status name for the connection held in self.  When self has
## no usable handle, "DISCONNECTED" is reported without calling the library.
## When the 'debug' setting of self is true, the result is also written to
## stderr.
##
const char*
xs_status( self )
	HV* self

  PREINIT:
	char        infoString[MQISDP_INFO_STRING_LENGTH] = "";
  	MQISDPCH	handle = MQISDP_INV_CONN_HANDLE;
	int			statusCode = MQISDP_DISCONNECTED;
	int			debug = 0;
	
  CODE:
  	/* Fetch the handle first, then the debug flag */
  	handle = get_handle_from_hv( self );
  	debug = get_debug_from_hv( self );
  	
  	/* Only a live handle is queried; otherwise the DISCONNECTED default
  	   set above stands.
  	   NOTE(review): the length passed is MQISDP_RC_STRING_LENGTH while the
  	   buffer is sized by MQISDP_INFO_STRING_LENGTH; confirm against
  	   MQIsdp.h that the former never exceeds the latter. */
  	if (handle != NULL) {
		statusCode = MQIsdp_status( handle, MQISDP_RC_STRING_LENGTH, NULL, infoString );
 	}
 	
 	/* Turn status code into a string and optionally trace it */
 	RETVAL = get_status_string( statusCode );
 	if (debug) {
 		fprintf(stderr, "xs_status: %s [%d] - %s\n", RETVAL, statusCode, infoString);
 	}
 	
  OUTPUT:
	RETVAL
 	
 
 
  
##
## Connect to broker
##
## Builds a CONN_PARMS structure from the settings in self ('clientid',
## 'retry_count', 'retry_interval', 'keep_alive', 'host', 'port',
## 'clean_start' and the optional 'persist' object), calls MQIsdp_connect()
## and stores the resulting connection handle in self under 'handle',
## blessed into MQISDPCH.
##
## Returns the result of the connect call as a status name.  Croaks when a
## required setting is missing or has the wrong kind of value, when
## 'clientid' does not fit the structure, or when a hash store fails.
##
const char*
xs_connect( self, pApiTaskInfo )
	HV* self
   	MQISDPTI	*pApiTaskInfo

 PREINIT:
  	CONN_PARMS	cp;
  	MQISDPCH	handle = MQISDP_INV_CONN_HANDLE;
  	SV**		svp = NULL;
 	SV*			sv = NULL;
 	char		*clientid = NULL;
 	int			rc = MQISDP_FAILED;
  	
  CODE:
  	/* The connect parameters live on the stack, so none of the croak()
  	   calls below can leak them, and every field starts out as zero */
	memset( &cp, 0, sizeof(cp) );
	cp.strucLength = sizeof(CONN_PARMS);



    /* Fill out parameters from hashref */
    svp = hv_fetch( self, "clientid", 8, 0 );
    if (svp && SvPOK(*svp)) {
		clientid = SvPV_nolen(*svp);
		/* Refuse ids that would overrun the destination.
		   Assumes clientId is a char array member of CONN_PARMS (it is
		   written with strcpy without being pointed anywhere first). */
		if (strlen(clientid) >= sizeof(cp.clientId)) {
			croak("'clientid' setting is too long");
		}
		strcpy( cp.clientId, clientid );
    }
	else		croak("'clientid' setting isn't available");
    
    svp = hv_fetch( self, "retry_count", 11, 0 );
    if (svp && SvIOK(*svp))	cp.retryCount = SvIV(*svp);
	else		croak("'retry_count' setting isn't available");

    svp = hv_fetch( self, "retry_interval", 14, 0 );
    if (svp && SvIOK(*svp))	cp.retryInterval = SvIV(*svp);
	else		croak("'retry_interval' setting isn't available");

    svp = hv_fetch( self, "keep_alive", 10, 0 );
    if (svp && SvIOK(*svp))	cp.keepAliveTime = SvIV(*svp);
	else		croak("'keep_alive' setting isn't available");

    /* The host name is not copied: the structure points at the string
       buffer of the hash value for the duration of the connect call */
    svp = hv_fetch( self, "host", 4, 0 );
    if (svp && SvPOK(*svp))	cp.brokerHostname = SvPV_nolen(*svp);
	else		croak("'host' setting isn't available");

    svp = hv_fetch( self, "port", 4, 0 );
    if (svp && SvIOK(*svp))	cp.brokerPort = SvIV(*svp);
	else		croak("'port' setting isn't available");

    /* Optional persistence object: wrap it and remember the wrapper in
       self as 'persist_info' so that it can be found again later */
    svp = hv_fetch( self, "persist", 7, 0 );
    if (svp && SvOK(*svp)) {
		if (sv_isobject(*svp)) {
			cp.pPersistFuncs = new_persistence_wrapper(*svp);
			sv=sv_setref_pv(newSV(0), "MQISDPTIPtr", (void *)cp.pPersistFuncs);
			if (hv_store(self, "persist_info", 12, sv, 0) == NULL) {
				croak("persist_info not stored");
			}
		}
		else	croak("'persist' setting must be an object");
    }
    else	cp.pPersistFuncs = NULL;

	/* Set options flags */
	cp.options = MQISDP_NONE;
	svp = hv_fetch( self, "clean_start", 11, 0 );
	if (svp) {
		if (SvIV(*svp)) cp.options |= MQISDP_CLEAN_START;
	} else {
		croak("'clean_start' setting isn't available");
	}

	/* Perform the connect */
	rc = MQIsdp_connect( &handle, &cp, pApiTaskInfo );


    /* Store connection handle pointer (done whatever the result was) */
	sv=sv_setref_pv(newSV(0), "MQISDPCH", (void *)handle);
	if (hv_store(self, "handle", 6, sv, 0) == NULL) {
		croak("connection handle not stored");
	}
	
	/* Return result code as a string */
	RETVAL = get_status_string( rc );
	
  OUTPUT:
	RETVAL
	


##
## Disconnect from broker
##
## Returns the result of MQIsdp_disconnect() as a status name, or "FAILED"
## when self holds no usable connection handle (no call is made then).
##
const char*
xs_disconnect( self )
	HV* self

  PREINIT:
 	int			rc = MQISDP_FAILED;
   	MQISDPCH	handle = MQISDP_INV_CONN_HANDLE;

  CODE:
  	handle = get_handle_from_hv( self );

	/* Without a handle nothing is attempted and rc keeps its default */
	if (handle != NULL) {
  		rc = MQIsdp_disconnect( &handle );
  	
  		/* The call receives the handle by address; if it comes back as
  		   the invalid handle, forget the stored one as well */
  		if (handle == MQISDP_INV_CONN_HANDLE) {
  			hv_key_undef( self, "handle" );
  		}
	}
  
  	RETVAL = get_status_string( rc );
  	
  OUTPUT:
	RETVAL


##
## Terminate threads and free memory
##
## Calls MQIsdp_terminate(), then releases the structures stored in self
## under 'api_task_info', 'send_task_info', 'recv_task_info' and
## 'persist_info' and sets those four entries to undef.
## Returns the result of MQIsdp_terminate() as a status name.
##
const char*
xs_terminate( self )
	HV* self

  PREINIT:
  	MQISDPTI *pApiTaskInfo = NULL;
  	MQISDPTI *pSendTaskInfo = NULL;
  	MQISDPTI *pRcvTaskInfo = NULL;
  	MQISDPTI *pPersistInfo = NULL;
  	int rc = MQISDP_FAILED;

  CODE:
  	pApiTaskInfo = get_task_info_from_hv( self, "api_task_info" );
  	pSendTaskInfo = get_task_info_from_hv( self, "send_task_info" );
  	pRcvTaskInfo = get_task_info_from_hv( self, "recv_task_info" );
  	pPersistInfo = get_task_info_from_hv( self, "persist_info" );
  
	/* Terminate the threads before releasing anything: the task info
	   structures were handed to the library when the tasks were started,
	   so they must stay valid until the tasks have been told to stop */
  	rc = MQIsdp_terminate();

  	/* Free the memory (free() accepts NULL for entries that were absent) */
  	free( pApiTaskInfo );
  	free( pSendTaskInfo );
  	free( pRcvTaskInfo );
	free( pPersistInfo );

	/* Undef them in the hash so the stale pointers cannot be reused */
	hv_key_undef( self, "api_task_info");
	hv_key_undef( self, "send_task_info");
	hv_key_undef( self, "recv_task_info");
	hv_key_undef( self, "persist_info");

	/* Return the terminate result as a string */
  	RETVAL = get_status_string( rc );
  	
  OUTPUT:
	RETVAL





##
## Subscribe to a topic
##
## Builds a variable-length subscribe request in one heap buffer, laid out
## as: structure length, topic length, topic bytes (no terminator),
## options.  The buffer is passed to MQIsdp_subscribe() and then released.
## qos values 0, 1 and 2 select the matching MQISDP_QOS_* option; any other
## value leaves the options at zero.
## Returns the call's result as a status name, or "OUT_OF_MEMORY" when the
## buffer cannot be allocated.
##
const char*
xs_subscribe( self, topic, qos )
	HV*		self
	char*	topic
	int		qos

  PREINIT:
  	MQISDPCH	handle = MQISDP_INV_CONN_HANDLE;
  	MQISDPMH	hMsg = 0;
  	SUB_PARMS	*pSp = NULL;
  	char		*cursor = NULL;
  	long		options = 0;
  	long		topicLen = 0;
	int			bufSize = 0;
	int			rc = MQISDP_OUT_OF_MEMORY;
	
  CODE:
  	handle = get_handle_from_hv( self );  
  	
	/* Room for the structure, two long fields and the topic bytes */
	bufSize = sizeof(SUB_PARMS) + (2 * sizeof(long)) + strlen(topic);
	pSp = (SUB_PARMS*)malloc( bufSize );

	/* rc keeps its OUT_OF_MEMORY default when the allocation failed */
	if (pSp != NULL) {
		pSp->strucLength = bufSize;
		topicLen = strlen(topic);

		/* Topic length goes right after the first long of the buffer */
		cursor = (char*)pSp + sizeof(long);
		memcpy( cursor, &topicLen, sizeof(long) );
		cursor += sizeof(long);

		/* Then the topic itself */
		memcpy( cursor, topic, topicLen );
		cursor += topicLen;

		/* And finally the options */
		if (qos == 0)		options = MQISDP_QOS_0;
		else if (qos == 1)	options = MQISDP_QOS_1;
		else if (qos == 2)	options = MQISDP_QOS_2;
		memcpy( cursor, &options, sizeof(long) );

		rc = MQIsdp_subscribe( handle, &hMsg, pSp );
		free( pSp );
	}

	RETVAL = get_status_string(rc);
 	
  OUTPUT:
	RETVAL
 	
 
 
  
##
## Unsubscribe from a topic
##
## Builds a variable-length unsubscribe request in one heap buffer, laid
## out as: structure length, topic length, topic bytes (no terminator).
## The buffer is passed to MQIsdp_unsubscribe() and then released.
## Returns the call's result as a status name, or "OUT_OF_MEMORY" when the
## buffer cannot be allocated.
##
const char*
xs_unsubscribe( self, topic )
	HV*		self
	char*	topic

  PREINIT:
  	MQISDPCH	handle = MQISDP_INV_CONN_HANDLE;
  	MQISDPMH	hMsg = 0;
  	UNSUB_PARMS	*pUp = NULL;
  	char		*cursor = NULL;
  	long		topicLen = 0;
	int			bufSize = 0;
	int			rc = MQISDP_OUT_OF_MEMORY;
	
  CODE:
  	handle = get_handle_from_hv( self );  
  	
	/* Room for the structure, one long field and the topic bytes */
	bufSize = sizeof(UNSUB_PARMS) + sizeof(long) + strlen(topic);
	pUp = (UNSUB_PARMS*)malloc( bufSize );

	/* rc keeps its OUT_OF_MEMORY default when the allocation failed */
	if (pUp != NULL) {
		pUp->strucLength = bufSize;
		topicLen = strlen(topic);

		/* Topic length goes right after the first long of the buffer */
		cursor = (char*)pUp + sizeof(long);
		memcpy( cursor, &topicLen, sizeof(long) );
		cursor += sizeof(long);

		/* Then the topic itself */
		memcpy( cursor, topic, topicLen );

		rc = MQIsdp_unsubscribe( handle, &hMsg, pUp );
		free( pUp );
	}

	RETVAL = get_status_string(rc);
 	
  OUTPUT:
	RETVAL



##
## Receive a publication
##
## Polls MQIsdp_receivePub() with a 10 second timeout until something other
## than "no publications available" is reported, enlarging the receive
## buffer whenever the library reports truncated data.
##
## Returns a hash reference.  'status' is always present.  When a
## publication was received the hash also holds 'options' (array reference
## of option names), 'topic', 'topic_length', 'data' and 'data_length'.
## Croaks only if a value cannot be stored in the result hash.
##
HV*
xs_receivePub( self )
	HV*		self

  PREINIT:
  	MQISDPCH	handle = MQISDP_INV_CONN_HANDLE;
  	HV			*hash = NULL;
  	SV			*sv = NULL;
  	
	int			rc = 0;
	int			stillWaiting = 1;
	long		timeToWait = 10000;	/* 10 Seconds */
	
	long		dataLength = 0;
	long		topicLength = 0;
	long		pubOptions = 0;
	long		bufferSz = 1024;
	char		*pBuffer = NULL;
	char		*pGrown = NULL;


  CODE:
  	/* get the connection handle */
  	handle = get_handle_from_hv( self );  
  	
  	/* The RETVAL typemap wraps the hash in a new reference of its own.
  	   Making the hash mortal hands ownership to that reference and
  	   prevents the hash from being leaked on every call. */
  	hash = newHV();
  	sv_2mortal( (SV*)hash );
  	
  	/* Allocate some memory to store message */
  	pBuffer = (char*)malloc( bufferSz );
  	if (pBuffer == NULL) {
  		rc = MQISDP_OUT_OF_MEMORY;
  		stillWaiting = 0;
  	}
  	
  	
  	/* Wait for a message */
  	while( stillWaiting )
	{
  		
  		rc = MQIsdp_receivePub( handle, timeToWait, &pubOptions, &topicLength, &dataLength, bufferSz-1, pBuffer );

		/* The reported data length includes the topic; keep only the
		   payload part (the buffer holds the topic, then the payload) */
		dataLength -= topicLength;

  		switch( rc ) {
  			case MQISDP_DATA_TRUNCATED:
  				/* Grow to the full reported size and try again; the old
  				   buffer is kept until the new one is known to be valid */
  				bufferSz = dataLength+topicLength+1;
  				pGrown = (char*)realloc( pBuffer, bufferSz );
  				if (pGrown == NULL) {
  					rc = MQISDP_OUT_OF_MEMORY;
  					stillWaiting = 0;
  				} else {
  					pBuffer = pGrown;
   				}
  			break;
  			
  			case MQISDP_NO_PUBS_AVAILABLE:
  				/* Nothing arrived within the timeout: poll again */
  			break;
  			
  			case MQISDP_PUBS_AVAILABLE:
  			case MQISDP_OK:
  			
   				/* Store the options */
   				sv = newRV_noinc((SV*)options_to_av( pubOptions ));
  				if (hv_store(hash, "options", 7, sv, 0) == NULL) {
					croak("xs_receivePub: options not stored");
				}

 				/* Store the topic length */
				sv=newSViv(topicLength);
				if (hv_store(hash, "topic_length", 12, sv, 0) == NULL) {
					croak("xs_receivePub: topic_length not stored");
				}

				/* Store the topic.  newSVpvn() takes the length literally,
				   whereas newSVpv() treats 0 as a request to run strlen()
				   on a buffer that is not NUL-terminated. */
				sv=newSVpvn(pBuffer, topicLength);
				if (hv_store(hash, "topic", 5, sv, 0) == NULL) {
					croak("xs_receivePub: topic not stored");
				}
	
				/* Store the data length ("data_length" is 11 characters;
				   a shorter key length would store it as "data_len") */
				sv=newSViv(dataLength);
				if (hv_store(hash, "data_length", 11, sv, 0) == NULL) {
					croak("xs_receivePub: data_length not stored");
				}
			
				/* Store the data (may legitimately be empty) */
				sv=newSVpvn(pBuffer+topicLength, dataLength);
				if (hv_store(hash, "data", 4, sv, 0) == NULL) {
					croak("xs_receivePub: data not stored");
				}

  				stillWaiting = 0;
  			break;

  			default:
  				/* Any other result ends the wait; only the status is
  				   reported */
  				stillWaiting = 0;
  			break;
  		}
  		
   	}
  	
	/* Store the status of the last call (or of a failed allocation) */
	sv=newSVpv(get_status_string(rc), 0);
	if (hv_store(hash, "status", 6, sv, 0) == NULL) {
		croak("xs_receivePub: status not stored");
	}

	/* Free the message buffer */
	free( pBuffer );

	RETVAL = hash;
 	
  OUTPUT:
	RETVAL



##
## Publish a message
## Note that you cannot send or queue more than MSP_DEFAULT_MAX_OUTQ_SZ
## bytes (defined as 32768 in mspsh.h), or you will get Q_FULL response
##
## data is taken as a scalar and its byte length comes from the scalar
## itself, so payloads containing NUL bytes are sent in full.
## qos values 0, 1 and 2 select the matching MQISDP_QOS_* option; a true
## retain adds MQISDP_RETAIN.
## Returns a two element list: the status name and the message handle.
##
void
xs_publish( self, data, topic, qos, retain )
	HV*		self
	SV*		data
	char*		topic
	int		qos
	int		retain
	
  PREINIT:
  	MQISDPCH	handle = MQISDP_INV_CONN_HANDLE;
  	MQISDPMH	hMsg = MQISDP_INV_MSG_HANDLE;
  	PUB_PARMS	Pp;
  	STRLEN		dataLen = 0;
  	char		*dataBuf = NULL;
	int		rc = 0;
	
  PPCODE:
	
  	/* get the connection handle */
  	handle = get_handle_from_hv( self );

	/* Fetch the payload bytes together with their real length */
	dataBuf = SvPV( data, dataLen );

	/* Start from an all-zero structure so no field is left undefined */
	memset( &Pp, 0, sizeof(Pp) );
	Pp.strucLength = sizeof(PUB_PARMS);
	
	/* Set the options field */
	Pp.options = MQISDP_NONE;
	switch ( qos ) {
			case 0: Pp.options |= MQISDP_QOS_0; break;
			case 1: Pp.options |= MQISDP_QOS_1; break;
			case 2: Pp.options |= MQISDP_QOS_2; break;
	}
	if ( retain ) {
			Pp.options |= MQISDP_RETAIN;
	}

	/* Set the topic length field */
	Pp.topicLength = strlen(topic);

	/* Set the topic field */
	Pp.topic = topic;

	/* Set the data length field */
	Pp.dataLength = dataLen;

	/* Set the data field */
	Pp.data = dataBuf;

	/* Publish */
	rc = MQIsdp_publish( handle, &hMsg, &Pp );

	/* Return two values as an array */	
	XPUSHs( sv_2mortal( newSVpv( get_status_string(rc), 0 ) ) );
	XPUSHs( sv_2mortal( newSVnv( hMsg ) ) );

##
## Check status of a message published at QOS 1/2
## Returns one of:
## DELIVERED, IN_PROGRESS, RETRYING, MSG_HANDLE_ERROR, CONN_HANDLE_ERROR
##
## hMsg is the message handle handed back by xs_publish.  The connection
## handle is read from self and passed on as it is, even when missing.
##
const char*
xs_getMsgStatus( self, hMsg )
	HV*		self
  	long		hMsg
	
  PREINIT:
  	MQISDPCH	handle = MQISDP_INV_CONN_HANDLE;
	
  CODE:
  	handle = get_handle_from_hv( self );

	/* Ask the library and translate its answer in one step */
	RETVAL = get_status_string( MQIsdp_getMsgStatus( handle, hMsg ) );
 	
  OUTPUT:
	RETVAL