The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.

Queue API description

A Tarantool/Box instance can serve as a Queue Manager, along with any other database work necessary.

A single properly configured Tarantool/Box space can store any number of queues. Multiple spaces can be used as well - for partitioning or logical separation of queues.

Queues support task priority. Priority value lays in the range [0, 255], with default value being 127. A higher value means higher priority, lower value - lower priority.

Each queue has one (currently) associated fiber taking care of it. The fiber is started upon first access to the queue. The job of the fiber is to monitor orphaned tasks, as well as prune and clean the queue from obsolete tasks.

To configure a space supporting queues, use the following parameters:

cfg space = [ { enabled = 1, index = [ { type = "TREE", unique = 1, key_field = [ { fieldno = 0, type = "STR" } ] }, { type = "TREE", unique = 0, key_field = [ { fieldno = 1, # queue, aka tube type = "STR" }, { fieldno = 2, # status type = "STR" }, { fieldno = 4, # ipri type = "STR" }, { fieldno = 5 # pri type = "STR" } ] }, { type = "TREE", unique = 0, key_field = [ { fieldno = 3, # next_event type = "NUM64" } ] } ] } ]

It may also be desirable to tune server readahead configuration variable if many consumers re-use the same socket for getting and acknowledging tasks.

The recommended value can be calculated as:

consumers-per-socket * (256 + largest task size)

For example, if the largest task size is 256 bytes, and average number of consumers per socket is 10, the recommended readahead value must be at least 51 200 bytes.

Terminology

Arguments of queue API functions

Task states

The format of task tuple

Queue API functions, such as put, take, return a task. The task consists of the following fields:

  1. id (string) - task identifier
  2. tube (string) - queue identifier
  3. status (string) - task status
  4. task data (all fields passed into put/urgent when the task was created)

API

Producer

queue.put(space, tube, delay, ttl, ttr, pri, ...)

Enqueue a task. Returns a tuple, representing the new task. The list of fields with task data ('...')is optional.

queue.urgent(space, tube, delay, ttl, ttr, pri, ...)

Enqueue a task. The task will get the highest priority. If delay is not zero, the function is equivalent to put.

Consumer

queue.take(space, tube, timeout)

If there are tasks in the queue ready for execution, take the highest-priority task. Otherwise, wait for a ready task to appear in the queue, and, as soon as it appears, mark it as taken and return to the consumer. If there is a timeout, and the task doesn't appear until the timeout expires, return 'nil'. If timeout is not given, wait indefinitely until a task appears.

All the time while the consumer is working on a task, it must keep the connection to the server open. If a connection disappears while the consumer is still working on a task, the task is put back on the ready list.

queue.ack(space, id)

Confirm completion of a task. Before marking a task as complete, this function verifies that:

Consumer identity is established using a session identifier. In other words, the task must be confirmed by the same connection which took it. If verification fails, the function returns an error.

On success, delete the task from the queue.

queue.release(space, id [, delay [, ttl ] ])

Return a task back to the queue: the task is not executed. Additionally, a new time to live and re-execution delay can be provided. If ttl is not defined (or zero) the method won't prolong ttl.

queue.requeue(space, id)

Return a task to the queue, the task is not executed. Puts the task at the end of the queue, so that it's executed only after all existing tasks in the queue are executed.

queue.bury(space, id)

Mark a task as buried. This special status excludes the task from the active list, until it's dug up. This function is useful when several attempts to execute a task lead to a failure. Buried tasks can be monitored by the queue owner, and treated specially.

queue.done(space, id, ...)

Mark a task as complete (done), but don't delete it. Replaces task data with the supplied fields ('...').

Common functions (neither producer nor consumer).

queue.dig(space, id)

'Dig up' a buried task, after checking that the task is buried. The task status is changed to ready.

queue.kick(space, tube [, count] )

'Dig up' count tasks in a queue. If count is not given, digs up just one buried task.

queue.unbury(space, id)

An alias to dig.

queue.delete(space, id)

Delete a task from the queue (regardless of task state or status).

queue.meta(space, id)

Return taks metadata:

  1. id (string) - task id
  2. tube (string) - queue id
  3. status (string) - task status
  4. event (time64) - time of the next important event in task life time, for example, when ttl or ttr expires, in seconds since start of the UNIX epoch
  5. ipri (string) - internal value of the task priority
  6. pri (string) - task priority as set when the task was added to the queue
  7. cid (number) - consumer id, of the consumer which took the task (only if the task is taken)
  8. created (time64) - time when the task was created (seconds since start of the UNIX epoch).
  9. ttl (time64) - task time to live
  10. ttr (time64) - task time to run
  11. cbury (count) - how many times the task was buried
  12. ctaken (Ч) - how many times the task was taken
  13. now (time64) - time recorded when the meta was called

queue.peek(space, id)

Return a task by task id. Returned tuple has the following fields:

  1. id (string) - task identifier
  2. tube (string) - queue identifier
  3. status (string) - task status
  4. task data (all fields passed into put/urgent when the task was created).

queue.statistics()

Return queue module statistics accumulated since server start. The statistics is broken down by queue id. Only queues on which there was some activity are included in the output.

The format of the statistics is a sequence of rows, where each odd row is the name of a statistical parameter, and the next even row is the value.