Queue::Q::ReliableFIFO::Redis - In-memory Redis implementation of the ReliableFIFO queue
    use Queue::Q::ReliableFIFO::Redis;
    my $q = Queue::Q::ReliableFIFO::Redis->new(
        server     => 'myredisserver',
        port       => 6379,
        queue_name => 'my_work_queue',
    );

    # Reuse the same connection and create a new object for another queue.
    # Note: don't use the same connection in different threads (of course)!
    my $q2 = Queue::Q::ReliableFIFO::Redis->clone($q,
        queue_name => 'other_queue'
    );

    # Producer:
    $q->enqueue_item("foo");
    # You can pass any JSON-serializable data structure
    $q->enqueue_item({ bar => "baz" });
    $q->enqueue_item({ id => 12 }, { id => 34 });  # two items
    # Get rid of everything in the queue:
    $q->flush_queue();  # get a clean state, removes the queue

    # Consumer:
    $q->consume(\&callback);
    $q->consume(sub {
        my $data = shift;
        print 'Received: ', Dumper($data);
    });

    # Cleanup script
    my $action = 'requeue';
    while (1) {
        my @handled_items = $q->handle_expired_items($timeout, $action);
        for my $item (@handled_items) {
            printf "%s: item %s, in queue since %s, requeued %d times\n",
                $action,
                Dumper($item->data),
                scalar localtime $item->time,
                $item->requeue_count;
        }
        sleep(60);
    }

    # Retry items that failed before:
    $q->requeue_failed_items();
    $q->requeue_failed_items(
        MaxFailCount => 3,    # only requeue if there were not more than 3 failures
        Delay        => 3600, # only requeue if the previous failure is at least 1 hour ago
    );

    # Nagios?
    $q->queue_length();
    $q->queue_length('failed');

    # Deprecated (consumer)
    my $item  = $q->claim_item;
    my @items = $q->claim_item(100);
    my $foo   = $item->data;
    $q->mark_item_as_done($item);   # single item
    $q->mark_item_as_done(@items);  # multiple items
Implements interface defined in Queue::Q::ReliableFIFO: an implementation based on Redis.
The data structures passed to enqueue_item are serialized using JSON (cf. JSON::XS), so any data structures supported by that can be enqueued. We use JSON because that is supported at the Lua side as well (the cjson library).
enqueue_item
The implementation is kept very lightweight at the Redis level in order to get a high throughput. With this implementation it is easy to get a throughput of 10,000 items per second on a single core.
At the Redis side this is basically done at the following events:
Note that only exceptions need multiple commands.
To detect hanging items, a cronjob is needed, looking at how long items stay in the busy status.
The queues are implemented as list data structures in Redis. The lists ave as name the queue name plus an extension. The extension is:
_main for the working queue _busy for the list with items that are claimed but not finished _failed for the items that failed
There can also be a list with extension "_time" if a cronjob is monitoring how long items are in the busy list (see method handle_expired_items()).
Important note: At the Redis level a lost connection will always throw an exception, even if auto-reconnect is switched on. As a consequence, the methods that issue Redis commands, like enqueue_item(), claim_item() and mark_item_as_done(), will throw an exception when the connection to the Redis server is lost. The consume() method handles these exceptions. For other methods you need to catch and handle the exception yourself.
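As an illustration of the point above, a producer can wrap the Redis-backed calls in an eval block. This is a minimal sketch, assuming a Redis server reachable at localhost:6379; the queue setup mirrors the SYNOPSIS.

```perl
use Queue::Q::ReliableFIFO::Redis;

my $q = Queue::Q::ReliableFIFO::Redis->new(
    server     => 'localhost',
    port       => 6379,
    queue_name => 'my_work_queue',
);

# enqueue_item() throws on a lost connection, so guard it with eval.
my $ok = eval {
    $q->enqueue_item({ id => 12 });
    1;
};
if (!$ok) {
    # Connection lost (or another Redis error): log it and retry later.
    warn "enqueue failed: $@";
}
```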
enqueue_item()
claim_item()
mark_item_as_done()
consume()
All methods of Queue::Q::ReliableFIFO. Other methods are:
Constructor. Takes named parameters. Required parameters are
Optional parameters are
Default value is 0
Default value is 5
Default value is 1
handle_expired_items()
Default value is 30
warn_on_requeue
The clone method can be used to reuse an existing connection to create another queue object.

Specific to the Redis implementation is that the return value is the length of the queue after the items are added.
Attempts to claim $count items from the main queue and atomically transfers them to the busy queue. Returns the items as Queue::Q::ReliableFIFO::Item objects (as a list for $count > 1). $count defaults to 1. Will block for claim_wait_timeout seconds.
$count
Queue::Q::ReliableFIFO::Item
$count > 1
claim_wait_timeout
Same as claim_item, but non-blocking.
claim_item
This method can be used to obtain a simple count of the specified subqueue (i.e. main/busy/failed). Useful for monitoring checks, and also as a backpressure mechanism for throttling.
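The backpressure use mentioned above can look like the following sketch. The threshold of 10,000 and the next_work_item() helper are hypothetical; only queue_length() comes from this module.

```perl
# Producer-side throttling: back off while the queue is long.
while (my $work = next_work_item()) {    # next_work_item() is a hypothetical source
    sleep 1 while $q->queue_length() > 10_000;   # arbitrary example threshold
    $q->enqueue_item($work);
}
```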
This method is called by the consumer to consume the items of a queue. For each item in the queue, the callback function will be called. The function will receive the data of the queued item as a parameter. While the consume method deals with the queue-related actions, like claiming, "marking as done" etc., the callback function only deals with processing the item.

The $action parameter is applied when the callback function dies. Allowed values are:
$action
By default, the consume method will keep on reading the queue forever or until the process receives a SIGINT or SIGTERM signal. You can make the consume method return earlier by using one of the options MaxItems, MaxSeconds or ReturnWhenEmpty. If you still want to have a "near real time" behavior you need to make sure there are always consumers running, which can be achieved using cron and IPC::ConcurrencyLimit::WithStandby.
MaxItems
MaxSeconds
ReturnWhenEmpty
IPC::ConcurrencyLimit::WithStandby
This method also uses claim_wait_timeout.
requeue. (Default). I.e. do it again: the item will be put at the tail of the queue. The requeue_limit property of the queue indicates the limit on how many times an item can be requeued. The default is 5 times. You can change that by calling the set_requeue_limit() method or by passing the property to the constructor. When the requeue limit is reached, the item will go to the failed queue.
Default
set_requeue_limit()
Note: by setting the requeue_limit to "0" you can force the item to go to the "failed" status right away (without being requeued).
drop. Forget about it.
Chunk. The Chunk option is used to set a chunk size: the number of items to claim and to mark as done in one go. This helps to fight latency.
DieOnError. DEPRECATED. See ReturnOnDie.
ReturnOnDie
ReturnOnDie If this option has a true value, the consumer will stop if the callback function does a die call. Default is "false".
die
WarnOnError If this option has a true value, the consumer will warn if the callback function dies. Default is "false".
MaxItems This can be used to limit the consume method to processing only a limited number of items, which can be useful in cases of memory leaks. When you use this option, you will probably need to look into restart strategies with cron. Of course this comes with delays in handling the items.
MaxSeconds This can be used to limit the consume method to process items for a limited amount of time.
ReturnWhenEmpty Use this when you want consume() to return when the queue is empty. Note that consume will wait claim_wait_timeout seconds before it can conclude that the queue is empty.

Pause. This can be used to give the queue some time to grow, so that more items at a time can be claimed. The value of Pause is in seconds and can be a fraction of a second (e.g. 0.5). Default value is 0. This option only makes sense if larger chunks are read from the queue (so together with option "Chunk").

ProcessAll. This can be used to process all claimed items in one invocation of the callback function. The value of ProcessAll should be a true or false value. Default value is "0". Note that this changes the @_ contents of the callback: normally the callback function is called with one item data structure, while in this case @_ will contain a list of item data structures. This option only makes sense if larger chunks are read from the queue (so together with option "Chunk").
NoSigHandlers When this option is used, no signal handlers for SIGINT and SIGTERM will be installed. By default, consume() installs handlers that will make the queue consuming stop on reception of those signals.
Examples:
    $q->consume(\&callback);
    $q->consume(\&callback, 'requeue');  # same, because 'requeue' is the default

    # Claims 50 items at a time. Faster because of less latency.
    $q->consume(\&callback, 'requeue', { Chunk => 50 });
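The remaining consume() options described above can be combined as in the following sketches. The process_batch() function is hypothetical; the numeric values are illustrative only.

```perl
# Process at most 1000 items, then return (restart e.g. via cron):
$q->consume(\&callback, 'requeue', { MaxItems => 1000 });

# Run for at most one hour:
$q->consume(\&callback, 'requeue', { MaxSeconds => 3600 });

# Drain the queue and return once it is empty:
$q->consume(\&callback, 'requeue', { ReturnWhenEmpty => 1 });

# Claim 50 items at a time and hand them all to one callback invocation.
# With ProcessAll, @_ contains a list of item data structures.
$q->consume(
    sub { my @items = @_; process_batch(\@items) },  # process_batch() is hypothetical
    'requeue',
    { Chunk => 50, ProcessAll => 1, Pause => 0.5 },
);
```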
This method can be used by a cleanup job to ensure that items don't stick forever in the busy status. When an item has been in this status for $timeout seconds, the action specified by $action will be performed. The $action parameter is the same as with the consume() method.

The method returns item objects of type Queue::Q::ReliableFIFO::Item, which contain the item data as well as the time when the item was first queued and how often it was requeued.
To set/change the limit of how often an item can be requeued, use the requeue_limit parameter in the new() constructor or use the method set_requeue_limit.
Once an item is moved to the failed queue, the counter is reset. The item can be put back into the main queue by using the requeue_failed_items() method (or via the CLI). Then it will be retried again up to requeue_limit times.
This method puts the items that are passed back onto the queue at the consumer side, so that they can be picked up a.s.a.p. This method is used e.g. when a chunk of items is claimed but the consumer aborts before all items are processed.

This puts claimed items back onto the queue so that other consumers can pick them up. In this case the items are put at the back of the queue, so depending on the queue length it can take some time before they are available to consumers.
This method will move items from the failed queue to the working queue. The %options can be used to restrict what should be requeued. The number of items actually moved will be the return value.
MaxFailCount Takes only the items with not more than MaxFailCount failures. The value "-1" means "regardless of how many times it failed". Default: -1.
Delay Takes only the items that failed at least Delay seconds ago. Default: 0.
Chunk Performance related: the number of items to handle in one Lua call. Higher values result in higher throughput, but more stress on Redis. Default: 100.

An item will only be requeued if both criteria (MaxFailCount and Delay) are met.
NOTE: Previous versions supported a single "limit" parameter to requeue only a few items. This API is still supported but may go away in the future.
** deprecated ** This method can stress Redis really hard when the queue is very long. If you want to requeue items, use requeue_failed_items() instead.
This method will move the specified item(s) to the main queue so that it will be processed again. It will return 0 or 1 (i.e. the number of items moved).
Same as requeue_busy, except it accepts only one value instead of a list.
** deprecated ** Use remove_failed_items() instead.
This method now uses remove_failed_items() under the hood. The default values for Chunk and LogLimit are used. Because of the LogLimit, there is a maximum to the number of failed items this method will return. So if it returns 100, the actual number may be higher.
Typical use could be a cronjob that warns about failed items (e.g. via email) and cleans them up.
Supported options:
MaxAge Only the failed items that are older than $seconds will be retrieved and removed.

MinFailCount Takes only the items that have at least MinFailCount failures. Default: 0.
If both options are used, only one of the two needs to be true to retrieve and remove an item.
This method will remove the items that are considered as failing permanently, according to the criteria passed via the options.

Returns an array ($n_removed, \@raw_items) where $n_removed indicates how many items were removed from the failed queue. @raw_items contains objects of type Queue::Q::ReliableFIFO::Item which failed permanently. The number of objects is the lower of $n_removed and the LogLimit option (see below).

The way this method works is by moving failed items to a temporary list in Redis and processing that sequentially with a server-side Lua script. This Lua script is called repeatedly and handles up to "Chunk" items in each call. Depending on the criteria, items are put back into the failed queue or not. When the temporary queue is empty, the Lua script is no longer called. The items that are not put back into the queue are put into a "log" list.

Usually you will want to know the details of the failed items. But if there are many (e.g. millions), it is not likely you will read all the details. That is the reason for the LogLimit option.
MinAge Takes only the items older than MinAge seconds. Default: 0.
LogLimit The maximum number of raw items (with messages) you want to get back after one call of this function. Default: 100
If one or both of the MinAge and MinFailCount criteria are true, the item is considered permanently failed.
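A typical cronjob as described above could look like the following sketch. The option values are illustrative only; the use of Data::Dumper follows the SYNOPSIS.

```perl
use Data::Dumper;

# Permanently remove old failed items and report a bounded sample of them.
my ($n_removed, $raw_items) = $q->remove_failed_items(
    MinAge       => 7 * 24 * 3600,  # failed more than a week ago (example value)
    MinFailCount => 5,              # example value
    LogLimit     => 100,
    Chunk        => 100,
);
printf "removed %d permanently failed items\n", $n_removed;
for my $item (@$raw_items) {
    print "failed item: ", Dumper($item->data);
}
```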
This method returns the maximum wait time of items in the queue. It simply looks up the item at the head of the queue (i.e. at the consumer side of the queue) and returns the age of that item, so this is a relatively cheap method.
Returns objects of type Queue::Q::ReliableFIFO::Item from the busy list. You can limit the number of items by passing the limit to the method.
If you require a simple count, and not the actual queue items themselves, consider using the method queue_length. This avoids the overhead of deserialising each queue item by calling Redis's LLEN command instead.
queue_length
LLEN
Similar to raw_items_busy() but for failed items.
Similar to raw_items_busy() but for items in the working queue. Note that the main queue can be large, so a limit is strongly recommended here.
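A sketch of inspecting a bounded sample of queued items without draining the queue. The method name raw_items_main and the limit of 10 are assumptions based on the description above; Data::Dumper usage follows the SYNOPSIS.

```perl
use Data::Dumper;

# Peek at up to 10 items in the working queue (limit strongly recommended).
for my $item ($q->raw_items_main(10)) {
    printf "queued at %s: %s",
        scalar localtime($item->time),
        Dumper($item->data);
}
```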
Returns the memory usage percentage of the Redis instance where the queue is located.
Returns the value of the oldest item in the queue (the one about to be consumed), without removing the item from the queue.
Herald van der Breggen, <herald.vanderbreggen@booking.com>
Steffen Mueller, <smueller@cpan.org>
Copyright (C) 2012, 2013, 2014 by Steffen Mueller
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.8.1 or, at your option, any later version of Perl 5 you may have available.