DR::TarantoolQueue - client for tarantool's queue
my $queue = DR::TarantoolQueue->new( host => 'tarantool.host', port => 33014, tube => 'request_queue', space => 11, connect_opts => { # see perldoc DR::Tarantool reconnect_period => 1, reconnect_always => 1 } ); # put empty task into queue with name 'request_queue' my $task = $queue->put; my $task = $queue->put(data => [ 1, 2, 3 ]); printf "task.id = %s\n", $task->id;
The module contains sync and async (coro) driver for tarantool queue.
Tarantool's parameters.
Additional options for DR::Tarantool. HashRef.
If true (default) the driver will use Coro tarantool's driver, otherwise the driver will use sync driver.
Default ttl for tasks.
Default ttr for tasks.
Default pri for tasks.
Default delay for tasks.
Default space for tasks.
Default tube for tasks.
Defaults for queues. HashRef. Key is tube name. Value is a hash with the following fields:
Methods "put" ("urgent") use these parameters if they are absent (otherwise it uses the same global attributes).
my $q = DR::TarantoolQueue->new(host => 'abc.com', port => 123);
Creates new queue(s) accessor.
$q->dig(task => $task); $task->dig; # the same $q->dig(id => $task->id); $q->dig(id => $task->id, space => $task->space);
'Dig up' a buried task. Checks, that the task is buried. The task status is changed to ready.
Is a synonym of "dig".
$q->delete(task => $task); $task->delete; # the same $q->delete(id => $task->id); $q->delete(id => $task->id, space => $task->space);
Delete a task from the queue (regardless of task state or status).
$q->peek(task => $task); $task->peek; # the same $q->peek(id => $task->id); $q->peek(id => $task->id, space => $task->space);
Return a task by task id.
my $s = $q->statistics; my $s = $q->statistics(space => 123); my $s = $q->statistics(space => 123, tube => 'abc'); my $s = DR::TarantoolQueue->statistics(space => 123); my $s = DR::TarantoolQueue->statistics(space => 123, tube => 'abc');
Return queue module statistics, since server start. The statistics is broken down by queue id. Only queues on which there was some activity are included in the output.
Task was processed (and will be deleted after the call).
my $m = $q->get_meta(task => $task); my $m = $q->get_meta(id => $task->id);
Returns a hashref with fields:
task id
queue id
task status
time of the next important event in task life time, for example, when ttl or ttr expires, in microseconds since start of the UNIX epoch.
internal value of the task priority
task priority as set when the task was added to the queue
consumer id, of the consumer which took the task (only if the task is taken)
time when the task was created (microseconds since start of the UNIX epoch)
task time to live (microseconds)
task time to run (microseconds)
how many times the task was buried
how many times the task was taken
time recorded when the meta was called
$q->put; $q->put(data => { 1 => 2 }); $q->put(space => 1, tube => 'abc', delay => 10, ttl => 3600, ttr => 60, pri => 10, data => [ 3, 4, 5 ]); $q->put(data => 'string');
Enqueue a task. Returns new task object. The list of fields with task data (data => ...) is optional.
data => ...
If 'space' and (or) 'tube' aren't defined the method will try to use them from queue object.
Enqueue a task. The task will get the highest priority. If delay is not zero, the function is equivalent to put.
my $task = $q->take; my $task = $q->take(timeout => 0.5); my $task = $q->take(space => 1, tube => 'requests, timeout => 20);
If there are tasks in the queue ready for execution, take the highest-priority task. Otherwise, wait for a ready task to appear in the queue, and, as soon as it appears, mark it as taken and return to the consumer. If there is a timeout, and the task doesn't appear until the timeout expires, returns undef. If timeout is not given, waits indefinitely.
All the time while the consumer is working on a task, it must keep the connection to the server open. If a connection disappears while the consumer is still working on a task, the task is put back on the ready list.
$q->ack(task => $task); $task->ack; # the same $q->ack(id => $task->id); $q->ack(space => $task->space, id => $task->id);
Confirm completion of a task. Before marking a task as complete, this function verifies that:
the task is taken
the consumer that is confirming the task is the one which took it
Consumer identity is established using a session identifier. In other words, the task must be confirmed by the same connection which took it. If verification fails, the function returns an error.
On success, deletes the task from the queue. Throws an exception otherwise.
$q->requeue(task => $task); $task->requeue; # the same $q->requeue(id => $task->id); $q->requeue(id => $task->id, space => $task->space);
Return a task to the queue, the task is not executed. Puts the task at the end of the queue, so that it's executed only after all existing tasks in the queue are executed.
$q->bury(task => $task); $task->bury; # the same $q->bury(id => $task->id); $q->bury(id => $task->id, space => $task->space);
Mark a task as buried. This special status excludes the task from the active list, until it's dug up. This function is useful when several attempts to execute a task lead to a failure. Buried tasks can be monitored by the queue owner, and treated specially.
$q->release(task => $task); $task->release; # the same $q->release(id => $task->id, space => $task->space); $q->release(task => $task, delay => 10); # delay the task $q->release(task => $task, ttl => 3600); # append task's ttl
Return a task back to the queue: the task is not executed. Additionally, a new time to live and re-execution delay can be provided.
$q->done(task => $task, data => { result => '123' }); $task->done(data => { result => '123' }); # the same $q->done(id => $task->id, space => $task->space);
Mark a task as complete (done), but don't delete it. Replaces task data with the supplied data.
Copyright (C) 2012 by Dmitry E. Oboukhov <unera@debian.org> Copyright (C) 2012 by Roman V. Nikolaev <rshadow@rambler.ru>
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.8.8 or, at your option, any later version of Perl 5 you may have available.
To install DR::TarantoolQueue, copy and paste the appropriate command in to your terminal.
cpanm
cpanm DR::TarantoolQueue
CPAN shell
perl -MCPAN -e shell install DR::TarantoolQueue
For more information on module installation, please visit the detailed CPAN module installation guide.