NAME

MangoX::Queue - A MongoDB queue implementation using Mango

DESCRIPTION

MangoX::Queue is a MongoDB-backed queue implementation that uses Mango to support both blocking and non-blocking queues.

MangoX::Queue does not manage the Mango connection, database or collection. Pass a collection to the constructor and MangoX::Queue will use it. The collection can be plain, capped or sharded.

For an introduction to MangoX::Queue, see MangoX::Queue::Tutorial.

SYNOPSIS

Non-blocking

Non-blocking mode requires a running Mojo::IOLoop.

    my $queue = MangoX::Queue->new(collection => $mango_collection);

    # To add a job
    enqueue $queue 'test' => sub { my $id = shift; };

    # To set options
    enqueue $queue priority => 1, created => DateTime::Tiny->now, 'test' => sub { my $id = shift; };

    # To watch for a specific job status
    watch $queue $id, 'Complete' => sub {
        # Job status is 'Complete'
    };

    # To fetch a job
    fetch $queue sub {
        my ($job) = @_;
        # ...
    };

    # To get a job by id
    get $queue $id => sub { my $job = shift; };

    # To requeue a job
    requeue $queue $job => sub { my $id = shift; };

    # To dequeue a job
    dequeue $queue $id => sub { };

    # To consume a queue
    my $consumer = consume $queue sub {
        my ($job) = @_;
        # ...
    };

    # To stop consuming a queue
    release $queue $consumer;

    # To listen for errors
    on $queue error => sub { my ($queue, $error) = @_; };

Blocking

    my $queue = MangoX::Queue->new(collection => $mango_collection);

    # To add a job
    my $id = enqueue $queue 'test';

    # To set options
    my $id = enqueue $queue priority => 1, created => DateTime::Tiny->now, 'test';

    # To watch for a specific job status
    watch $queue $id;

    # To fetch a job
    my $job = fetch $queue;

    # To get a job by id
    my $job = get $queue $id;

    # To requeue a job
    my $id = requeue $queue $job;

    # To dequeue a job
    dequeue $queue $id;

    # To consume a queue
    while(my $job = consume $queue) {
        # ...
    }

Other

    my $queue = MangoX::Queue->new(collection => $mango_collection);

    # To listen for events
    on $queue enqueued => sub { my ($queue, $job) = @_; };
    on $queue dequeued => sub { my ($queue, $job) = @_; };
    on $queue consumed => sub { my ($queue, $job) = @_; };

    # To register a plugin
    plugin $queue 'MangoX::Queue::Plugin::Statsd';

ATTRIBUTES

MangoX::Queue implements the following attributes.

collection

    my $collection = $queue->collection;
    $queue->collection($mango->db('foo')->collection('bar'));

    my $queue = MangoX::Queue->new(collection => $collection);

The Mango::Collection representing the MongoDB queue collection.

delay

    my $delay = $queue->delay;
    $queue->delay(MangoX::Queue::Delay->new);

The MangoX::Queue::Delay responsible for dynamically controlling the delay between queue queries.

plugins

    my $plugins = $queue->plugins;

Returns a hash containing the plugins registered with this queue.

retries

    my $retries = $queue->retries;
    $queue->retries(5);

The number of times a job will be picked up from the queue before it is marked as failed.

timeout

    my $timeout = $queue->timeout;
    $queue->timeout(10);

The time (in seconds) a job is allowed to stay in the 'Retrieved' state before it is released back into the 'Pending' state. Defaults to 60 seconds.
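
The interaction between retries and timeout can be pictured with a small in-memory model. This is a sketch, not MangoX::Queue's implementation: the job fields (attempt, retrieved), the helper names and the exact point at which a job becomes 'Failed' are assumptions made for illustration.

```perl
use strict;
use warnings;

# Illustrative in-memory model only; not MangoX::Queue's implementation.
# Assumption: a job that has already been picked up $retries times is
# marked 'Failed' on the next pickup attempt. The real boundary may differ.
my ($retries, $timeout) = (2, 60);

# Hypothetical job record: status, pickup count and time of last pickup.
sub new_job { return { status => 'Pending', attempt => 0, retrieved => undef } }

# Pick the job up at time $now if it is Pending and has retries left.
sub pick_up {
    my ($job, $now) = @_;
    return 0 unless $job->{status} eq 'Pending';
    if ($job->{attempt} >= $retries) {
        $job->{status} = 'Failed';
        return 0;
    }
    $job->{attempt}++;
    $job->{status}    = 'Retrieved';
    $job->{retrieved} = $now;
    return 1;
}

# Release the job back to Pending if it has been Retrieved for too long.
sub expire {
    my ($job, $now) = @_;
    return unless $job->{status} eq 'Retrieved';
    $job->{status} = 'Pending' if $now - $job->{retrieved} > $timeout;
    return;
}

# A consumer picks the job up but never finishes it, so it times out.
my $job = new_job();
my @history;
for my $now (0, 61, 122, 183) {
    expire($job, $now);
    pick_up($job, $now);
    push @history, $job->{status};
}
print join(' ', @history), "\n";
```

In this model the job is retrieved twice, times out both times, and is marked 'Failed' when it comes up a third time.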

EVENTS

MangoX::Queue inherits from Mojo::EventEmitter and emits the following events.

Events are emitted only for actions on the current queue object, not the entire queue.
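
The per-object scope of events can be sketched without a database. The example below uses a minimal stand-in for Mojo::EventEmitter (only on and emit) and a hypothetical TinyQueue class backed by a shared array. None of these names belong to MangoX::Queue; they exist only to show that a listener on one object is not called for actions performed through another object on the same store.

```perl
use strict;
use warnings;

# Minimal stand-in for Mojo::EventEmitter (on/emit only), so the sketch
# runs without Mojolicious. TinyQueue is hypothetical, not MangoX::Queue.
package TinyQueue;

sub new { my ($class, %args) = @_; return bless { %args, events => {} }, $class }

# Register a callback for an event on this object only.
sub on {
    my ($self, $name, $cb) = @_;
    push @{ $self->{events}{$name} }, $cb;
    return $cb;
}

# Call every callback registered on this object for the event.
sub emit {
    my ($self, $name, @args) = @_;
    $_->($self, @args) for @{ $self->{events}{$name} || [] };
    return $self;
}

# Store the job in the shared list, then notify this object's listeners.
sub enqueue {
    my ($self, $job) = @_;
    push @{ $self->{store} }, $job;
    return $self->emit(enqueued => $job);
}

package main;

my @store;    # shared backing store, standing in for the MongoDB collection
my $producer = TinyQueue->new(store => \@store);
my $observer = TinyQueue->new(store => \@store);

my (@seen_by_producer, @seen_by_observer);
$producer->on(enqueued => sub { push @seen_by_producer, $_[1] });
$observer->on(enqueued => sub { push @seen_by_observer, $_[1] });

$producer->enqueue('test');

printf "producer saw %d, observer saw %d, store holds %d\n",
    scalar @seen_by_producer, scalar @seen_by_observer, scalar @store;
```

The job lands in the shared store, but only the object that performed the enqueue notifies its listeners.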

consumed

    on $queue consumed => sub {
        my ($queue, $job) = @_;
        # ...
    };

Emitted when an item is consumed (either via consume or fetch).

dequeued

    on $queue dequeued => sub {
        my ($queue, $job) = @_;
        # ...
    };

Emitted when an item is dequeued.

enqueued

    on $queue enqueued => sub {
        my ($queue, $job) = @_;
        # ...
    };

Emitted when an item is enqueued.

METHODS

MangoX::Queue implements the following methods.

consume

    # In blocking mode
    while(my $job = consume $queue) {
        # ...
    }

    # In non-blocking mode
    consume $queue sub {
        my ($job) = @_;
        # ...
    };

Waits for jobs to arrive on the queue, sleeping between queue checks using MangoX::Queue::Delay or Mojo::IOLoop.

Currently sets the status to 'Retrieved' before returning the job.
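
The polling behaviour of a blocking consumer can be modelled as follows. This is a sketch under stated assumptions, not the module's code: the in-memory @pending list stands in for the collection, delays are recorded rather than slept, and the doubling back-off with a cap is a guess at the kind of policy a delay object might apply. MangoX::Queue::Delay may behave differently.

```perl
use strict;
use warnings;

# Illustrative model of a blocking consumer; not MangoX::Queue's code.
# Assumption: the delay doubles after each empty poll, up to a cap.
my @pending;    # in-memory stand-in for the queue collection
my @slept;      # records the delays instead of actually sleeping

# Return the next job marked as 'Retrieved', or nothing if none is waiting.
sub fetch_job {
    return unless @pending;
    my $job = shift @pending;
    $job->{status} = 'Retrieved';
    return $job;
}

# Poll until a job arrives, backing off between empty polls.
sub consume_job {
    my ($on_idle) = @_;
    my ($delay, $max) = (0.1, 1.6);
    while (1) {
        my $job = fetch_job();
        return $job if $job;
        push @slept, $delay;    # a real consumer would sleep here
        $on_idle->();           # hook that lets the simulation add jobs
        $delay = $delay * 2 > $max ? $max : $delay * 2;
    }
}

# Simulate a producer that enqueues a job after three empty polls.
my $polls = 0;
my $job = consume_job(sub {
    push @pending, { data => 'test', status => 'Pending' } if ++$polls == 3;
});

print "got '$job->{data}' with status $job->{status} after ", scalar @slept, " empty polls\n";
```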

dequeue

    my $job = fetch $queue;
    dequeue $queue $job;

Dequeues a job. Currently removes it from the collection.

enqueue

    my $id = enqueue $queue 'job name';
    my $id = enqueue $queue [ 'some', 'data' ];
    my $id = enqueue $queue +{ foo => 'bar' };

Adds an item to the queue in blocking mode. The default priority is 1 and the default status is 'Pending'.

You can set queue options including priority, created and status.

    my $id = enqueue $queue
        priority => 1,
        created => time,
        status => 'Pending',
        +{
            foo => 'bar'
        };

For non-blocking mode, pass in a coderef as the final argument.

    enqueue $queue 'job_name' => sub {
        my $id = shift;
        # ...
    };

    enqueue $queue priority => 1, +{
        foo => 'bar',
    } => sub {
        my $id = shift;
        # ...
    };

Sets the status to 'Pending' by default.
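
The calling convention described above (option pairs first, then the job, then an optional coderef) can be illustrated with a small argument parser. parse_enqueue_args and the data and callback keys are hypothetical names used only for this sketch; MangoX::Queue's own parsing may differ.

```perl
use strict;
use warnings;

# Hypothetical helper showing one way to split enqueue-style arguments.
# Not part of MangoX::Queue; its internal parsing may differ.
sub parse_enqueue_args {
    my @args = @_;

    # A trailing coderef selects non-blocking mode.
    my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;

    # The job payload is the last remaining argument.
    my $payload = pop @args;

    # Whatever precedes the payload is a list of option pairs, applied
    # over the documented defaults (priority 1, status 'Pending').
    my %job = (priority => 1, status => 'Pending', @args);
    $job{data}     = $payload;
    $job{callback} = $callback;

    return \%job;
}

my $blocking     = parse_enqueue_args(priority => 2, +{ foo => 'bar' });
my $non_blocking = parse_enqueue_args('job_name' => sub { my $id = shift; });

for my $parsed ($blocking, $non_blocking) {
    printf "priority=%d status=%s mode=%s\n",
        $parsed->{priority}, $parsed->{status},
        $parsed->{callback} ? 'non-blocking' : 'blocking';
}
```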

fetch

    # In blocking mode
    my $job = fetch $queue;

    # In non-blocking mode
    fetch $queue sub {
        my ($job) = @_;
        # ...
    };

Fetch a single job from the queue, returning undef if no jobs are available.

Currently sets job status to 'Retrieved'.

get

    # In non-blocking mode
    get $queue $id => sub {
        my ($job) = @_;
        # ...
    };

    # In blocking mode
    my $job = get $queue $id;

Gets a job from the queue by ID. Doesn't change the job status.

You can also pass in a job instead of an ID.

    $job = get $queue $job;

get_options

    my $options = $queue->get_options;

Returns the Mango::Collection options hash used by find_and_modify to identify and update available queue items.
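
To make the role of such an options hash concrete, here is a sketch of what one could look like and how it selects and updates a single job. The query, sort, update and new keys are the standard fields of MongoDB's findAndModify command, but the values shown, the ascending priority order and the in-memory find_and_modify emulation are assumptions for illustration. The hash actually returned by get_options may differ.

```perl
use strict;
use warnings;

# Hypothetical options; the real hash from get_options may differ.
my $options = {
    query  => { status => 'Pending' },
    sort   => { priority => 1 },    # assumed: lowest priority value first
    update => { '$set' => { status => 'Retrieved' } },
    new    => 1,                    # return the document after the update
};

# Tiny in-memory emulation of findAndModify for equality queries,
# a single sort key and a $set update. Not Mango's implementation.
sub find_and_modify {
    my ($docs, $opts) = @_;
    my $query = $opts->{query};
    my ($field, $direction) = %{ $opts->{sort} };

    # Keep documents whose fields equal every value in the query.
    my @matches = grep {
        my $doc = $_;
        !grep { $doc->{$_} ne $query->{$_} } keys %$query;
    } @$docs;

    # Take the first match in sort order, if any.
    my ($job) = sort { $direction * ($a->{$field} <=> $b->{$field}) } @matches;
    return unless $job;

    # Apply the $set modifier in place and return the updated document.
    my $set = $opts->{update}{'$set'};
    $job->{$_} = $set->{$_} for keys %$set;
    return $job;
}

my @jobs = (
    { _id => 1, priority => 2, status => 'Pending' },
    { _id => 2, priority => 1, status => 'Pending' },
    { _id => 3, priority => 0, status => 'Retrieved' },
);

my $first = find_and_modify(\@jobs, $options);
print "picked job $first->{_id}, now $first->{status}\n";
```

Because selection and update happen in one step, a job that has been handed out no longer matches the query on the next call.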

release

    my $consumer = consume $queue sub {
        # ...
    };
    release $queue $consumer;

Releases a non-blocking consumer from watching a queue.

requeue

    my $job = fetch $queue;
    requeue $queue $job;

Requeues a job. Sets the job status to 'Pending'.

update

    my $job = fetch $queue;
    $job->{status} = 'Failed';
    update $queue $job;

Updates a job in the queue.

watch

Wait for a job to enter a certain status.

    # In blocking mode
    my $id = enqueue $queue 'test';
    watch $queue $id, 'Complete'; # blocks until job is complete

    # In non-blocking mode
    my $id = enqueue $queue 'test';
    watch $queue $id, 'Complete' => sub {
        # ...
    };

SEE ALSO

MangoX::Queue::Tutorial, Mojolicious, Mango