NAME
MongoDBx::Queue - A message queue implemented with MongoDB
VERSION
version 0.003
SYNOPSIS
use v5.10;
use MongoDB;
use MongoDBx::Queue;
my $connection = MongoDB::Connection->new( @parameters );
my $database = $connection->get_database("queue_db");
my $queue = MongoDBx::Queue->new( { db => $database } );
$queue->add_task( { msg => "Hello World" } );
$queue->add_task( { msg => "Goodbye World" } );
while ( my $task = $queue->reserve_task ) {
say $task->{msg};
$queue->remove_task( $task );
}
DESCRIPTION
ALPHA -- this is an early release and is still in development. Testing
and feedback welcome.
MongoDBx::Queue implements a simple, prioritized message queue using
MongoDB as a backend. By default, messages are prioritized by insertion
time, creating a FIFO queue.
On a single host with MongoDB, it provides a zero-configuration message
service across local applications. Alternatively, it can use a MongoDB
database cluster that provides replication and fail-over for an even
more durable, multi-host message queue.
Features:
* messages as hash references, not objects
* arbitrary message fields
* arbitrary scheduling on insertion
* atomic message reservation
* stalled reservations can be timed-out
* task rescheduling
Not yet implemented:
* parameter checking
* error handling
Warning: do not use with capped collections, as the queued messages will
not meet the constraints required by a capped collection.
ATTRIBUTES
db
A MongoDB::Database object to hold the queue. Required.
name
A collection name for the queue. Defaults to 'queue'. The collection
must only be used by MongoDBx::Queue or unpredictable awful things will
happen.
safe
Boolean that controls whether 'safe' inserts/updates are done. Defaults
to true.
METHODS
new
$queue = MongoDBx::Queue->new( { db => $database, %options } );
Creates and returns a new queue object. The "db" argument is required.
Other attributes may be provided as well.
add_task
$queue->add_task( \%message, \%options );
Adds a task to the queue. The "\%message" hash reference will be shallow
copied into the task and not include objects except as described by
MongoDB::DataTypes. Top-level keys must not start with underscores,
which are reserved for MongoDBx::Queue.
The "\%options" hash reference is optional and may contain the following
key:
* "priority": sets the priority for the task. Defaults to "time()".
Note that setting a "future" priority may cause a task to be invisible
to "reserve_task". See that method for more details.
reserve_task
$task = $queue->reserve_task;
$task = $queue->reserve_task( \%options );
Atomically marks and returns a task. The task is marked in the queue as
"reserved" (in-progress) so it can not be reserved again unless is is
rescheduled or timed-out. The task returned is a hash reference
containing the data added in "add_task", including private keys for use
by MongoDBx::Queue methods.
Tasks are returned in priority order from lowest to highest. If multiple
tasks have identical, lowest priorities, their ordering is undefined. If
no tasks are available or visible, it will return "undef".
The "\%options" hash reference is optional and may contain the following
key:
* "max_priority": sets the maximum priority for the task. Defaults to
"time()".
The "max_priority" option controls whether "future" tasks are visible.
If the lowest task priority is greater than the "max_priority", this
method returns "undef".
reschedule_task
$queue->reschedule_task( $task );
$queue->reschedule_task( $task, \%options );
Releases the reservation on a task so it can be reserved again.
The "\%options" hash reference is optional and may contain the following
key:
* "priority": sets the priority for the task. Defaults to the task's
original priority.
Note that setting a "future" priority may cause a task to be invisible
to "reserve_task". See that method for more details.
remove_task
$queue->remove_task( $task );
Removes a task from the queue (i.e. indicating the task has been
processed).
apply_timeout
$queue->apply_timeout( $seconds );
Removes reservations that occurred more than $seconds ago. If no
argument is given, the timeout defaults to 120 seconds. The timeout
should be set longer than the expected task processing time, so that
only dead/hung tasks are returned to the active queue.
search
my @results = $queue->search( \%query, \%options );
Returns a list of tasks in the queue based on search criteria. The query
should be expressed in the usual MongoDB fashion. In addition to MongoDB
options "limit", "skip" and "sort", this method supports a "reserved"
option. If present, results will be limited to reserved tasks if true or
unreserved tasks if false.
peek
$queue->peek( $task );
Retrieves a copy of the task from the queue. This is useful to retrieve
all fields from a partial-field result from "search". It is equivalent
to:
$self->search( { _id => $task->{_id} } );
size
$queue->size;
Returns the number of tasks in the queue, including in-progress ones.
waiting
$queue->waiting;
Returns the number of tasks in the queue that have not been reserved.
SUPPORT
Bugs / Feature Requests
Please report any bugs or feature requests through the issue tracker at
<https://github.com/dagolden/mongodbx-queue/issues>. You will be
notified automatically of any progress on your issue.
Source Code
This is open source software. The code repository is available for
public review and contribution under the terms of the license.
<https://github.com/dagolden/mongodbx-queue>
git clone git://github.com/dagolden/mongodbx-queue.git
AUTHOR
David Golden <dagolden@cpan.org>
COPYRIGHT AND LICENSE
This software is Copyright (c) 2012 by David Golden.
This is free software, licensed under:
The Apache License, Version 2.0, January 2004