The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#if defined(__linux) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__CYGWIN__)
# define ENABLE_IPV6 1 // if you get compilation problems try to disable IPv6
#else
# define ENABLE_IPV6 0
#endif

#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"

#include <pthread.h>

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <time.h>
#include <poll.h>
#include <unistd.h>
#include <inttypes.h>
#include <fcntl.h>
#include <errno.h>
#include <limits.h>

#include <sys/types.h>
#include <sys/time.h>
#include <sys/socket.h>

#include <netinet/in.h>
#include <arpa/inet.h>

#ifdef __linux
# include <linux/icmp.h>
#endif
#if ENABLE_IPV6 && !defined (__CYGWIN__)
# include <netinet/icmp6.h>
#endif

#define ICMP4_ECHO       8
#define ICMP4_ECHO_REPLY 0
#define ICMP6_ECHO       128
#define ICMP6_ECHO_REPLY 129

#define DRAIN_INTERVAL 1e-6 // how long to wait when sendto returns ENOBUFS, in seconds
#define MIN_INTERVAL   1e-6 // minimum packet send interval, in seconds

#define HDR_SIZE_IP4  20
#define HDR_SIZE_IP6  48

static int thr_res[2]; // worker thread finished status
static int icmp4_fd = -1;
static int icmp6_fd = -1;

/*****************************************************************************/

/* timestamps and durations are seconds, stored as a double */
typedef double tstamp;

/* return the current time of day (gettimeofday) as fractional seconds */
static tstamp
NOW (void)
{
  struct timeval current;
  tstamp whole, fraction;

  gettimeofday (&current, 0);

  whole    = current.tv_sec;
  fraction = current.tv_usec * 1e-6;

  return whole + fraction;
}

/*
 * Sleep for wait seconds (fractional values allowed).
 *
 * Uses select on Solaris, Sleep on Windows and nanosleep everywhere else.
 * The return value of the underlying call is ignored, so an interrupted
 * sleep simply returns early.
 */
static void
ssleep (tstamp wait)
{
#if defined (__SVR4) && defined (__sun)
  struct timeval tv;

  tv.tv_sec  = wait;
  tv.tv_usec = (wait - tv.tv_sec) * 1e6;

  select (0, 0, 0, 0, &tv);
#elif defined(_WIN32)
  /* Sleep takes milliseconds; the parameter is called wait, not delay */
  Sleep ((unsigned long)(wait * 1e3));
#else
  struct timespec ts;

  /* split into whole seconds and the remaining nanoseconds */
  ts.tv_sec  = wait;
  ts.tv_nsec = (wait - ts.tv_sec) * 1e9;

  nanosleep (&ts, 0);
#endif
}

/*****************************************************************************/

/* IPv4 header without options (20 bytes, see HDR_SIZE_IP4). */
/* The ICMPv4 receive code overlays this on the start of each packet */
/* read from the raw socket; only version_ihl, tot_len and protocol */
/* are inspected in this file. */
typedef struct
{
  uint8_t version_ihl; /* low 4 bits: header length in 32 bit words */
  uint8_t tos;
  uint16_t tot_len;    /* total packet length, read via ntohs */

  uint16_t id;
  uint16_t flags;

  uint8_t ttl;
  uint8_t protocol;    /* compared against IPPROTO_ICMP */
  uint16_t cksum;

  uint32_t src;
  uint32_t dst;
} IP4HDR;

/*****************************************************************************/

/* a binary address, large enough for IPv6; shorter (IPv4) addresses */
/* are stored right-aligned, i.e. in the last addrlen bytes */
typedef uint8_t addr_tt[16];

/* One unit of work for a pinger: either an inclusive lo..hi address */
/* range (addrcnt == 0) or an explicit list of addrcnt addresses stored */
/* directly behind the structure in the same allocation. */
typedef struct
{
  tstamp next;     /* earliest time the next ping of this range is due (heap key) */
  tstamp interval; /* minimum time between two pings of this range */
  int addrlen;     /* bytes per address: 4 or 16 */

  addr_tt lo, hi; /* only if !addrcnt */

  int addrcnt;     /* number of list entries still to be pinged */
  /* addrcnt addresses follow */
} RANGE;

/* State of one AnyEvent::FastPing object. */
typedef struct
{
  RANGE **ranges;         /* binary min-heap ordered by RANGE.next, root at [0] */
  int rangecnt, rangemax; /* used / allocated entries in ranges */

  tstamp next;            /* earliest time the next packet of this pinger may be sent */
  tstamp interval;        /* minimum time between any two packets of this pinger */

  tstamp maxrtt;          /* how long the worker thread sleeps after its last ping */

  /* copied into the id, seq and magic fields of every packet sent and */
  /* compared against replies in pkt_is_valid_for */
  uint16_t magic1;
  uint16_t magic2;
  uint16_t magic3;

  int id;                 /* index of this pinger in the global pingers array */

  AV *recvq; /* receive queue */
  int nextrecv;           /* id of the next pinger in the firstrecv list */
  SV *recvcb;             /* callback invoked with the receive queue */

  pthread_t thrid;        /* worker thread, only meaningful while running */
  int running;            /* non-zero between pinger_start and pinger_stop */
} PINGER;

/* registry of all pinger objects, indexed by PINGER.id */
static PINGER **pingers;
static int *pingerfree; /* freelist next */
static int pingercnt;      /* number of ids handed out so far */
static int pingermax;      /* allocated entries in pingers and pingerfree */
static int firstfree = -1; /* head of the list of reusable ids, -1 if empty */
static int firstrecv = -1; /* head of the list of pingers with queued replies, -1 if empty */

/*****************************************************************************/

/* The ICMP echo packet as sent and as expected back in replies. */
/* Its size must be a multiple of 4 (verified at module boot) because */
/* pkt_cksum processes it in 32 bit words. */
typedef struct
{
  uint8_t type, code; /* ICMP type (echo request/reply) and code (always sent as 0) */
  uint16_t cksum;     /* filled in by pkt_cksum for ICMPv4 */

  uint16_t id, seq;   /* carry the pinger's magic1 and magic2 */

  uint16_t pinger;    /* id of the sending pinger, used to route replies */
  uint16_t magic;     /* carries the pinger's magic3 */

  /* send time, encoded by ts_to_pkt and decoded by pkt_to_ts */
  uint32_t stamp_hi;
  uint32_t stamp_lo;
} PKT;

/* return true if all three magic values carried in pkt match those of pinger */
static int
pkt_is_valid_for (PKT *pkt, PINGER *pinger)
{
  if (pkt->id != pinger->magic1)
    return 0;

  if (pkt->seq != pinger->magic2)
    return 0;

  return pkt->magic == pinger->magic3;
}

/* encode the timestamp ts into the two 32 bit stamp fields of pkt */
static void
ts_to_pkt (PKT *pkt, tstamp ts)
{
  /* dividing by 4096 shifts 12 bits of the seconds into the 32 bit */
  /* fractional word, which leaves 20 bits of subsecond resolution */
  /* and 44 bits for the integer part (32 of which fit into stamp_hi) */
  tstamp scaled = ts * (1. / 4096.);

  pkt->stamp_hi = scaled;
  pkt->stamp_lo = (scaled - pkt->stamp_hi) * 4294967296.;
}

/* decode the timestamp stored by ts_to_pkt back into seconds */
static tstamp
pkt_to_ts (PKT *pkt)
{
  tstamp coarse = pkt->stamp_hi *  4096.;
  tstamp fine   = pkt->stamp_lo * (4096. / 4294967296.);

  return coarse + fine;
}

/*
 * Compute the 16 bit one's complement checksum over the whole packet and
 * store its complement in pkt->cksum. The cksum field is part of the summed
 * data, so the caller has to zero it beforehand.
 *
 * The packet is processed in 32 bit words (sizeof (PKT) is a multiple of 4).
 * Words are fetched with memcpy rather than through a uint32_t pointer, as
 * the latter would access the PKT members through an incompatible type.
 */
static void
pkt_cksum (PKT *pkt)
{
  uint_fast32_t sum = 0;
  const unsigned char *bytes = (const unsigned char *)pkt;
  size_t words = sizeof (*pkt) / 4;
  size_t i;

  for (i = 0; i < words; ++i)
    {
      uint32_t w;

      memcpy (&w, bytes + i * 4, sizeof (w));

      /* add both 16 bit halves; their order is irrelevant for this sum */
      sum += (w & 0xffff) + (w >> 16);
    }

  sum = (sum >> 16) + (sum & 0xffff);   /* add high 16 to low 16 */
  sum += sum >> 16;                     /* add carry */

  pkt->cksum = ~sum;
}

/*****************************************************************************/

/* release a range, including any address list stored behind it */
static void
range_free (RANGE *self)
{
  void *block = self;

  free (block);
}

/* like sendto, but retries for as long as it fails with ENOBUFS, */
/* sleeping twice as long before each further attempt; any other */
/* failure is silently ignored */
static void
xsendto (int fd, void *buf, size_t len, int flags, void *sa, int salen)
{
  tstamp backoff = DRAIN_INTERVAL / 2.;

  for (;;)
    {
      if (sendto (fd, buf, len, flags, sa, salen) >= 0)
        return;

      if (errno != ENOBUFS)
        return;

      backoff *= 2.;
      ssleep (backoff);
    }
}

// ping current address, return true and increment if more to ping
//
// The current address is the last remaining entry of the host list
// (when addrcnt is non-zero) or self->lo (for lo..hi ranges). pkt is
// sent as an ICMPv4 or ICMPv6 echo request, depending on the address
// length. Returns 0 once the range is exhausted and 1 otherwise.
static int
range_send_ping (RANGE *self, PKT *pkt)
{
  // send ping
  uint8_t *addr;
  int addrlen;

  if (self->addrcnt)
    // host list: entries are consumed from the end of the list
    addr = (self->addrcnt - 1) * self->addrlen + (uint8_t *)(self + 1);
  else
    // range: lo is stored right-aligned inside the 16 byte addr_tt
    addr = sizeof (addr_tt) - self->addrlen + self->lo;

  addrlen = self->addrlen;

  /* convert ipv4 mapped addresses - this only works for host lists */
  /* this tries to match 0000:0000:0000:0000:0000:ffff:a.b.c.d */
  /* efficiently but also with few insns */
  if (addrlen == 16 && !addr [0] && icmp4_fd >= 0
      && !(              addr [ 1]
           | addr [ 2] | addr [ 3]
           | addr [ 4] | addr [ 5]
           | addr [ 6] | addr [ 7]
           | addr [ 8] | addr [ 9]
           | (255-addr [10]) | (255-addr [11])))
    {
      // skip the 12 byte prefix and treat the rest as a plain IPv4 address
      addr += 12;
      addrlen -= 12;
    }

  // the checksum field is part of the data summed by pkt_cksum
  pkt->cksum = 0;

  if (addrlen == 4)
    {
      struct sockaddr_in sa;

      pkt->type = ICMP4_ECHO;
      pkt_cksum (pkt);

      sa.sin_family = AF_INET;
      sa.sin_port   = 0;

      memcpy (&sa.sin_addr, addr, sizeof (sa.sin_addr));

      xsendto (icmp4_fd, pkt, sizeof (*pkt), 0, &sa, sizeof (sa));
    }
  else
    {
      // without ENABLE_IPV6 nothing is sent for 16 byte addresses
#if ENABLE_IPV6
      struct sockaddr_in6 sa;

      // NOTE(review): no checksum is computed here, presumably because
      // the kernel fills it in for ICMPv6 raw sockets - confirm
      pkt->type = ICMP6_ECHO;

      sa.sin6_family   = AF_INET6;
      sa.sin6_port     = 0;
      sa.sin6_flowinfo = 0;
      sa.sin6_scope_id = 0;

      memcpy (&sa.sin6_addr, addr, sizeof (sa.sin6_addr));

      xsendto (icmp6_fd, pkt, sizeof (*pkt), 0, &sa, sizeof (sa));
#endif
    }

  // see if we have any more addresses
  if (self->addrcnt)
    {
      if (!--self->addrcnt)
        return 0;
    }
  else
    {
      // lo == hi means the address just pinged was the last one
      if (!memcmp (&self->lo, &self->hi, sizeof (addr_tt)))
        return 0;

      // increment self->lo
      // (big-endian: bump the last byte and carry into the preceding
      // byte for as long as a byte wraps around to 0)
      {
        int len = sizeof (addr_tt) - 1;

        while (!++self->lo [len])
          --len;
      }
    }

  return 1;
}

/*****************************************************************************/

/*
 * Restore the heap order of self->ranges after the root entry changed:
 * the entry at index 0 is moved down until neither of its children has a
 * smaller next time.
 *
 * Only slot 0 is read unconditionally; with rangecnt < 2 the loop ends
 * immediately and the entry is written back unchanged.
 */
static void
downheap (PINGER *self)
{
  RANGE *elem = self->ranges [0]; /* slot 0 is always allocated */
  int Nm1 = self->rangecnt - 1;   /* index of the last heap entry */
  int k;

  for (k = 0; ; )
    {
      int j = k * 2 + 1; /* left child of k */

      if (j > Nm1)
        break;

      /* continue with the smaller of the two children */
      if (j < Nm1
          && self->ranges [j]->next > self->ranges [j + 1]->next)
        ++j;

      if (self->ranges [j]->next >= elem->next)
        break;

      self->ranges [k] = self->ranges [j];

      k = j;
    }

  self->ranges [k] = elem;
}

/* move the entry at index k up towards the root until its parent */
/* no longer has a larger next time */
static void
upheap (PINGER *self, int k)
{
  RANGE **heap  = self->ranges;
  RANGE *moving = heap [k];

  for (; k; )
    {
      int parent = (k - 1) >> 1;

      if (heap [parent]->next <= moving->next)
        break;

      /* pull the parent down into the hole and continue above it */
      heap [k] = heap [parent];
      k = parent;
    }

  heap [k] = moving;
}

/* Worker thread body: self_ is the PINGER to run. */
/* Sends one ping at a time from whichever range is due next, honouring */
/* both the per-range and the per-pinger interval. When all ranges are */
/* exhausted it sleeps for maxrtt and then reports completion by writing */
/* the pinger id to the thr_res pipe. Always returns 0. */
static void *
ping_proc (void *self_)
{
  PINGER *self = (PINGER *)self_;
  PKT pkt;

  memset (&pkt, 0, sizeof (pkt));

  tstamp now = NOW ();

  /* packet template: only type, cksum and the timestamp change per ping */
  pkt.code   = 0;
  pkt.id     = self->magic1;
  pkt.seq    = self->magic2;
  pkt.magic  = self->magic3;
  pkt.pinger = self->id;

  /* never schedule into the past */
  if (self->next < now)
    self->next = now;

  while (self->rangecnt)
    {
      RANGE *range = self->ranges [0];

      // ranges [0] is always the next range to ping
      tstamp wait = range->next - now;

      // compare with the global frequency limit
      {
        tstamp diff = self->next - now;

        if (wait < diff)
          wait = diff; // global rate limit overrides
        else
          self->next = range->next; // fast forward
      }

      if (wait > 0.)
        ssleep (wait);

      now = NOW ();

      ts_to_pkt (&pkt, now);

      if (!range_send_ping (range, &pkt))
        {
          /* range exhausted: move the last heap entry to the root and free it */
          self->ranges [0] = self->ranges [--self->rangecnt];
          range_free (range);
        }
      else
        range->next = self->next + range->interval;

      /* the root entry changed either way, so restore the heap order */
      downheap (self);

      self->next += self->interval;
      now = NOW ();
    }

  /* give outstanding replies time to arrive before reporting completion */
  ssleep (self->maxrtt);

  {
    uint16_t id = self->id;

    /* the read end of this pipe is exported as $THR_RES_FD by boot */
    write (thr_res [1], &id, sizeof (id));
  }

  return 0;
}

/*****************************************************************************/

/* NetBSD, Solaris... */
#ifndef PTHREAD_STACK_MIN
# define PTHREAD_STACK_MIN 0
#endif

/*
 * Start the worker thread of self, unless it is already running.
 *
 * The thread is created with all signals blocked, so signals keep being
 * delivered to the calling thread only. The signal mask of the caller is
 * restored and the attribute object released before any error is
 * reported, because croak does not return.
 *
 * Croaks if the thread cannot be created.
 */
static void
pinger_start (PINGER *self)
{
  sigset_t fullsigset, oldsigset;
  pthread_attr_t attr;
  int err;

  if (self->running)
    return;

  sigfillset (&fullsigset);

  pthread_attr_init (&attr);
  /* a small stack is enough for ping_proc, but never go below the minimum */
  pthread_attr_setstacksize (&attr, PTHREAD_STACK_MIN < sizeof (long) * 2048 ? sizeof (long) * 2048 : PTHREAD_STACK_MIN);

  /* the new thread inherits the mask that is in effect during creation */
  pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
  err = pthread_create (&self->thrid, &attr, ping_proc, (void *)self);
  pthread_sigmask (SIG_SETMASK, &oldsigset, 0);

  pthread_attr_destroy (&attr);

  if (err)
    croak ("AnyEvent::FastPing: unable to create pinger thread");

  self->running = 1;
}

/* cancel the worker thread of self and wait until it has terminated; */
/* does nothing if the pinger is not running */
static void
pinger_stop (PINGER *self)
{
  if (self->running)
    {
      self->running = 0;

      pthread_cancel (self->thrid);
      pthread_join (self->thrid, 0);
    }
}

/*
 * Initialise the (uninitialised) PINGER structure self: zero it, assign it
 * a free id, register it in the global pingers array and set the defaults.
 *
 * Ids are taken from the freelist first; otherwise the next unused id is
 * handed out and the registry arrays are grown as needed.
 *
 * Croaks when no id is left or when memory cannot be allocated. The range
 * array is allocated before an id is taken, so a failure never leaves a
 * half-registered pinger behind.
 */
static void
pinger_init (PINGER *self)
{
  const int initial_rangemax = 16;
  RANGE **ranges;

  memset (self, 0, sizeof (PINGER));

  ranges = malloc (sizeof (ranges [0]) * initial_rangemax);

  if (!ranges)
    croak ("AnyEvent::FastPing: out of memory");

  if (firstfree >= 0)
    {
      self->id = firstfree;
      firstfree = pingerfree [firstfree];
    }
  else if (pingercnt == 0xffff)
    {
      free (ranges);
      croak ("unable to create more than 65536 AnyEvent::FastPing objects");
    }
  else
    {
      if (pingercnt == pingermax)
        {
          int newmax = pingermax * 2 + 16;
          PINGER **newpingers;
          int *newfree;

          /* keep the old pointers valid if growing fails */
          newpingers = realloc (pingers, sizeof (pingers [0]) * newmax);

          if (!newpingers)
            {
              free (ranges);
              croak ("AnyEvent::FastPing: out of memory");
            }

          pingers = newpingers;

          newfree = realloc (pingerfree, sizeof (pingerfree [0]) * newmax);

          if (!newfree)
            {
              free (ranges);
              croak ("AnyEvent::FastPing: out of memory");
            }

          pingerfree = newfree;

          /* only advertise the new capacity once both arrays have it */
          pingermax = newmax;
        }

      self->id = pingercnt++;
    }

  pingers [self->id] = self;

  self->recvcb   = &PL_sv_undef;
  self->next     = 0.;
  self->interval = MIN_INTERVAL;
  self->maxrtt   = 0.5;
  self->rangemax = initial_rangemax;
  self->ranges   = ranges;
}

/* tear down self: stop its thread, unregister it, drop the Perl values */
/* it holds, recycle its id and free all remaining ranges */
static void
pinger_free (PINGER *self)
{
  int i;

  pinger_stop (self);

  pingers [self->id] = 0;

  SvREFCNT_dec (self->recvq);
  SvREFCNT_dec (self->recvcb);

  /* push the id onto the freelist */
  pingerfree [self->id] = firstfree;
  firstfree = self->id;

  /* free the ranges from the last one down to the first one */
  for (i = self->rangecnt; i > 0; )
    range_free (self->ranges [--i]);

  self->rangecnt = 0;

  free (self->ranges);
}

/*
 * Insert range into the heap of self, doubling the heap array when it is
 * full. Ownership of range passes to the pinger.
 *
 * Croaks if the heap array cannot be grown; range is freed in that case
 * and the existing heap is left untouched.
 */
static void
pinger_add_range (PINGER *self, RANGE *range)
{
  if (self->rangecnt == self->rangemax)
    {
      int newmax = self->rangemax << 1;
      RANGE **grown = realloc (self->ranges, sizeof (self->ranges [0]) * newmax);

      if (!grown)
        {
          range_free (range);
          croak ("AnyEvent::FastPing: out of memory");
        }

      self->ranges   = grown;
      self->rangemax = newmax;
    }

  /* append at the end, then sift up to restore the heap order */
  self->ranges [self->rangecnt] = range;
  upheap (self, self->rangecnt);
  ++self->rangecnt;
}

/*****************************************************************************/

/* queue one reply (binary address and round trip time) for self; */
/* the first reply of a round creates the queue and links the pinger */
/* into the firstrecv list, unless no callback is set, in which case */
/* the reply is dropped */
static void
recv_feed (PINGER *self, void *addr, int addrlen, tstamp rtt)
{
  AV *entry;
  SV **slots;

  if (!self->recvq)
    {
      /* first seen this round */
      if (!SvOK (self->recvcb))
        return;

      self->recvq    = newAV ();
      self->nextrecv = firstrecv;
      firstrecv      = self->id;
    }

  /* build the two element array [address, rtt] in place */
  entry = newAV ();
  av_extend (entry, 2-1);

  slots = AvARRAY (entry);
  slots [0] = newSVpvn (addr, addrlen);
  slots [1] = newSVnv (rtt);
  AvFILLp (entry) = 2-1;

  av_push (self->recvq, newRV_noinc ((SV *)entry));
}

/* Deliver all queued replies: for every pinger in the firstrecv list, */
/* call its receive callback with a reference to its receive queue. */
/* The list and the per-pinger queues are empty afterwards. */
static void
recv_flush (void)
{
  if (firstrecv < 0)
    return;

  ENTER;
  SAVETMPS;

  do
    {
      dSP;
      PINGER *self = pingers [firstrecv];
      /* unlink the pinger before the callback runs */
      firstrecv = self->nextrecv;

      self->nextrecv = -1;

      PUSHMARK (SP);
      /* the mortal reference takes over the queue (no refcount increment) */
      XPUSHs (sv_2mortal (newRV_noinc ((SV *)self->recvq)));
      self->recvq = 0;
      PUTBACK;
      call_sv (self->recvcb, G_DISCARD | G_VOID);
    }
  while (firstrecv >= 0);

  FREETMPS;
  LEAVE;
}

/*****************************************************************************/

/* Disabled code: calls every callback stored in the array cbs with a */
/* reference to res_av. It is compiled out, and cbs is not declared */
/* anywhere in the visible source, so it would not build if enabled. */
#if 0
static void
feed_reply (AV *res_av)
{
  dSP;
  SV *res = sv_2mortal (newRV_inc ((SV *)res_av));
  int i;

  /* nothing to report for an empty result array */
  if (av_len (res_av) < 0)
    return;

  ENTER;
  SAVETMPS;

  /* iterate over the callbacks from last to first */
  for (i = av_len (cbs) + 1; i--; )
    {
      SV *cb = *av_fetch (cbs, i, 1);

      PUSHMARK (SP);
      XPUSHs (res);
      PUTBACK;
      call_sv (cb, G_DISCARD | G_VOID);
    }

  FREETMPS;
  LEAVE;
}
#endif

/*
 * Create the raw ICMPv4 and (if enabled) ICMPv6 sockets and store them in
 * icmp4_fd and icmp6_fd. A descriptor is left negative when the socket
 * cannot be created; callers test for that to detect missing support.
 *
 * Successfully created sockets are switched to non-blocking mode and,
 * where the platform offers ICMP filters, restricted to echo replies.
 * Nothing is configured on a socket that could not be created.
 */
static void
boot_protocols (void)
{
  icmp4_fd = socket (AF_INET, SOCK_RAW, IPPROTO_ICMP);

  if (icmp4_fd >= 0)
    {
      fcntl (icmp4_fd, F_SETFL, O_NONBLOCK);
#ifdef ICMP_FILTER
      {
        /* a set bit blocks the corresponding ICMP type */
        struct icmp_filter oval;
        oval.data = 0xffffffff & ~(1 << ICMP4_ECHO_REPLY);
        setsockopt (icmp4_fd, SOL_RAW, ICMP_FILTER, &oval, sizeof oval);
      }
#endif
    }

#if ENABLE_IPV6
  icmp6_fd = socket (AF_INET6, SOCK_RAW, IPPROTO_ICMPV6);

  if (icmp6_fd >= 0)
    {
      fcntl (icmp6_fd, F_SETFL, O_NONBLOCK);
# ifdef ICMP6_FILTER
      {
        struct icmp6_filter oval;
        ICMP6_FILTER_SETBLOCKALL (&oval);
        ICMP6_FILTER_SETPASS (ICMP6_ECHO_REPLY, &oval);
        setsockopt (icmp6_fd, IPPROTO_ICMPV6, ICMP6_FILTER, &oval, sizeof oval);
      }
# endif
    }
#endif
}

static void
boot (void)
{
  if (pipe (thr_res) < 0)
    croak ("AnyEvent::FastPing: unable to create receive pipe");

  sv_setiv (get_sv ("AnyEvent::FastPing::THR_RES_FD", 1), thr_res [0]);

  boot_protocols ();

  sv_setiv (get_sv ("AnyEvent::FastPing::ICMP4_FD", 1), icmp4_fd);
  sv_setiv (get_sv ("AnyEvent::FastPing::ICMP6_FD", 1), icmp6_fd);
}

/* Guard for methods that must not be called while the worker thread is */
/* active: croaks if the pinger named self is running. Wrapped in */
/* do/while (0) so that it behaves as a single statement and cannot */
/* capture a following else. Use as: NOT_RUNNING; */
#define NOT_RUNNING \
  do \
    { \
      if (self->running) \
        croak ("AnyEvent::FastPing object has been started - you have to stop it first before calling this method, caught"); \
    } \
  while (0)

MODULE = AnyEvent::FastPing		PACKAGE = AnyEvent::FastPing		PREFIX = pinger_

PROTOTYPES: DISABLE

BOOT:
{
        HV *stash = gv_stashpv ("AnyEvent::FastPing", 1);

        /* pkt_cksum processes the packet in 32 bit words */
        if (sizeof (PKT) & 3)
          croak ("size of PKT structure is not a multiple of 4");

        newCONSTSUB (stash, "icmp4_pktsize", newSViv (HDR_SIZE_IP4 + sizeof (PKT)));
        newCONSTSUB (stash, "icmp6_pktsize", newSViv (HDR_SIZE_IP6 + sizeof (PKT)));

        /* open the sockets once, only to find out which protocols work */
        boot_protocols ();

        newCONSTSUB (stash, "ipv4_supported", newSViv (icmp4_fd >= 0));
        newCONSTSUB (stash, "ipv6_supported", newSViv (icmp6_fd >= 0));

        /* close the probe sockets again and forget their numbers, so a */
        /* closed (and possibly reused) descriptor is never mistaken for */
        /* a working socket before _boot has reopened them */
        if (icmp4_fd >= 0)
          {
            close (icmp4_fd);
            icmp4_fd = -1;
          }

        if (icmp6_fd >= 0)
          {
            close (icmp6_fd);
            icmp6_fd = -1;
          }
}

void
_boot ()
	CODE:
	/* create the result pipe and the ICMP sockets, see boot () */
	boot ();

void
_recv_icmp4 (...)
	CODE:
{
        /* Read up to 256 packets from the ICMPv4 socket, queue every */
        /* echo reply that belongs to one of our pingers and then invoke */
        /* the receive callbacks. */
	char buf [512];
        struct sockaddr_in sa;
        int maxrecv;

        for (maxrecv = 256+1; --maxrecv; )
          {
            PINGER *pinger;
            IP4HDR *iphdr = (IP4HDR *)buf;
            socklen_t sl = sizeof (sa);
            /* NOTE(review): MSG_TRUNC presumably makes recvfrom report the */
            /* real packet length even if it exceeds buf - confirm per platform */
            int len = recvfrom (icmp4_fd, buf, sizeof (buf), MSG_TRUNC, (struct sockaddr *)&sa, &sl);
            int hdrlen, totlen;
            PKT *pkt;

            /* error (including "nothing to read") or too short: stop */
            if (len <= HDR_SIZE_IP4)
              break;

            hdrlen = (iphdr->version_ihl & 15) * 4;
            totlen = ntohs (iphdr->tot_len);

            /* only accept complete ICMP packets that carry exactly one PKT */
            if (totlen > len
                || iphdr->protocol != IPPROTO_ICMP
                || hdrlen < HDR_SIZE_IP4 || hdrlen + sizeof (PKT) != totlen)
              continue;

            pkt = (PKT *)(buf + hdrlen);

            /* the pinger id comes from the network, so validate it */
            if (pkt->type != ICMP4_ECHO_REPLY
                || pkt->pinger >= pingercnt
                || !pingers [pkt->pinger])
              continue;

            pinger = pingers [pkt->pinger];

            if (!pkt_is_valid_for (pkt, pinger))
              continue;

            /* round trip time = now minus the send time stored in the packet */
            recv_feed (pinger, &sa.sin_addr, 4, NOW () - pkt_to_ts (pkt));
          }

        recv_flush ();
}

void
_recv_icmp6 (...)
	CODE:
{
        /* Read up to 256 packets from the ICMPv6 socket, queue every */
        /* echo reply that belongs to one of our pingers and then invoke */
        /* the receive callbacks. Packets are read directly into a PKT, */
        /* no IP header is expected in front of them. */
        struct sockaddr_in6 sa;
        PKT pkt;
        int maxrecv;

        for (maxrecv = 256+1; --maxrecv; )
          {
            PINGER *pinger;
            socklen_t sl = sizeof (sa);
            int len = recvfrom (icmp6_fd, &pkt, sizeof (pkt), MSG_TRUNC, (struct sockaddr *)&sa, &sl);

            /* stops on errors, but also on the first packet of any other size */
            if (len != sizeof (PKT))
              break;

            /* the pinger id comes from the network, so validate it */
            if (pkt.type != ICMP6_ECHO_REPLY
                || pkt.pinger >= pingercnt
                || !pingers [pkt.pinger])
              continue;

            pinger = pingers [pkt.pinger];

            if (!pkt_is_valid_for (&pkt, pinger))
              continue;

            /* round trip time = now minus the send time stored in the packet */
            recv_feed (pinger, &sa.sin6_addr, 16, NOW () - pkt_to_ts (&pkt));
          }

        recv_flush ();
}

void
_new (SV *klass, UV magic1, UV magic2, UV magic3)
	PPCODE:
{
        /* the PINGER structure lives inside the string buffer of a new */
        /* scalar, which is returned as a reference blessed into klass */
        SV *pv = NEWSV (0, sizeof (PINGER));
        PINGER *self = (PINGER *)SvPVX (pv);

        SvPOK_only (pv);
        XPUSHs (sv_2mortal (sv_bless (newRV_noinc (pv), gv_stashpv (SvPVutf8_nolen (klass), 1))));
        pinger_init (self);
        /* the magic values are stored in 16 bit fields */
        self->magic1 = magic1;
        self->magic2 = magic2;
        self->magic3 = magic3;
}

void
_free (PINGER *self)
	CODE:
        /* stop the pinger and release everything it owns, see pinger_free */
        pinger_free (self);

IV
id (PINGER *self, ...)
	CODE:
        /* return the id of this pinger (its index in the pingers array); */
        /* additional arguments are accepted and ignored */
        RETVAL = self->id;
	OUTPUT:
        RETVAL

# Binding for the C function pinger_start: starts the worker thread unless it is already running.
void pinger_start (PINGER *self)

# Binding for the C function pinger_stop: cancels and joins the worker thread when it is running.
void pinger_stop (PINGER *self)

void
_stop_id (UV id)
	CODE:
        /* stop the pinger with the given id; unknown or freed ids are ignored */
        if (id < pingercnt && pingers [id])
          pinger_stop (pingers [id]);

void
interval (PINGER *self, NV interval)
	CODE:
        /* set the minimum time between two packets of this pinger; */
        /* values not above MIN_INTERVAL are replaced by MIN_INTERVAL; */
        /* croaks while the pinger is running */
        NOT_RUNNING;
        self->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;

void
max_rtt (PINGER *self, NV maxrtt)
	CODE:
        /* set how long the worker thread sleeps after its last ping */
        /* before it reports completion; the value is stored unchecked; */
        /* croaks while the pinger is running */
        NOT_RUNNING;
        self->maxrtt = maxrtt;

void
on_recv (PINGER *self, SV *cb)
	CODE:
        /* replace the receive callback: drop the reference to the old */
        /* one and keep a private copy of cb */
        SvREFCNT_dec (self->recvcb);
        self->recvcb = newSVsv (cb);

void
add_range (PINGER *self, SV *lo_, SV *hi_, NV interval = 0)
	CODE:
{
        /* Add the inclusive address range lo_ .. hi_ to the pinger. Both */
        /* bounds are binary addresses of the same family (4 or 16 bytes) */
        /* with lo <= hi. interval is the minimum time between two pings */
        /* of this range and is raised to MIN_INTERVAL if smaller. */
        /* Croaks while the pinger is running, on invalid arguments, when */
        /* the address family is unavailable and when out of memory. */
        STRLEN lo_len, hi_len;
        char *lo = SvPVbyte (lo_, lo_len);
        char *hi = SvPVbyte (hi_, hi_len);
        RANGE *range;
        NOT_RUNNING;

        if (lo_len != hi_len || (lo_len != 4 && lo_len != 16))
          croak ("AnyEvent::FastPing::add_range address range must be specified as two binary IPv4 or IPv6 addresses");

        if (lo_len ==  4 && icmp4_fd < 0) croak ("IPv4 support unavailable");
        if (lo_len == 16 && icmp6_fd < 0) croak ("IPv6 support unavailable");

        /* binary addresses are big-endian, so memcmp orders them numerically */
        if (memcmp (lo, hi, lo_len) > 0)
          croak ("AnyEvent::FastPing::add_range called with lo > hi");

        range = calloc (1, sizeof (RANGE));

        if (!range)
          croak ("AnyEvent::FastPing::add_range: out of memory");

        range->next     = 0;
        range->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;
        range->addrlen  = lo_len;

        /* store both bounds right-aligned inside their 16 byte fields */
        memcpy (sizeof (addr_tt) - lo_len + (char *)&range->lo, lo, lo_len);
        memcpy (sizeof (addr_tt) - lo_len + (char *)&range->hi, hi, lo_len);

        pinger_add_range (self, range);
}

void
add_hosts (PINGER *self, SV *addrs, NV interval = 0, UV interleave = 1)
	CODE:
{
        /* Add a list of hosts to the pinger. addrs is an array reference */
        /* of binary IPv4 (4 byte) or IPv6 (16 byte) addresses. If both */
        /* kinds occur, IPv4 addresses are stored as IPv4-mapped IPv6 */
        /* addresses. interval is the minimum time between two pings of */
        /* this list and is raised to MIN_INTERVAL if smaller. interleave */
        /* is the stride used to reorder the list (0 selects one */
        /* automatically). An empty list is ignored. */
        /* Croaks while the pinger is running, on invalid arguments and */
        /* when out of memory. */
        AV *av;
        int i, j, k;
        int cnt;
        int addrlen = 0;
        RANGE *range;
        NOT_RUNNING;

        if (!SvROK (addrs) || SvTYPE (SvRV (addrs)) != SVt_PVAV)
          croak ("AnyEvent::FastPing::add_hosts expects an arrayref with binary IPv4 or IPv6 addresses");

        av = (AV *)SvRV (addrs);
        cnt = av_len (av) + 1;

        /* first pass: validate all addresses and find the longest one */
        for (i = 0; i < cnt; ++i)
          {
            STRLEN len;
            SV *sv = *av_fetch (av, i, 1);
            sv_utf8_downgrade (sv, 0);

            /* query the length through SvPVbyte: elements that are not */
            /* strings (undef, numbers, ...) have no valid SvCUR */
            (void)SvPVbyte (sv, len);

            if (len != 4 && len != 16)
              croak ("AnyEvent::FastPing::add_hosts addresses must be specified as binary IPv4 or IPv6 addresses");

            if ((int)len > addrlen)
              addrlen = len;
          }

        if (!cnt)
          XSRETURN_EMPTY;

        range = calloc (1, sizeof (RANGE) + (size_t)cnt * addrlen);

        if (!range)
          croak ("AnyEvent::FastPing::add_hosts: out of memory");

        range->next     = 0;
        range->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;
        range->addrlen  = addrlen;
        range->addrcnt  = cnt;

        if (interleave == 0)
          interleave = cnt <= 256 * 256 ? 256 : (int)sqrtf (cnt);

        /* strides beyond the list length only add empty passes, and huge */
        /* values would overflow the int loop counters */
        if (interleave > (UV)cnt)
          interleave = cnt;

        /* second pass: copy the addresses in interleaved order; the list */
        /* is filled from the back because it is consumed from the back */
        k = cnt;
        for (j = 0; j < (int)interleave; ++j)
          for (i = j; i < cnt; i += (int)interleave)
            {
              uint8_t *dst = (uint8_t *)(range + 1) + --k * addrlen;
              char *pv;
              STRLEN pvlen;
              SV *sv = *av_fetch (av, i, 1);
              sv_utf8_downgrade (sv, 0);

              pv = SvPVbyte (sv, pvlen);

              if (pvlen != addrlen)
                {
                  /* IPv4 address in an IPv6 list: store it as ::ffff:a.b.c.d */
                  dst [ 0] = 0x00; dst [ 1] = 0x00; dst [ 2] = 0x00; dst [ 3] = 0x00;
                  dst [ 4] = 0x00; dst [ 5] = 0x00; dst [ 6] = 0x00; dst [ 7] = 0x00;
                  dst [ 8] = 0x00; dst [ 9] = 0x00; dst [10] = 0xff; dst [11] = 0xff;
                  dst [12] = pv [0]; dst [13] = pv [1]; dst [14] = pv [2]; dst [15] = pv [3];
                }
              else
                memcpy (dst, pv, addrlen);
            }

        pinger_add_range (self, range);
}