MongoDBx::Queue - A message queue implemented with MongoDB
version 0.002
use v5.10;
use MongoDB;
use MongoDBx::Queue;

my $connection = MongoDB::Connection->new( @parameters );
my $database   = $connection->get_database("queue_db");

my $queue = MongoDBx::Queue->new( { db => $database } );

$queue->add_task( { msg => "Hello World" } );
$queue->add_task( { msg => "Goodbye World" } );

while ( my $task = $queue->reserve_task ) {
    say $task->{msg};
    $queue->remove_task( $task );
}
ALPHA -- this is an early release and is still in development. Testing and feedback welcome.
MongoDBx::Queue implements a simple, prioritized message queue using MongoDB as a backend. By default, messages are prioritized by insertion time, creating a FIFO queue.
On a single host with MongoDB, it provides a zero-configuration message service across local applications. Alternatively, it can use a MongoDB database cluster that provides replication and fail-over for an even more durable, multi-host message queue.
Features:
messages as hash references, not objects
arbitrary message fields
arbitrary scheduling on insertion
atomic message reservation
stalled reservations can be timed-out
task rescheduling
Not yet implemented:
parameter checking
error handling
Warning: do not use with capped collections, as the queued messages will not meet the constraints required by a capped collection.
db: A MongoDB::Database object to hold the queue. Required.
name: A collection name for the queue. Defaults to 'queue'. The collection must only be used by MongoDBx::Queue or unpredictable awful things will happen.
safe: Boolean that controls whether 'safe' inserts/updates are done. Defaults to true.
$queue = MongoDBx::Queue->new( { db => $database, %options } );
Creates and returns a new queue object. The db argument is required. Other attributes may be provided as well.
$queue->add_task( \%message, \%options );
Adds a task to the queue. The \%message hash reference will be shallow copied into the task and must not include objects except as described by MongoDB::DataTypes. Top-level keys must not start with underscores, which are reserved for MongoDBx::Queue.
The \%options hash reference is optional and may contain the following key:
priority: sets the priority for the task. Defaults to time().
Note that setting a "future" priority may cause a task to be invisible to reserve_task. See that method for more details.
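As a minimal sketch of scheduling on insertion, the helper below sets a priority in the future so that the task stays invisible to a default reserve_task call until that time has passed. The name add_delayed_task is hypothetical and not part of MongoDBx::Queue; $queue is assumed to be a MongoDBx::Queue object (or anything with a compatible add_task method).

```perl
use strict;
use warnings;

# Hypothetical helper (not part of MongoDBx::Queue): queue a message so
# that it only becomes visible after $delay seconds. This relies on the
# documented default that reserve_task uses max_priority => time().
sub add_delayed_task {
    my ( $queue, $message, $delay ) = @_;
    my $when = time() + $delay;
    $queue->add_task( $message, { priority => $when } );
    return $when;    # epoch time at which the task becomes visible
}
```

For example, add_delayed_task( $queue, { msg => "later" }, 60 ) would queue a message that a default reserve_task call skips for roughly one minute.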
$task = $queue->reserve_task;
$task = $queue->reserve_task( \%options );
Atomically marks and returns a task. The task is marked in the queue as "reserved" (in-progress) so it cannot be reserved again unless it is rescheduled or timed out. The task returned is a hash reference containing the data added in add_task, including private keys for use by MongoDBx::Queue methods.
Tasks are returned in priority order from lowest to highest. If multiple tasks have identical, lowest priorities, their ordering is undefined. If no tasks are available or visible, it will return undef.
max_priority: sets the maximum priority for the task. Defaults to time().
The max_priority option controls whether "future" tasks are visible. If the lowest task priority is greater than the max_priority, this method returns undef.
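To illustrate how max_priority affects visibility, here is a small sketch that reserves the next task while also accepting tasks scheduled a little way into the future. The helper name reserve_with_lookahead and its $lookahead parameter are assumptions made for this example only; $queue is assumed to be a MongoDBx::Queue object.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of MongoDBx::Queue): reserve the next
# task, also accepting tasks whose priority lies up to $lookahead
# seconds in the future. Returns undef if nothing is visible.
sub reserve_with_lookahead {
    my ( $queue, $lookahead ) = @_;
    my $task = $queue->reserve_task(
        { max_priority => time() + $lookahead } );
    return $task;
}
```

With $lookahead set to 0 this should behave like a plain reserve_task call, since max_priority already defaults to time().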
$queue->reschedule_task( $task );
$queue->reschedule_task( $task, \%options );
Releases the reservation on a task so it can be reserved again.
priority: sets the priority for the task. Defaults to the task's original priority.
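One possible use of rescheduling is a simple retry: if processing a reserved task fails, release it with a later priority so it is tried again after a delay. The sketch below assumes a caller-supplied $handler code reference that dies on failure; the name process_or_retry and the $retry_delay parameter are hypothetical.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of MongoDBx::Queue): run $handler on a
# reserved task. On success the task is removed; if the handler dies,
# the reservation is released and the task is pushed $retry_delay
# seconds into the future.
sub process_or_retry {
    my ( $queue, $task, $handler, $retry_delay ) = @_;

    if ( eval { $handler->($task); 1 } ) {
        $queue->remove_task($task);
        return 1;
    }

    $queue->reschedule_task( $task,
        { priority => time() + $retry_delay } );
    return 0;
}
```

Omitting the options hash in the reschedule_task call would instead put the task back at its original priority, as described above.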
$queue->remove_task( $task );
Removes a task from the queue (i.e. indicating the task has been processed).
$queue->apply_timeout( $seconds );
Removes reservations that occurred more than $seconds ago. If no argument is given, the timeout defaults to 120 seconds. The timeout should be set longer than the expected task processing time, so that only dead/hung tasks are returned to the active queue.
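A worker might combine apply_timeout with the reserve/remove cycle along the following lines. This is only a sketch: run_worker_pass and its parameters are hypothetical names, and $handler is assumed to be a code reference supplied by the application.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of MongoDBx::Queue): make one pass over
# the queue. Stalled reservations older than $timeout seconds are
# released first, then every visible task is processed and removed.
sub run_worker_pass {
    my ( $queue, $handler, $timeout ) = @_;

    $queue->apply_timeout($timeout);

    my $count = 0;
    while ( my $task = $queue->reserve_task ) {
        $handler->($task);
        $queue->remove_task($task);
        $count++;
    }
    return $count;    # number of tasks processed in this pass
}
```

In this sketch a handler that dies leaves its task reserved, so a later call to apply_timeout is what eventually makes that task available again.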
$queue->size;
Returns the number of tasks in the queue, including in-progress ones.
$queue->waiting;
Returns the number of tasks in the queue that have not been reserved.
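Since size counts all tasks and waiting counts only unreserved ones, their difference gives a rough count of in-progress tasks. The helper below is a sketch under that reading; queue_stats is a hypothetical name, and the two calls are not made atomically, so the figures may be slightly inconsistent while other workers are active.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of MongoDBx::Queue): summarize queue
# state. "reserved" is derived as size minus waiting and is only an
# approximation when the queue is being modified concurrently.
sub queue_stats {
    my ($queue) = @_;
    my $size    = $queue->size;
    my $waiting = $queue->waiting;
    return {
        size     => $size,
        waiting  => $waiting,
        reserved => $size - $waiting,
    };
}
```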
Please report any bugs or feature requests through the issue tracker at https://github.com/dagolden/mongodbx-queue/issues. You will be notified automatically of any progress on your issue.
This is open source software. The code repository is available for public review and contribution under the terms of the license.
https://github.com/dagolden/mongodbx-queue
git clone git://github.com/dagolden/mongodbx-queue.git
David Golden <dagolden@cpan.org>
This software is Copyright (c) 2012 by David Golden.
This is free software, licensed under:
The Apache License, Version 2.0, January 2004