The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/perl
use POE;
use POE::Component::Logger;
use POE::Component::MessageQueue;
use POE::Component::MessageQueue::Storage::Default;
use POE::Component::MessageQueue::Storage::Memory;
use POE::Component::MessageQueue::Storage::BigMemory;
use POE::Component::MessageQueue::Storage::DBI;
use POE::Component::MessageQueue::Storage::Throttled;
use Getopt::Long;
use Devel::StackTrace;
use IO::File;
use Carp;
use POSIX qw(setsid strftime);
use strict;

my $DATA_DIR = '/var/lib/perl_mq';
my $CONF_DIR = '/etc/perl_mq';
my $CONF_LOG = "$CONF_DIR/log.conf";

my $port     = 61613;
my $hostname = undef;
my $timeout  = 4;
my $granularity;
my $throttle_max = 2;
my $pump_frequency;
my $background = 0;
my $debug_shell = 0;
my $pidfile;
my $show_version = 0;
my $show_usage   = 0;
my $statistics   = 0;
my $uuids = 1;
my $stat_interval = 10;
my $front_store = 'memory';
my $front_max;
my $storage = 'default';
my $crash_cmd = undef;
my $dbi_dsn = undef;
my $dbi_username = undef;
my $dbi_password = undef;
my $mq_id = '';

GetOptions(
	"port|p=i"         => \$port,
	"hostname|h=s"     => \$hostname,
	"timeout|i=i"      => \$timeout,
	"granularity=i"    => \$granularity,
	"storage=s"        => \$storage,
	"front-store|f=s"  => \$front_store,
	"front-max=s"      => \$front_max,
	"throttle|T=i"     => \$throttle_max,
	"pump-freq|Q=i"    => \$pump_frequency,
	"data-dir=s"       => \$DATA_DIR,
	"log-conf=s"       => \$CONF_LOG,
	"stats!"           => \$statistics,
	"uuids!"           => \$uuids,
	"dbi-dsn=s"        => \$dbi_dsn,
	"dbi-username=s"   => \$dbi_username,
	"dbi-password=s"   => \$dbi_password,
	"mq-id=s"          => \$mq_id,
	"stats-interval=i" => \$stat_interval,
	"background|b"     => \$background,
	"debug-shell"      => \$debug_shell,
	"pidfile|p=s"      => \$pidfile,
	"crash-cmd=s"      => \$crash_cmd,
	"version|v"        => \$show_version,
	"help|h"           => \$show_usage,
) or usage(1);

# byte kilo mega giga tera peta exa zetta yotta
my @size_units = qw(b k m g t p e z y);
my $size_pattern = '((?:\d*\.)?\d+)(['.join('',@size_units).']?)$';
my $size_regex = qr/$size_pattern/i;
sub parse_size
{
	my $string = shift;
	if ($string =~ $size_regex)
	{
		my ($number, $unit) = ($1, lc($2));
		return $number unless $unit;
		for(my $i = 0; $i < @size_units; $i++)
		{
			if ($unit eq $size_units[$i])
			{
				return $number * (1024**$i);	
			}
		}
	}
	die "Unable to parse size: $string";
}

sub version
{
	print "POE::Component::MessageQueue version $POE::Component::MessageQueue::VERSION\n";
	print "Copyright 2007-2011 David Snopek (http://www.hackyourlife.org)\n";
	print "Copyright 2007, 2008 Paul Driver <frodwith\@gmail.com>\n";
	print "Copyright 2007 Daisuke Maki <daisuke\@endeworks.jp>\n";
}

sub usage
{
	my $exit_level = shift;
	my $X = ' ' x (length $0);
    print <<"ENDUSAGE";
$0 [--port|-p <num>]               [--hostname|-h <host>]
$X [--storage <str>]
$X [--front-store <str>]           [--front-max <size>] 
$X [--granularity <seconds>]       [--nouuids]
$X [--timeout|-i <seconds>]        [--throttle|-T <count>]
$X [--dbi-dsn <str>]               [--mq-id <str>]
$X [--dbi-username <str>]          [--dbi-password <str>]
$X [--pump-freq|-Q <seconds>]
$X [--data-dir <path_to_dir>]      [--log-conf <path_to_file>]
$X [--stats-interval|-i <seconds>] [--stats]
$X [--pidfile|-p <path_to_file>]   [--background|-b]
$X [--crash-cmd <path_to_script>]
$X [--debug-shell] [--version|-v]  [--help|-h]

SERVER OPTIONS:
  --port     -p <num>     The port number to listen on (Default: 61613)
  --hostname -h <host>    The hostname of the interface to listen on 
                          (Default: localhost)

STORAGE OPTIONS:
  --storage <str>         Specify which overall storage engine to use.  This
                          affects what other options are value.  (can be
                          default or dbi)
  --front-store -f <str>  Specify which in-memory storage engine to use for
                          the front-store (can be memory or bigmemory).
  --front-max <size>      How much message body the front-store should cache.
                          This size is specified in "human-readable" format
                          as per the -h option of ls, du, etc. (ex. 2.5M)
  --timeout -i <secs>     The number of seconds to keep messages in the 
                          front-store (Default: 4)
  --pump-freq -Q <secs>   How often (in seconds) to automatically pump each
                          queue.  Set to zero to disable this timer entirely
                          (Default: 0)
  --granularity <secs>    How often (in seconds) Complex should check for
                          messages that have passed the timeout.  
  --[no]uuids             Use (or do not use) UUIDs instead of incrementing
                          integers for message IDs.  (Default: uuids)
  --throttle -T <count>   The number of messages that can be stored at once 
                          before throttling (Default: 2)
  --data-dir <path>       The path to the directory to store data 
                          (Default: /var/lib/perl_mq)
  --log-conf <path>       The path to the log configuration file 
                          (Default: /etc/perl_mq/log.conf)

  --dbi-dsn <str>         The database DSN when using --storage dbi
  --dbi-username <str>    The database username when using --storage dbi
  --dbi-password <str>    The database password when using --storage dbi
  --mq-id <str>           A string uniquely identifying this MQ when more
                          than one MQ use the DBI database for storage

STATISTICS OPTIONS:
  --stats                 If specified the, statistics information will be 
                          written to \$DATA_DIR/stats.yml
  --stats-interval <secs> Specifies the number of seconds to wait before 
                          dumping statistics (Default: 10)

DAEMON OPTIONS:
  --background -b         If specified the script will daemonize and run in the
                          background
  --pidfile    -p <path>  The path to a file to store the PID of the process

  --crash-cmd  <path>     The path to a script to call when crashing.
                          A stacktrace will be printed to the script's STDIN.
                          (ex. 'mail root\@localhost')

OTHER OPTIONS:
  --debug-shell           Run with POE::Component::DebugShell
  --version    -v         Show the current version.
  --help       -h         Show this usage message

ENDUSAGE
	
	exit($exit_level) if (defined $exit_level);
}

if ( $show_version )
{
	version;
	exit 0;
}

if ( $show_usage )
{
	version;
	print "\n";
	usage(0);
}

if ( not -d $DATA_DIR )
{
	mkdir $DATA_DIR;

	if ( not -d $DATA_DIR )
	{
		die "Unable to create the data dir: $DATA_DIR";
	}
}

if ( $background )
{   
	# the simplest daemonize, ever.
	defined(fork() && exit 0) or "Can't fork: $!";
	setsid or die "Can't start a new session: $!";
	open STDIN,  '/dev/null' or die "Can't redirect STDIN from /dev/null: $!";
	open STDOUT, '>/dev/null' or die "Can't redirect STDOUT to /dev/null: $!";
	open STDERR, '>/dev/null' or die "Can't redirect STDERR to /dev/null: $!";
}

if ( $pidfile )
{
	my $fd = IO::File->new(">$pidfile")
		|| die "Unable to open pidfile: $pidfile: $!";
	$fd->write("$$");
	$fd->close();
}

my $logger_alias;
if ( -e $CONF_LOG )
{
	$logger_alias = 'mq_logger';

	# we create a logger, because a production message queue would
	# really need one.
	POE::Component::Logger->spawn(
		ConfigFile => $CONF_LOG,
		Alias      => $logger_alias
	);
}
else
{
	print STDERR "LOGGER: Unable to find configuration: $CONF_LOG\n";
	print STDERR "LOGGER: Will send all messages to STDERR\n";
}

if ($storage eq 'default') {
	if ($front_store eq 'memory') 
	{
		$front_store = POE::Component::MessageQueue::Storage::Memory->new();
	}
	elsif ($front_store eq 'bigmemory')
	{
		$front_store = POE::Component::MessageQueue::Storage::BigMemory->new();
	}
	else
	{
		die "Unknown front-store specified: $front_store";
	}

	$storage = POE::Component::MessageQueue::Storage::Default->new(
		data_dir     => $DATA_DIR,
		timeout      => $timeout,
		throttle_max => $throttle_max,
		front        => $front_store,
		front_max    => $front_max ? parse_size($front_max) : undef,
		granularity  => $granularity,
	);
}
else {
	if ($storage eq 'dbi')
	{
		$storage = POE::Component::MessageQueue::Storage::DBI->new(
			dsn      => $dbi_dsn,
			username => $dbi_username,
			password => $dbi_password,
			mq_id    => $mq_id,
		);
	}
	else
	{
		die "Unknown storage specified: $storage";
	}

	if ($throttle_max > 0) {
		$storage = POE::Component::MessageQueue::Storage::Throttled->new(
			back         => $storage,
			throttle_max => $throttle_max
		);
	}
}

my $idgen;
if ($uuids) 
{
	use POE::Component::MessageQueue::IDGenerator::UUID;
	$idgen = POE::Component::MessageQueue::IDGenerator::UUID->new();
}
else
{
	use POE::Component::MessageQueue::IDGenerator::SimpleInt;
	$idgen = POE::Component::MessageQueue::IDGenerator::SimpleInt->new(
		filename => "$DATA_DIR/last_id.mq",
	);
}

my %args = (
	port     => $port,
	hostname => $hostname,

	storage => $storage,

	pump_frequency => $pump_frequency,
	idgen => $idgen,
	logger_alias => $logger_alias,
);

if ($statistics) {
	require POE::Component::MessageQueue::Statistics;
	require POE::Component::MessageQueue::Statistics::Publish::YAML;
	my $stat = POE::Component::MessageQueue::Statistics->new();
	my $publish = POE::Component::MessageQueue::Statistics::Publish::YAML->spawn(
		statistics => $stat,
		output => "$DATA_DIR/stats.yml",
		interval => $stat_interval,
	);
	$args{observers} = [ $stat ];
}
my $mq = POE::Component::MessageQueue->new(%args);

# install the debug shell if requested
if ( $debug_shell )
{
	require POE::Component::DebugShell;
	POE::Component::DebugShell->spawn();
}

# install a die handler so we can catch crashes and log them
$SIG{__DIE__} = sub {
	my $trace = Devel::StackTrace->new()->as_string();
	my $banner = sprintf("\n%s\n", '=' x 30);
	my $diemsg = sprintf("$banner MQ Crashed: %s $banner\n$trace", 
		strftime('%Y-%m-%d %H:%M:%S', localtime(time())));

	# Print it first, cause don't know if the other stuff is gonna work.
	print STDERR $diemsg;

	# This will probably work, but we should say so if it doesn't.
	my $fn = "$DATA_DIR/crashed.log";
	if(open DIEFILE, ">>", $fn)
	{
		print DIEFILE $diemsg;
		close DIEFILE;	
	}
	else
	{
		print STDERR "Couldn't open crashlog '$fn': $!\n";
	}

	# Only bother if one was specified.
	if ($crash_cmd)
	{
		if (open DIEPIPE, '|-', $crash_cmd)
		{
			print DIEPIPE $diemsg;
			close DIEPIPE;	
		}
		else
		{
			print STDERR "Couldn't send crashlog to $crash_cmd: $!\n";
		}
	}
};

POE::Kernel->run();
exit;