The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
/* Filename: Spread.xs
 * Author:   Theo Schlossnagle <jesus@cnds.jhu.edu>
 * Created:  12th October 1999
 *
 * Copyright (c) 1999-2006,2008 Theo Schlossnagle. All rights reserved.
 *   This program is free software; you can redistribute it and/or
 *   modify it under the same terms as Perl itself.
 *
 */

#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"

#include "sp.h"

/* Fallback MIN() for builds whose headers do not already provide one.
 * Being a function-like macro, it may evaluate each argument twice. */
#ifndef MIN
#define MIN(a,b) (((a)<(b))?(a):(b))
#endif

/* When the Perl headers did not define PERL_VERSION, derive the version
 * macros from PATCHLEVEL and SUBVERSION as found in patchlevel.h. */
#ifndef PERL_VERSION
#include "patchlevel.h"
#define PERL_REVISION   5
#define PERL_VERSION    PATCHLEVEL
#define PERL_SUBVERSION SUBVERSION
#endif

/* For Perl 5.004_75 and earlier, map the PL_-prefixed names onto the
 * unprefixed identifiers. */
#if PERL_REVISION == 5 && (PERL_VERSION < 4 || (PERL_VERSION == 4 && PERL_SUBVERSION <= 75 ))

#    define PL_sv_undef         sv_undef
#    define PL_na               na
#    define PL_curcop           curcop
#    define PL_compiling        compiling

#endif


/* Name of the Perl scalar that receives the last error (code and text). */
#define SPERRNO "Spread::sperrno"

/*
 * Error messages are looked up as my_e_errmsg[MAX_ERRMSG - code].  The table
 * therefore runs from the largest code (MAX_ERRMSG) downwards and needs
 * exactly one row per code, including codes that have no message of their
 * own.  Codes 2..4 are local to this module; the others come from sp.h.
 */
#define MAX_ERRMSG     4
#define SELECT_FAILED  4
#define SELECT_TIMEOUT 3
#define ARGS_INSUFF 2
static char *my_e_errmsg[] = {
 "Select Failed",       /* SELECT_FAILED        4 */
 "Select Timed Out",	/* SELECT_TIMEOUT		3 */
 "Insufficient Arguments", /* ARGS_INSUFF		2 */
 "Accept Session",	/* ACCEPT_SESSION		1 */
 ""		,	/*				0 */
 "Illegal Spread",	/* ILLEGAL_SPREAD		-1 */
 "Could Not Connect",	/* COULD_NOT_CONNECT		-2 */
 "Reject: Quota",	/* REJECT_QUOTA			-3 */
 "Reject: No Name",	/* REJECT_NO_NAME		-4 */
 "Reject: Illegal Name",/* REJECT_ILLEGAL_NAME		-5 */
 "Reject: Not Unique",	/* REJECT_NOT_UNIQUE		-6 */
 "Reject: Version",	/* REJECT_VERSION		-7 */
 "Connection Closed",	/* CONNECTION_CLOSED		-8 */
 "Reject: Auth",	/* REJECT_AUTH			-9 */
 ""		,	/*				-10 */
 "Illegal Session",	/* ILLEGAL_SESSION		-11 */
 "Illegal Service",	/* ILLEGAL_SERVICE		-12 */
 "Illegal Message",	/* ILLEGAL_MESSAGE		-13 */
 "Illegal Group",	/* ILLEGAL_GROUP		-14 */
 "Buffer Too Short",	/* BUFFER_TOO_SHORT		-15 */
#ifdef GROUPS_TOO_SHORT
 "Groups Too Short",	/* GROUPS_TOO_SHORT		-16 */
#else
 ""		,	/* keeps the -17 row in its slot	-16 */
#endif
#ifdef MESSAGE_TOO_LONG
 "Message Too Long",	/* MESSAGE_TOO_LONG		-17 */
#else
#error You must install spread client libraries to build perl Spread.
#endif
 ""};
/* Hash keys that a connection request must supply, in the order they are
 * read.  The list ends with an empty string. */
static char *connect_params[] = {
	"spread_name", "private_name", "priority", "group_membership",
	""
};
/* Number of real keys in connect_params; the "" terminator is not counted. */
static int nconnect_params =
	(int)(sizeof(connect_params) / sizeof(connect_params[0])) - 1;

/* Global SV; the BOOT section assigns it a fresh empty-string SV.
 * NOTE(review): no reader of this variable is visible here - confirm it is
 * still needed. */
SV *sv_NULL ;

/*
 * Report that the feature named by `s` is unavailable.
 *
 * s - name of the missing feature, interpolated into the error message
 *
 * croak() raises a Perl exception, so the return statement is not expected
 * to be reached; it is there to match the declared return type.
 */
static int
not_here(char *s)
{
    croak("%s not implemented on this architecture", s);
    return -1;
}

/*
 * Resolve the name of a Spread constant to its numeric value.
 *
 * name - symbol name to look up, e.g. "AGREED_MESS"
 * arg  - unused; present so the signature matches the XSUB declaration
 *
 * Returns the value of the symbol with errno set to 0.  On failure returns 0
 * and sets errno to ENOENT when the name is recognised but the headers used
 * for this build do not define it, or to EINVAL when the name is not
 * recognised at all.
 */
static double
constant(const char *name, int arg)
{
    (void)arg;
    errno = 0;
    /* Dispatch on the first character so only a few names are compared. */
    switch (*name) {
    case 'A':
        if (strcmp(name, "ACCEPT_SESSION") == 0)
#ifdef ACCEPT_SESSION
            return ACCEPT_SESSION;
#else
            goto not_there;
#endif
        if (strcmp(name, "AGREED_MESS") == 0)
#ifdef AGREED_MESS
            return AGREED_MESS;
#else
            goto not_there;
#endif
        break;
    case 'B':
        if (strcmp(name, "BUFFER_TOO_SHORT") == 0)
#ifdef BUFFER_TOO_SHORT
            return BUFFER_TOO_SHORT;
#else
            goto not_there;
#endif
        break;
    case 'C':
        if (strcmp(name, "CAUSAL_MESS") == 0)
#ifdef CAUSAL_MESS
            return CAUSAL_MESS;
#else
            goto not_there;
#endif
        if (strcmp(name, "CAUSED_BY_DISCONNECT") == 0)
#ifdef CAUSED_BY_DISCONNECT
            return CAUSED_BY_DISCONNECT;
#else
            goto not_there;
#endif
        if (strcmp(name, "CAUSED_BY_JOIN") == 0)
#ifdef CAUSED_BY_JOIN
            return CAUSED_BY_JOIN;
#else
            goto not_there;
#endif
        if (strcmp(name, "CAUSED_BY_LEAVE") == 0)
#ifdef CAUSED_BY_LEAVE
            return CAUSED_BY_LEAVE;
#else
            goto not_there;
#endif
        if (strcmp(name, "CAUSED_BY_NETWORK") == 0)
#ifdef CAUSED_BY_NETWORK
            return CAUSED_BY_NETWORK;
#else
            goto not_there;
#endif
        if (strcmp(name, "CONNECTION_CLOSED") == 0)
#ifdef CONNECTION_CLOSED
            return CONNECTION_CLOSED;
#else
            goto not_there;
#endif
        if (strcmp(name, "COULD_NOT_CONNECT") == 0)
#ifdef COULD_NOT_CONNECT
            return COULD_NOT_CONNECT;
#else
            goto not_there;
#endif
        break;
    case 'D':
        if (strcmp(name, "DROP_RECV") == 0)
#ifdef DROP_RECV
            return DROP_RECV;
#else
            goto not_there;
#endif
        break;
    case 'F':
        if (strcmp(name, "FIFO_MESS") == 0)
#ifdef FIFO_MESS
            return FIFO_MESS;
#else
            goto not_there;
#endif
        break;
    case 'G':
        if (strcmp(name, "GROUPS_TOO_SHORT") == 0)
#ifdef GROUPS_TOO_SHORT
            return GROUPS_TOO_SHORT;
#else
            goto not_there;
#endif
        break;
    case 'H':
        if (strcmp(name, "HIGH_PRIORITY") == 0)
#ifdef HIGH_PRIORITY
            return HIGH_PRIORITY;
#else
            goto not_there;
#endif
        break;
    case 'I':
        if (strcmp(name, "ILLEGAL_GROUP") == 0)
#ifdef ILLEGAL_GROUP
            return ILLEGAL_GROUP;
#else
            goto not_there;
#endif
        if (strcmp(name, "ILLEGAL_MESSAGE") == 0)
#ifdef ILLEGAL_MESSAGE
            return ILLEGAL_MESSAGE;
#else
            goto not_there;
#endif
        if (strcmp(name, "ILLEGAL_SERVICE") == 0)
#ifdef ILLEGAL_SERVICE
            return ILLEGAL_SERVICE;
#else
            goto not_there;
#endif
        if (strcmp(name, "ILLEGAL_SESSION") == 0)
#ifdef ILLEGAL_SESSION
            return ILLEGAL_SESSION;
#else
            goto not_there;
#endif
        if (strcmp(name, "ILLEGAL_SPREAD") == 0)
#ifdef ILLEGAL_SPREAD
            return ILLEGAL_SPREAD;
#else
            goto not_there;
#endif
        break;
    case 'L':
        if (strcmp(name, "LOW_PRIORITY") == 0)
#ifdef LOW_PRIORITY
            return LOW_PRIORITY;
#else
            goto not_there;
#endif
        break;
    case 'M':
        if (strcmp(name, "MAX_SCATTER_ELEMENTS") == 0)
#ifdef MAX_SCATTER_ELEMENTS
            return MAX_SCATTER_ELEMENTS;
#else
            goto not_there;
#endif
        if (strcmp(name, "MEDIUM_PRIORITY") == 0)
#ifdef MEDIUM_PRIORITY
            return MEDIUM_PRIORITY;
#else
            goto not_there;
#endif
        if (strcmp(name, "MEMBERSHIP_MESS") == 0)
#ifdef MEMBERSHIP_MESS
            return MEMBERSHIP_MESS;
#else
            goto not_there;
#endif
        if (strcmp(name, "MESSAGE_TOO_LONG") == 0)
#ifdef MESSAGE_TOO_LONG
            return MESSAGE_TOO_LONG;
#else
            goto not_there;
#endif
        break;
    case 'R':
        if (strcmp(name, "REGULAR_MESS") == 0)
#ifdef REGULAR_MESS
            return REGULAR_MESS;
#else
            goto not_there;
#endif
        if (strcmp(name, "REG_MEMB_MESS") == 0)
#ifdef REG_MEMB_MESS
            return REG_MEMB_MESS;
#else
            goto not_there;
#endif
        if (strcmp(name, "REJECT_AUTH") == 0)
#ifdef REJECT_AUTH
            return REJECT_AUTH;
#else
            goto not_there;
#endif
        if (strcmp(name, "REJECT_ILLEGAL_NAME") == 0)
#ifdef REJECT_ILLEGAL_NAME
            return REJECT_ILLEGAL_NAME;
#else
            goto not_there;
#endif
        if (strcmp(name, "REJECT_MESS") == 0)
#ifdef REJECT_MESS
            return REJECT_MESS;
#else
            goto not_there;
#endif
        if (strcmp(name, "REJECT_NOT_UNIQUE") == 0)
#ifdef REJECT_NOT_UNIQUE
            return REJECT_NOT_UNIQUE;
#else
            goto not_there;
#endif
        if (strcmp(name, "REJECT_NO_NAME") == 0)
#ifdef REJECT_NO_NAME
            return REJECT_NO_NAME;
#else
            goto not_there;
#endif
        if (strcmp(name, "REJECT_QUOTA") == 0)
#ifdef REJECT_QUOTA
            return REJECT_QUOTA;
#else
            goto not_there;
#endif
        if (strcmp(name, "REJECT_VERSION") == 0)
#ifdef REJECT_VERSION
            return REJECT_VERSION;
#else
            goto not_there;
#endif
        if (strcmp(name, "RELIABLE_MESS") == 0)
#ifdef RELIABLE_MESS
            return RELIABLE_MESS;
#else
            goto not_there;
#endif
        break;
    case 'S':
        if (strcmp(name, "SAFE_MESS") == 0)
#ifdef SAFE_MESS
            return SAFE_MESS;
#else
            goto not_there;
#endif
        if (strcmp(name, "SELF_DISCARD") == 0)
#ifdef SELF_DISCARD
            return SELF_DISCARD;
#else
            goto not_there;
#endif
        break;
    case 'T':
        if (strcmp(name, "TRANSITION_MESS") == 0)
#ifdef TRANSITION_MESS
            return TRANSITION_MESS;
#else
            goto not_there;
#endif
        break;
    case 'U':
        if (strcmp(name, "UNRELIABLE_MESS") == 0)
#ifdef UNRELIABLE_MESS
            return UNRELIABLE_MESS;
#else
            goto not_there;
#endif
        break;
    default:
        /* No known constant starts with this character. */
        break;
    }
    /* Name not recognised. */
    errno = EINVAL;
    return 0;

not_there:
    /* Name recognised, but the symbol is not defined in this build. */
    errno = ENOENT;
    return 0;
}

static void
SetSpErrorNo(error_no)
int error_no ;
{
	char * errstr ;
	SV * sperror_sv = perl_get_sv(SPERRNO, FALSE);

	errstr = (char *) my_e_errmsg[MAX_ERRMSG - error_no];
	if (SvIV(sperror_sv) != error_no) {
	  sv_setiv(sperror_sv, error_no) ;
	  sv_setpv(sperror_sv, errstr) ;
	  SvIOK_on(sperror_sv) ;
	}
}
/*
 * Describe the linked Spread library as "major.minor.patch".
 *
 * Returns a pointer to a static buffer that is overwritten by every call.
 * When SP_version() does not report success, the buffer holds an
 * explanatory message instead of a version number.
 */
static char *
SPversionstr() {
  static char text[60];
  int ver_major, ver_minor, ver_patch;

  if(SP_version(&ver_major, &ver_minor, &ver_patch) <= 0) {
    sprintf(text, "SP_version failed, could not retrieve version.");
    return text;
  }
  sprintf(text, "%d.%d.%d", ver_major, ver_minor, ver_patch);
  return text;
}

MODULE = Spread	PACKAGE = Spread	PREFIX = GC_

REQUIRE:	1.9505
PROTOTYPES:	DISABLE

BOOT:
	/* Refuse to load unless the Spread client library is 3.15.1 or
	   newer, then create the error scalar with code 0 and empty text. */
	{
	  int major, minor, patch;
	  if(SP_version(&major, &minor, &patch) <= 0)
	    croak("%s", SPversionstr()) ;
	  if(major<3 || (major==3 && minor<15) ||
	     (major==3 && minor==15 && patch<1))
	    croak("Spread client library 3.15.1 or later is required (found %d.%d.%d)",
	          major, minor, patch) ;
	  {
	    SV * sperror_sv = perl_get_sv(SPERRNO, GV_ADDMULTI) ;
	    sv_setiv(sperror_sv, 0) ;
	    sv_setpv(sperror_sv, "") ;
	    /* keep the integer valid alongside the string */
	    SvIOK_on(sperror_sv) ;
	  }
	}
	sv_NULL = newSVpv("", 0) ;

# Perl-callable constant(name, arg): no CODE section is given, so the
# C function constant() defined earlier in this file is called with both
# arguments and its double result is returned.
double
constant(name,arg)
        char *          name
        int             arg

# Perl-callable version(): the macro below makes GC_version() expand to
# SPversionstr(), whose string result is returned to Perl.
#define GC_version() SPversionstr()
char *
GC_version()

SV *
GC_disconnect(svmbox)
	SV * svmbox
	CODE:
	{
	  /* Hand the mailbox number in svmbox to SP_disconnect().
	     A zero result yields true.  Any other result is stored through
	     SetSpErrorNo() and yields false. */
	  int mbox = SvIV(svmbox);
	  int status = SP_disconnect(mbox);
	  if(status != 0) {
	    SetSpErrorNo(status);
	    RETVAL = &PL_sv_no;
	  } else {
	    RETVAL = &PL_sv_yes;
	  }
	}
	OUTPUT:
	  RETVAL

AV *
GC_connect_i(rv)
	SV * rv
	PREINIT:
	  SV *MAILBOX, *PRIVATE_GROUP;
	  SV **entry;
	  int k, status, priority, membership;
	  mailbox mbox = -1;
	  char *spread_name, *private_name, private_group[MAX_GROUP_NAME];
	  HV *args;
	PPCODE:
	  /* Connect using the parameters in the hash that rv refers to.
	     Pushes two values: the mailbox number and the private group
	     name.  Both are undef when a required key is missing or when
	     SP_connect() does not succeed; the reason is stored through
	     SetSpErrorNo().  Croaks when rv is not a hash reference. */
	  MAILBOX = &PL_sv_undef;
	  PRIVATE_GROUP = &PL_sv_undef;
	  if(!SvROK(rv))
	    croak("not a HASH reference");
	  args = (HV *)SvRV(rv);
	  if(SvTYPE(args) != SVt_PVHV)
	    croak("not a HASH reference");
	  /* All keys listed in connect_params must exist before any is read. */
	  for(k=0; k<nconnect_params; k++) {
	    if(!hv_exists(args, connect_params[k],
			  strlen(connect_params[k]))) {
	      SetSpErrorNo(ARGS_INSUFF);
	      goto ending;
	    }
	  }
	  /* Read the values in table order: two strings, then two integers. */
	  entry = hv_fetch(args, connect_params[0],
			   strlen(connect_params[0]), FALSE);
	  spread_name = SvPV(*entry, PL_na);
	  entry = hv_fetch(args, connect_params[1],
			   strlen(connect_params[1]), FALSE);
	  private_name = SvPV(*entry, PL_na);
	  entry = hv_fetch(args, connect_params[2],
			   strlen(connect_params[2]), FALSE);
	  priority = SvIV(*entry);
	  entry = hv_fetch(args, connect_params[3],
			   strlen(connect_params[3]), FALSE);
	  membership = SvIV(*entry);
	  status = SP_connect(spread_name, private_name, priority,
			      membership, &mbox, private_group);
	  if(status>0 && mbox>0) {
	    MAILBOX = sv_2mortal(newSViv(mbox));
	    PRIVATE_GROUP = sv_2mortal(newSVpv(private_group, 0));
	  } else {
	    SetSpErrorNo(status);
	  }
	ending:
	  EXTEND(SP, 2);
	  PUSHs(MAILBOX);
	  PUSHs(PRIVATE_GROUP);

SV *
GC_join(svmbox, group_name)
	SV * svmbox
	char *group_name
	CODE:
	{
	  /* Pass the mailbox number and group name to SP_join().
	     A zero result yields true.  Any other result is stored through
	     SetSpErrorNo() and yields false. */
	  int mbox = SvIV(svmbox);
	  int status = SP_join(mbox, group_name);
	  if(status != 0) {
	    SetSpErrorNo(status);
	    RETVAL = &PL_sv_no;
	  } else {
	    RETVAL = &PL_sv_yes;
	  }
	}
	OUTPUT:
	  RETVAL

SV *
GC_leave(svmbox, group_name)
	SV * svmbox
	char *group_name
	CODE:
	{
	  /* Pass the mailbox number and group name to SP_leave().
	     A zero result yields true.  Any other result is stored through
	     SetSpErrorNo() and yields false. */
	  int mbox = SvIV(svmbox);
	  int status = SP_leave(mbox, group_name);
	  if(status != 0) {
	    SetSpErrorNo(status);
	    RETVAL = &PL_sv_no;
	  } else {
	    RETVAL = &PL_sv_yes;
	  }
	}
	OUTPUT:
	  RETVAL

SV *
GC_multicast(svmbox, stype, svgroups, mtype, mess)
	SV * svmbox
	service stype
	SV * svgroups
	int16 mtype
	SV * mess
	INIT:
	  /* Buffer of fixed-width group names, kept between calls and grown
	     (in powers of two) whenever a call needs more entries. */
	  static char *groupnames=NULL;
	  static int gsize=-1;
	CODE:
	{
	  /* Send `mess` to one group or to several groups.
	     svgroups may be a plain group name, a reference to a scalar
	     holding one, or a reference to an array of names.  Returns the
	     non-negative result of the Spread call on success.  Returns undef
	     after storing the error through SetSpErrorNo() on failure.
	     Croaks when svgroups is some other kind of reference. */
	  int mbox = SvIV(svmbox);
	  int i, ret = 0, ngroups=0;
          size_t mlength;
	  char *groupname = NULL;
	  char *message;
	/* It is OK to use NULL.. We only see this, it isn't returned */
	  AV * groups = (AV *)NULL;
	  SV * group = (SV *)NULL;
	  RETVAL = &PL_sv_undef;
	  if(SvROK(svgroups)) {
	    if(SvTYPE(groups = (AV *)SvRV(svgroups))==SVt_PVAV) {
	      ngroups = av_len(groups)+1;
	      if(gsize<ngroups) {
	        if(gsize<0) gsize=1;
		while(gsize<ngroups) gsize<<=1;
		if(!groupnames)
	          New(0, groupnames,gsize*MAX_GROUP_NAME,char);
		else
		  Renew(groupnames,gsize*MAX_GROUP_NAME,char);
	      }
	      for(i=0;i<ngroups;i++) {
		char *string;
		size_t slength;
		char *slot = &groupnames[i*MAX_GROUP_NAME];
		SV **afetch = av_fetch(groups, i, FALSE);
		if(afetch == NULL) {
		  /* Hole in the array: there is no name to copy. */
		  SetSpErrorNo(ARGS_INSUFF);
		  goto multi_ending;
		}
		string = SvPV(*afetch, slength);
		strncpy(slot, string, MAX_GROUP_NAME);
		/* strncpy leaves an over-long name unterminated. */
		slot[MAX_GROUP_NAME-1] = '\0';
	      }
	    } else if(SvTYPE(group = SvRV(svgroups))==SVt_PV) {
	      groupname = SvPV(group, PL_na);
	    } else {
	      croak("not a SCALAR or ARRAY reference.");
	    }
	  } else if((groupname=SvPV(svgroups, PL_na)) != NULL) {
	    group = svgroups;
	  } else {
	    SetSpErrorNo(ARGS_INSUFF);
	    goto multi_ending;
	  }

	  message = SvPV(mess, mlength);
	  if(group != NULL) {
	    /* groupname is already set and
	       we are multicasting to a single group */
	    ret = SP_multicast(mbox, stype, groupname,
				mtype, mlength, message);
	  } else if(groups != NULL) {
	    /* groupnames is already set and
	       we are multicasting to a multigroup */
	    ret = SP_multigroup_multicast(mbox, stype, ngroups,
				groupnames,
				mtype, mlength, message);
	  } else {
	    /* Something went horribly wrong */
	    croak("not SCALAR, SCALAR ref or ARRAY ref.");
	  }
	  if(ret<0)
	    SetSpErrorNo(ret);
	  else
	    RETVAL = newSViv(ret);
	}
	multi_ending:
	OUTPUT:
	  RETVAL

AV *
GC_receive(svmbox, svtimeout=&PL_sv_undef)
	SV * svmbox
	SV * svtimeout
	PREINIT:
	  /* The group and message buffers are static: they are allocated on
	     first use and regrown only when Spread reports them too small.
	     old*size is the current allocation, new*size the wanted one. */
	  static int oldgsize=0, newgsize=(1<<6);
	  static int oldmsize=0, newmsize=(1<<15); /* 32k */
	  int i, mbox, endmis, ret, ngrps, msize;
	  int16 mtype;
	  service stype = 0;
	  struct timeval towait;
	  static char *groups=NULL;
	  static char *mess=NULL;
	  char sender[MAX_GROUP_NAME];
	  SV *STYPE, *MTYPE, *MESSAGE, *SENDER, *ENDMIS;
	  AV *GROUPS=(AV *)&PL_sv_undef;
	PPCODE:
	  /* Receive one message from the mailbox in svmbox.
	     When svtimeout is given, wait at most that many seconds
	     (fractions allowed) for the mailbox to become ready.
	     Pushes six values: service type, sender, a reference to the
	     group list, message type, endian-mismatch flag and the message.
	     On failure the five scalars are undef, the group entry refers
	     to undef, and the error is stored through SetSpErrorNo(). */
	  if(svmbox == &PL_sv_undef) {
	    STYPE=SENDER=MTYPE=ENDMIS=MESSAGE=&PL_sv_undef;
	    SetSpErrorNo(ILLEGAL_SESSION);
	    goto rec_ending;
	  }
	  mbox = SvIV(svmbox);
	  if(svtimeout != &PL_sv_undef) {
	    double timeout;
	    fd_set readfs, exceptfs;
	    if(mbox < 0) {
	      /* A negative descriptor must not reach FD_SET. */
	      STYPE=SENDER=MTYPE=ENDMIS=MESSAGE=&PL_sv_undef;
	      SetSpErrorNo(ILLEGAL_SESSION);
	      goto rec_ending;
	    }
	    timeout = SvNV(svtimeout);
	    /* Negative or NaN timeouts are treated as "do not wait". */
	    if(!(timeout > 0.0)) timeout = 0.0;
	    towait.tv_sec = (unsigned long)timeout;
	    towait.tv_usec =
	      (unsigned long)(1000000.0*(timeout-(double)towait.tv_sec));
	    /* Separate sets: select() rewrites each set it is given. */
	    FD_ZERO(&readfs); FD_SET(mbox, &readfs);
	    FD_ZERO(&exceptfs); FD_SET(mbox, &exceptfs);
	    ret = select(mbox+1, &readfs, NULL, &exceptfs, &towait);
	    if(ret <= 0) {
	      STYPE=SENDER=MTYPE=ENDMIS=MESSAGE=&PL_sv_undef;
	      SetSpErrorNo( ret == 0 ? SELECT_TIMEOUT : SELECT_FAILED );
	      goto rec_ending;
	    }
	  }
       try_again:
	  /* realloc or alloc buffer if necessary */
	  if(oldgsize != newgsize) {
	    if(groups)
	      Renew(groups, newgsize*MAX_GROUP_NAME, char);
	    else
	      New(0, groups, newgsize*MAX_GROUP_NAME, char);
	    oldgsize=newgsize;
	  }
	  if(oldmsize != newmsize) {
	    if(mess)
	      Renew(mess, newmsize, char);
	    else
	      New(0, mess, newmsize, char);
	    oldmsize=newmsize;
	  }
	  if((ret=SP_receive(mbox, &stype, sender, newgsize, &ngrps, groups,
		&mtype, &endmis, newmsize, mess))<0) {
		if(ret==BUFFER_TOO_SHORT) {
		  /* -endmis is taken as the size Spread asks for; fall
		     back to doubling so every retry enlarges the buffer. */
		  newmsize = (-endmis > oldmsize) ? -endmis : oldmsize*2;
		  goto try_again;
#ifdef GROUPS_TOO_SHORT
		} else if (ret==GROUPS_TOO_SHORT) {
		  /* Same scheme for the group list, using -ngrps. */
		  newgsize = (-ngrps > oldgsize) ? -ngrps : oldgsize*2;
		  goto try_again;
#endif
		} else {
		  STYPE=SENDER=MTYPE=ENDMIS=MESSAGE=&PL_sv_undef;
		  SetSpErrorNo(ret);
		}
	  } else {
	    /* We received the message; ret is its length. */
	    msize=ret;
	    /* A negative group count on success: enlarge the group buffer
	       for the next call if needed and report only what fits. */
	    if(newgsize+ngrps < 0)
		newgsize*=2;
	    if(ngrps<0) ngrps=oldgsize;
	    if(ngrps>0) {
	      GROUPS = (AV *)sv_2mortal((SV *)newAV());
	      for(i=0;i<ngrps;i++)
		av_push(GROUPS, newSVpv(&groups[i*MAX_GROUP_NAME],
		                     MIN(strlen(&groups[i*MAX_GROUP_NAME]),
			                 MAX_GROUP_NAME)));
	    }
	    SENDER=sv_2mortal(newSVpv(sender, 0));
	    STYPE=sv_2mortal(newSViv(stype));
	    MTYPE=sv_2mortal(newSViv(mtype));
	    ENDMIS=(endmis)?(&PL_sv_yes):(&PL_sv_no);
	    MESSAGE=sv_2mortal(newSVpv(mess, msize));
	  }
	rec_ending:
          EXTEND(SP, 6);
          PUSHs(STYPE);
	  PUSHs(SENDER);
	  PUSHs(sv_2mortal(newRV((SV *)GROUPS)));
	  PUSHs(MTYPE);
	  PUSHs(ENDMIS);
          PUSHs(MESSAGE);

SV *
GC_poll(svmbox)
	SV * svmbox
	PREINIT:
	  int mbox = SvIV(svmbox);
	  int pending;
	CODE:
	  /* Ask SP_poll() about the mailbox.  A non-negative result is
	     returned as an integer.  A negative result is stored through
	     SetSpErrorNo() and undef is returned. */
	  pending = SP_poll(mbox);
	  if(pending >= 0) {
	    RETVAL = newSViv(pending);
	  } else {
	    SetSpErrorNo(pending);
	    RETVAL = &PL_sv_undef;
	  }
	OUTPUT:
	  RETVAL