The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.

NAME

MangoX::Queue - A MongoDB queue implementation using Mango

DESCRIPTION

MangoX::Queue is a MongoDB backed queue implementation using Mango to support blocking and non-blocking queues.

MangoX::Queue makes no attempt to handle the Mango connection, database or collection - pass in a collection to the constructor and MangoX::Queue will use it. The collection can be plain, capped or sharded.

For an introduction to MangoX::Queue, see MangoX::Queue::Tutorial.

API change - the current API is inconsistent with Mojo::IOLoop and other Mojolicious modules. A delay_compat option has been added, which is currently disabled by default. This will be enabled by default in a future release, and eventually deprecated.

SYNOPSIS

Non-blocking

Non-blocking mode requires a running Mojo::IOLoop.

    my $queue = MangoX::Queue->new(collection => $mango_collection);

    # To add a job
    enqueue $queue 'test' => sub { my $id = shift; };

    # To set options
    enqueue $queue priority => 1, created => bson_time, 'test' => sub { my $id = shift; };

    # To watch for a specific job status
    watch $queue $id, 'Complete' => sub {
        # Job status is 'Complete'
    };

    # To fetch a job
    fetch $queue sub {
        my ($job) = @_;
        # ...
    };

    # To get a job by id
    get $queue $id => sub { my $job = shift; };

    # To requeue a job
    requeue $queue $job => sub { my $id = shift; };

    # To dequeue a job
    dequeue $queue $id => sub { };

    # To consume a queue
    my $consumer = consume $queue sub {
        my ($job) = @_;
        # ...
    };

    # To stop consuming a queue
    release $queue $consumer;

    # To listen for errors
    on $queue error => sub { my ($queue, $error) = @_; };

Blocking

    my $queue = MangoX::Queue->new(collection => $mango_collection);

    # To add a job
    my $id = enqueue $queue 'test';

    # To set options
    my $id = enqueue $queue priority => 1, created => bson_time, 'test';

    # To watch for a specific job status
    watch $queue $id;

    # To fetch a job
    my $job = fetch $queue;

    # To get a job by id
    my $job = get $queue $id;

    # To requeue a job
    my $id = requeue $queue $job;

    # To dequeue a job
    dequeue $queue $id;

    # To consume a queue
    while(my $job = consume $queue) {
        # ...
    }

Other

    my $queue = MangoX::Queue->new(collection => $mango_collection);

    # To listen for events
    on $queue enqueued => sub ( my ($queue, $job) = @_; };
    on $queue dequeued => sub ( my ($queue, $job) = @_; };
    on $queue consumed => sub { my ($queue, $job) = @_; };

    # To register a plugin
    plugin $queue 'MangoX::Queue::Plugin::Statsd';

ATTRIBUTES

MangoX::Queue implements the following attributes.

collection

    my $collection = $queue->collection;
    $queue->collection($mango->db('foo')->collection('bar'));

    my $queue = MangoX::Queue->new(collection => $collection);

The Mango::Collection representing the MongoDB queue collection.

delay

    my $delay = $queue->delay;
    $queue->delay(MangoX::Queue::Delay->new);

The MangoX::Queue::Delay responsible for dynamically controlling the delay between queue queries.

delay_compat

    my $compat = $queue->delay_compat;
    $queue->delay_compat(1);

Enabling delay_compat passes $self as the first argument to queue callbacks, to fix a compatibility bug with Mojo::IOLoop.

This will be enabled by default in a future release. Please migrate your code to work with the new API, and enable delay_compat on construction:

    my $queue = MangoX::Queue->new(delay_compat => 1);

concurrent_job_limit

    my $concurrent_job_limit = $queue->concurrent_job_limit;
    $queue->concurrent_job_limit(20);

The maximum number of concurrent jobs (jobs consumed from the queue and unfinished). Defaults to 10.

This only applies to jobs on the queue in non-blocking mode. MangoX::Queue has an internal counter that is incremented when a job has been consumed from the queue (in non-blocking mode). The job returned is a MangoX::Queue::Job instance and has a descructor method that is called to decrement the internal counter. See MangoX::Queue::Job for more details.

Set to -1 to disable queue concurrency limits. Use with caution, this could result in out of memory errors or an extremely slow event loop.

If you need to decrement the job counter early (e.g. to hold on to a reference to the job after you've finished processing it), you can call the finished method on the MangoX::Queue::Job object.

    $job->finished;

failed_status

    $self->failed_status('Failed');

Set a custom failed status.

no_binary_oid

    $no_bin = $self->no_binary_oid;
    $self->no_binary_oid(1);

Set to 1 to disable binary ObjectIDs being returned by MangoX::Queue in the Job object.

pending_status

    $self->pending_status('Pending');
    $self->pending_status(['Pending', 'pending']);

Set a custom pending status, can be an array ref.

plugins

    my $plugins = $queue->plugins;

Returns a hash containing the plugins registered with this queue.

processing_status

    $self->processing_status('Processing');

Set a custom processing status.

retries

    my $retries = $queue->retries;
    $queue->retries(5);

The number of times a job will be picked up from the queue before it is marked as failed.

timeout

    my $timeout = $queue->timeout;
    $queue->timeout(10);

The time (in seconds) a job is allowed to stay in Retrieved state before it is released back into Pending state. Defaults to 60 seconds.

EVENTS

MangoX::Queue inherits from Mojo::EventEmitter and emits the following events.

Events are emitted only for actions on the current queue object, not the entire queue.

consumed

    on $queue consumed => sub {
        my ($queue, $job) = @_;
        # ...
    };

Emitted when an item is consumed (either via consume or fetch)

dequeued

    on $queue dequeued => sub {
        my ($queue, $job) = @_;
        # ...
    };

Emitted when an item is dequeued

enqueued

    on $queue enqueued => sub {
        my ($queue, $job) = @_;
        # ...
    };

Emitted when an item is enqueued

concurrent_job_limit_reached

    on $queue enqueued => sub {
        my ($queue, $concurrent_job_limit) = @_;
        # ...
    };

Emitted when a job is found but the </concurrent_job_limit> limit has been reached.

METHODS

MangoX::Queue implements the following methods.

consume

    # In blocking mode
    while(my $job = consume $queue) {
        # ...
    }

    # In non-blocking mode
    consume $queue sub {
        my ($job) = @_;
        # ...
    };

Waits for jobs to arrive on the queue, sleeping between queue checks using MangoX::Queue::Delay or Mojo::IOLoop.

Currently sets the status to 'Retrieved' before returning the job.

dequeue

    my $job = fetch $queue;
    dequeue $queue $job;

Dequeues a job. Currently removes it from the collection.

enqueue

    my $id = enqueue $queue 'job name';
    my $id = enqueue $queue [ 'some', 'data' ];
    my $id = enqueue $queue +{ foo => 'bar' };

Add an item to the queue in blocking mode. The default priority is 1 and status is 'Pending'.

You can set queue options including priority, created and status.

    my $id = enqueue $queue,
        priority => 1,
        created => bson_time,
        status => 'Pending',
        +{
            foo => 'bar'
        };

For non-blocking mode, pass in a coderef as the final argument.

    my $id = enqueue $queue 'job_name' => sub {
        # ...
    };

    my $id = enqueue $queue priority => 1, +{
        foo => 'bar',
    } => sub {
        # ...
    };

Sets the status to 'Pending' by default.

fetch

    # In blocking mode
    my $job = fetch $queue;

    # In non-blocking mode
    fetch $queue sub {
        my ($job) = @_;
        # ...
    };

Fetch a single job from the queue, returning undef if no jobs are available.

Currently sets job status to 'Retrieved'.

get

    # In non-blocking mode
    get $queue $id => sub {
        my ($job) = @_;
        # ...
    };

    # In blocking mode
    my $job = get $queue $id;

Gets a job from the queue by ID. Doesn't change the job status.

You can also pass in a job instead of an ID.

    $job = get $queue $job;

get_options

    my $options = $queue->get_options;

Returns the Mango::Collection options hash used by find_and_modify to identify and update available queue items.

release

    my $consumer = consume $queue sub {
        # ...
    };
    release $queue $consumer;

Releases a non-blocking consumer from watching a queue.

requeue

    my $job = fetch $queue;
    requeue $queue $job;

Requeues a job. Sets the job status to 'Pending'.

update

    my $job = fetch $queue;
    $job->{status} = 'Failed';
    update $queue $job;

Updates a job in the queue.

watch

Wait for a job to enter a certain status.

    # In blocking mode
    my $id = enqueue $queue 'test';
    watch $queue $id, 'Complete'; # blocks until job is complete

    # In non-blocking mode
    my $id = enqueue $queue 'test';
    watch $queue $id, 'Complete' => sub {
        # ...
    };

FUTURE JOBS

Jobs can be queued in advance by setting a delay_until attribute:

    enqueue $queue delay_until => (time + 20), "job name";

ERRORS

Errors are reported by MangoX::Queue using callbacks and Mojo::EventEmitter

To listen for all errors on a queue, subscribe to the 'error' event:

    $queue->on(error => sub {
        my ($queue, $job, $error) = @_;
        # ...
    });

To check for errors against an individual update, enqueue or dequeue call, you can check for an error argument to the callback sub:

    enqueue $queue +$job => sub {
        my ($job, $error) = @_;

        if($error) {
            # ...
        }
    }

CONTRIBUTORS

Ben Vinnerd, ben@vinnerd.com =item Olivier Duclos, github.com/oliwer

SEE ALSO

MangoX::Queue::Tutorial, Mojolicious, Mango