
POE::Component::MessageQueue - A POE message queue that uses STOMP for its communication protocol

If you are only interested in running with the recommended storage backend and some predetermined defaults, you can use the included command line script:
POE::Component::MessageQueue version 0.2.6 Copyright 2007, 2008, 2009 David Snopek (http://www.hackyourlife.org) Copyright 2007, 2008 Paul Driver <frodwith@gmail.com> Copyright 2007 Daisuke Maki <daisuke@endeworks.jp> mq.pl [--port|-p <num>] [--hostname|-h <host>] [--front-store <str>] [--front-max <size>] [--granularity <seconds>] [--nouuids] [--timeout|-i <seconds>] [--throttle|-T <count>] [--pump-freq|-Q <seconds>] [--data-dir <path_to_dir>] [--log-conf <path_to_file>] [--stats-interval|-i <seconds>] [--stats] [--pidfile|-p <path_to_file>] [--background|-b] [--crash-cmd <path_to_script>] [--debug-shell] [--version|-v] [--help|-h] SERVER OPTIONS: --port -p <num> The port number to listen on (Default: 61613) --hostname -h <host> The hostname of the interface to listen on (Default: localhost) STORAGE OPTIONS: --front-store -f <str> Specify which in-memory storage engine to use for the front-store (can be memory or bigmemory). --front-max <size> How much message body the front-store should cache. This size is specified in "human-readable" format as per the -h option of ls, du, etc. (ex. 2.5M) --timeout -i <secs> The number of seconds to keep messages in the front-store (Default: 4) --pump-freq -Q <secs> How often (in seconds) to automatically pump each queue. Set to zero to disable this timer entirely (Default: 0) --granularity <secs> How often (in seconds) Complex should check for messages that have passed the timeout. --[no]uuids Use (or do not use) UUIDs instead of incrementing integers for message IDs. Default: uuids --throttle -T <count> The number of messages that can be stored at once before throttling (Default: 2) --data-dir <path> The path to the directory to store data (Default: /var/lib/perl_mq) --log-conf <path> The path to the log configuration file (Default: /etc/perl_mq/log.conf STATISTICS OPTIONS: --stats If specified the, statistics information will be written to $DATA_DIR/stats.yml --stats-interval <secs> Specifies the number of seconds to wait before dumping statistics (Default: 10) DAEMON OPTIONS: --background -b If specified the script will daemonize and run in the background --pidfile -p <path> The path to a file to store the PID of the process --crash-cmd <path> The path to a script to call when crashing. A stacktrace will be printed to the script's STDIN. (ex. 'mail root@localhost') OTHER OPTIONS: --debug-shell Run with POE::Component::DebugShell --version -v Show the current version. --help -h Show this usage message

use Net::Stomp;
my $stomp = Net::Stomp->new({
hostname => 'localhost',
port => 61613
});
# Currently, PoCo::MQ doesn't do any authentication, so you can put
# whatever you want as the login and passcode.
$stomp->connect({ login => $USERNAME, passcode => $PASSWORD });
$stomp->subscribe({
destination => '/queue/my_queue.sub_queue',
ack => 'client'
});
while (1)
{
my $frame = $stomp->receive_frame;
print $frame->body . "\n";
$stomp->ack({ frame => $frame });
}
$stomp->disconnect();
use Net::Stomp;
my $stomp = Net::Stomp->new({
hostname => 'localhost',
port => 61613
});
# Currently, PoCo::MQ doesn't do any authentication, so you can put
# whatever you want as the login and passcode.
$stomp->connect({ login => $USERNAME, passcode => $PASSWORD });
$stomp->send({
destination => '/queue/my_queue.sub_queue',
body => 'I am a message',
persistent => 'true',
});
$stomp->disconnect();
If you want to use a different arrangement of storage engines or to embed PoCo::MQ inside another application, the following synopsis may be useful to you:
use POE;
use POE::Component::Logger;
use POE::Component::MessageQueue;
use POE::Component::MessageQueue::Storage::Default;
use Socket; # For AF_INET
use strict;
my $DATA_DIR = '/tmp/perl_mq';
# we create a logger, because a production message queue would
# really need one.
POE::Component::Logger->spawn(
ConfigFile => 'log.conf',
Alias => 'mq_logger'
);
POE::Component::MessageQueue->new({
port => 61613, # Optional.
address => '127.0.0.1', # Optional.
hostname => 'localhost', # Optional.
domain => AF_INET, # Optional.
logger_alias => 'mq_logger', # Optional.
# Required!!
storage => POE::Component::MessageQueue::Storage::Default->new({
data_dir => $DATA_DIR,
timeout => 2,
throttle_max => 2
})
});
POE::Kernel->run();
exit;

This module implements a message queue [1] on top of POE that communicates via the STOMP protocol [2].
There exist a few good Open Source message queues, most notably ActiveMQ [3] which is written in Java. It provides more features and flexibility than this one (while still implementing the STOMP protocol), however, it was (at the time I last used it) very unstable. With every version there was a different mix of memory leaks, persistence problems, STOMP bugs, and file descriptor leaks. Due to its complexity I was unable to be very helpful in fixing any of these problems, so I wrote this module!
This component distinguishes itself in a number of ways:
You can see the main STOMP documentation here: http://stomp.codehaus.org/Protocol
PoCo::MQ implements a number of non-standard STOMP headers:
Set to the string "true" to request that a message be persisted. Not setting this header or setting it to any other value, means that a message is non-persistent.
Many storage engines ignore the "persistent" header, either persisting all messages or no messages, so be sure to check the documentation for your storage engine.
Using the Complex or Default storage engines, persistent messages will always be sent to the back store and non-persistent messages will be discarded eventually.
For non-persistent messages, you can set this header to the number of seconds this message must be kept before being discarded. This is ignored for persistent messages.
Many storage engines ignore the "expire-after" header, so be sure to check the documentation for your storage engine.
Using the Complex or Default storage engines, this header will be honored. If it isn't specified, non-persistent messages are discarded when pushed out of the front store.
For both persistent or non-persistent messages, you can set this header to the number of seconds this message should be held before being delivered. In other words, this allows you to delay delivery of a message for an arbitrary number of seconds.
All the storage engines in the standard distribution support this header. But it will not work without a pump frequency enabled! If using mq.pl, enable with --pump-freq or if creating a POE::Component::MessageQueue object directly, pass pump_frequency as an argument to new().
In PoCo::MQ there are two types of destinations: queues and topics
Each message is only delivered to a single subscriber (not counting messages that were delivered but not ACK'd). If there are multiple subscribers on a single queue, the messages will be divided amoung them, roughly equally.
Each message is delivered to every subscriber. Topics don't support any kind of persistence, so to get a message, a subscriber must be connected at the time it was sent.
All destination names start with either "/queue/" or "/topic/" to distinguish between queues and topics.
PoCo::MQ uses POE::Component::Logger for logging which is based on Log::Dispatch. By default mq.pl looks for a log file at: "/etc/perl_mq/log.conf". Or you can specify an alternate location with the --log-conf command line argument.
Currently the login and passcode aren't used by PoCo::MQ for auth, but they are written to the log file. In the log file clients are only identified by the client id. But if you put information identifying the client in the login/passcode you can connect that to a client id by finding it in the log.

When creating an instance of this component you must pass in a storage object so that the message queue knows how to store its messages. There are some storage backends provided with this distribution. See their individual documentation for usage information. Here is a quick break down:

The only required parameter. Sets the object that the message queue should use for message storage. This must be an object that follows the interface of POE::Component::MessageQueue::Storage but doesn't necessarily need to be a child of that class.
The session alias to use.
The optional port to listen on. If none is given, we use 61613 by default.
The option interface address to bind to. It defaults to INADDR_ANY or INADDR6_ANY when using IPv4 or IPv6, respectively.
The optional name of the interface to bind to. This will be converted to the IP and used as if you set address instead. If you set both hostname and address, address will override this value.
Optionally specifies the domain within which communication will take place. Defaults to AF_INET.
Optionally set the alias of the POE::Component::Logger object that you want the message queue to log to. If no value is given, log information is simply printed to STDERR.
Optionally set the package name to use for the Message object. This should be a child class of POE::Component::MessageQueue::Message or atleast follow the same interface.
This allows you to add new message headers which the MQ can recognize.
Optionally set how often (in seconds) to automatically pump each queue. If zero or no value is given, then this timer is disabled entirely.
When disabled, each queue is only pumped when its contents change, meaning when a message is added or removed from the queue. Normally, this is enough. However, if your storage engine holds back messages for any reason (ie. to delay their delivery) it will be necessary to enable this, so that the held back messages will ultimately be delivered.
You must enable this for the message queue to honor the deliver-after header!
Optionally pass in a number of objects that will receive information about events inside of the message queue.
Currently, only one observer is provided with the PoCo::MQ distribution: POE::Component::MessageQueue::Statistics. Please see its documentation for more information.

http://en.wikipedia.org/wiki/Message_Queue -- General information about message queues
http://stomp.codehaus.org/Protocol -- The informal "spec" for the STOMP protocol
http://www.activemq.org/ -- ActiveMQ is a popular Java-based message queue

If you used any of the following storage engines with PoCo::MQ 0.1.7 or older:
The database format has changed.
Note: When using POE::Component::MessageQueue::Storage::Default (meaning mq.pl) the database will be automatically updated in place, so you don't need to worry about this.
Included in the distribution, is a schema/ directory with a few SQL scripts for upgrading:

Please check out the Google Group at:
http://groups.google.com/group/pocomq
Or just send an e-mail to: pocomq@googlegroups.com

If you find any bugs, have feature requests, or wish to contribute, please contact us at our Google Group mentioned above. We'll do our best to help you out!
Development is coordinated via Bazaar (See http://bazaar-vcs.org). The main Bazaar branch can be found here:
http://code.hackyourlife.org/bzr/dsnopek/perl_mq/devel.mainline
We prefer that contributions come in the form of a published Bazaar branch with the changes. This helps facilitate the back-and-forth in the review process to get any new code merged into the main branch.

The goal of this module is not to support every possible feature but rather to be small, simple, efficient and robust. For the most part expect incremental changes to address those areas.
There is one remaining big feature coming soon and that is the ability to run PoCo::MQ clustered accross multiple servers with some kind of fail-over.
Beyond that we have a TODO list (shown below) called "The Long Road To 1.0". This is a list of things we feel we need to have inorder to call the product complete. That includes management and monitoring tools for sysadmins as well as documentation for developers.

Chess gaming site ChessVegas.

External modules:
POE, POE::Component::Server::Stomp, POE::Component::Client::Stomp, Net::Stomp, POE::Filter::Stomp, POE::Component::Logger, DBD::SQLite, POE::Component::Generic
Storage modules:
POE::Component::MessageQueue::Storage, POE::Component::MessageQueue::Storage::Memory, POE::Component::MessageQueue::Storage::BigMemory, POE::Component::MessageQueue::Storage::DBI, POE::Component::MessageQueue::Storage::FileSystem, POE::Component::MessageQueue::Storage::Generic, POE::Component::MessageQueue::Storage::Generic::DBI, POE::Component::MessageQueue::Storage::Double, POE::Component::MessageQueue::Storage::Throttled, POE::Component::MessageQueue::Storage::Complex, POE::Component::MessageQueue::Storage::Default
Statistics modules:
POE::Component::MessageQueue::Statistics, POE::Component::MessageQueue::Statistics::Publish, POE::Component::MessageQueue::Statistics::Publish::YAML
ID generator modules:
POE::Component::MessageQueue::IDGenerator, POE::Component::MessageQueue::IDGenerator::SimpleInt, POE::Component::MessageQueue::IDGenerator::UUID

We are serious about squashing bugs! Currently, there are no known bugs, but some probably do exist. If you find any, please let us know at the Google group.
That said, we are using this in production in a commercial application for thousands of large messages daily and we experience very few issues.

Copyright 2007, 2008, 2009 David Snopek (http://www.hackyourlife.org)
Copyright 2007, 2008 Paul Driver <frodwith@gmail.com>
Copyright 2007 Daisuke Maki <daisuke@endeworks.jp>

This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 2 of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>.