AnyEvent::Beanstalk - Async client to talk to beanstalkd server
version 1.142520
    use AnyEvent::Beanstalk;

    my $client = AnyEvent::Beanstalk->new(
        server => "localhost",
    );

    # Send a job with explicit data
    my $job = $client->put(
        {   data     => "data",
            priority => 100,
            ttr      => 120,
            delay    => 5,
        }
    )->recv;

    # Send a job, data created by encoding @args. By default with YAML
    my $job2 = $client->put(
        {   priority => 100,
            ttr      => 120,
            delay    => 5,
            encode   => \@args,
        }
    )->recv;

    # Send a job, data created by encoding @args with JSON
    use JSON::XS;
    $client->encoder(\&JSON::XS::encode_json);
    my $job3 = $client->put(
        {   priority => 100,
            ttr      => 120,
            delay    => 5,
            encode   => \@args,
        },
    )->recv;

    # Fetch a job, with a callback
    $client->reserve(sub { my $job = shift; process_job($job) });
AnyEvent::Beanstalk provides a Perl API of protocol version 1.3 to the beanstalkd server, a fast, general-purpose, in-memory workqueue service by Keith Rarick.
See the beanstalkd 1.3 protocol spec for greater detail.
Any of the attributes with accessor methods described below may be passed to the constructor as key-value pairs.
Get/set the hostname and port to connect to. The port, which defaults to 11300, can be specified by appending it to the hostname with a colon (e.g. "localhost:1234"). (Default: localhost:11300)
Set/get a default value, in seconds, for job delay. A job with a delay will be placed into a delayed state and will not be placed into the ready queue until the time period has passed. This value will be used by put and release as a default. (Default: 0)
Set/get a default value, in seconds, for job ttr (time to run). This value will be used by put as a default. (Default: 120)
Set/get a default value for job priority. The highest-priority job is the one with the lowest priority value (i.e. jobs with a lower priority value are run first). This value will be used by put, release and bury as a default. (Default: 10000)
Set/get the serialization encoder. $encoder is a reference to a subroutine that will be called when arguments to put need to be encoded to send to the beanstalkd server. The subroutine should accept a single argument and return a string representation to pass to the server. The default is to encode the argument using YAML.
Set/get the serialization decoder. $decoder is a reference to a subroutine that will be called when data from the beanstalkd server needs to be decoded. The subroutine will be passed the data fetched from the beanstalkd server and should return the value the application can use. The default is to decode using YAML.
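As an illustration of the encoder and decoder contract described above, here is a minimal sketch of a matching JSON pair. It uses the core JSON::PP module rather than JSON::XS so that it runs without extra dependencies; the variable names and the sample payload are made up for the example.

```perl
use strict;
use warnings;
use JSON::PP ();    # core module; JSON::XS provides the same two functions

# Encoder: receives the value passed as the encode option, returns the job body.
my $encoder = sub { JSON::PP::encode_json($_[0]) };

# Decoder: receives the raw job body, returns the Perl data structure.
my $decoder = sub { JSON::PP::decode_json($_[0]) };

# Round trip without a server, to check that the pair is symmetric.
my $body = $encoder->([ 'resize', { width => 640 } ]);
my $args = $decoder->($body);
print "$args->[0] to width $args->[1]{width}\n";    # prints "resize to width 640"
```

Since attributes may be passed to the constructor, the pair could be supplied as `encoder => $encoder, decoder => $decoder`, or set later through the encoder and decoder accessors.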
Set/get the debug value. If set to a true value, all communication with the server will be output with warn.
A code reference to call when there is an error communicating with the server, for example an unexpected EOF. A description will be passed as an argument. The default is to call die.
A code reference to call when the TCP connection has been established with the server.
All methods that communicate with the server take an optional code reference as the last argument and return a condition variable.
The condition variable's recv method returns two values. The first is specific to the command being performed and is referred to below as the response value. The second is the first line of the protocol response.
If there is a protocol error the response value will be undef.
If a callback is specified then the callback will be called with the same arguments that the recv method would return.
Calling recv in a scalar context will only return the first of the two values.
If there is a communication error, then all condition variables will be triggered with no values.
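The two calling styles described above can be sketched as follows. This is an illustration only: the helper names `put_blocking` and `put_async` are made up, `$client` is assumed to be a connected AnyEvent::Beanstalk object, and the job object is assumed to offer an `id` accessor.

```perl
use strict;
use warnings;

# Blocking style: recv in list context yields the response value and the
# first line of the protocol response. Both are undef after a communication
# error, because the condition variable is then triggered with no values.
sub put_blocking {
    my ($client, $body) = @_;
    my ($job, $line) = $client->put({ data => $body })->recv;
    return defined $job
        ? "queued job " . $job->id    # assumes the job object has an id accessor
        : "put failed: " . (defined $line ? $line : 'no response');
}

# Callback style: the callback receives the same two values that recv would
# return, so the caller does not have to block on the condition variable.
sub put_async {
    my ($client, $body, $done) = @_;
    return $client->put(
        { data => $body },
        sub {
            my ($job, $line) = @_;
            $done->(defined $job ? $job->id : undef, $line);
        }
    );
}
```

Checking the response value with `defined` covers both failure modes: a protocol error leaves the response line available for reporting, while a communication error leaves both values undefined.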
These methods are used by clients that are placing work into the queue
Insert job into the currently used tube.
The response value for a put is an AnyEvent::Beanstalk::Job object.
Options may be
The priority to use to queue the job. Jobs with smaller priority values will be scheduled before jobs with larger priorities. The most urgent priority is 0.
Defaults to the current value of the priority attribute.
An integer number of seconds to wait before putting the job in the ready queue. The job will be in the "delayed" state during this time.
Defaults to the current value of the delay attribute.
"time to run" - An integer number of seconds to allow a worker to run this job. This time is counted from the moment a worker reserves this job. If the worker does not delete, release, or bury the job within ttr seconds, the job will time out and the server will release the job. The minimum ttr is 1. If the client sends 0, the server will silently increase the ttr to 1.
Defaults to the current value of the ttr attribute.
The job body. If not specified, the value of the encode option is used.
Value to encode and pass as job body, if the data option is not passed.
Defaults to the empty string.
Change tube that new jobs are inserted into
The response value for use is a true value.
Reserve a job from the list of tubes currently being watched.
Returns an AnyEvent::Beanstalk::Job on success. $timeout is the maximum number of seconds to wait for a job to become ready. If $timeout is not given, the client will wait indefinitely.
The response value for reserve is an AnyEvent::Beanstalk::Job object.
Delete the specified job.
The response value will be true.
Release the specified job.
The response value for release is an AnyEvent::Beanstalk::Job object.
Valid options are
New priority to assign to the job
The bury command puts a job into the "buried" state. Buried jobs are put into a FIFO linked list and will not be touched by the server again until a client kicks them with the "kick" command.
The response value for bury is an AnyEvent::Beanstalk::Job object.
Calling touch with the id of a reserved job will reset the time left for the job to complete back to the original ttr value.
The response value for touch is a true value.
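The worker commands above (reserve, delete, bury) are typically combined into one step per job. The following is a minimal sketch under stated assumptions, not the module's own code: `work_one` and `$handler` are hypothetical names, `$client` is assumed to be a connected AnyEvent::Beanstalk object, and the job object is assumed to offer `id` and `data` accessors.

```perl
use strict;
use warnings;

# Reserve one job, run $handler on its body, then delete the job on success
# or bury it on failure. Returns 'deleted', 'buried', or 'none'.
sub work_one {
    my ($client, $handler, $timeout) = @_;

    # Wait up to $timeout seconds; with no timeout the reserve waits indefinitely.
    my ($job) = $client->reserve($timeout)->recv;
    return 'none' unless defined $job;

    # Trap exceptions so that a failing job is parked rather than lost.
    my $ok = eval { $handler->($job->data); 1 };

    if ($ok) {
        $client->delete($job->id)->recv;
        return 'deleted';
    }

    # Buried jobs stay out of the ready queue until a client kicks them.
    $client->bury($job->id)->recv;
    return 'buried';
}
```

A handler that runs longer than the job's ttr would need to call touch with the job id before the time runs out, otherwise the server releases the job back to the ready queue.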
Specifies a tube to add to the watch list. If the tube doesn't exist, it will be created.
The response value for watch is the number of tubes being watched.
Stop watching $tube.
The response value for ignore is the number of tubes being watched.
watch_only will submit a list_tubes_watched command, then submit watch and ignore commands to make the watch list match.
The response value for watch_only is the number of tubes being watched.
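The reconciliation that watch_only performs can be pictured as a set difference between the tubes currently watched and the tubes wanted. The sketch below illustrates that idea only; it is not taken from the module's source, and `plan_watch_only` is a hypothetical helper name.

```perl
use strict;
use warnings;

# Given the tubes currently watched and the tubes wanted, work out which
# tubes need a watch command and which need an ignore command.
sub plan_watch_only {
    my ($current, $wanted) = @_;
    my %current = map { $_ => 1 } @$current;
    my %wanted  = map { $_ => 1 } @$wanted;

    my @watch  = grep { !$current{$_} } @$wanted;     # wanted but not yet watched
    my @ignore = grep { !$wanted{$_} }  @$current;    # watched but no longer wanted
    return (\@watch, \@ignore);
}

my ($watch, $ignore) = plan_watch_only([qw(default email)], [qw(email thumbnails)]);
print "watch: @$watch; ignore: @$ignore\n";    # prints "watch: thumbnails; ignore: default"
```

When applying such a plan by hand, the watch commands should be sent before the ignore commands, because beanstalkd refuses to ignore the only tube left on a connection's watch list.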
Peek at the job id specified.
The response value for peek is an AnyEvent::Beanstalk::Job object.
Peek at the first job that is in the ready queue of the tube currently being used.
The response value for peek_ready is an AnyEvent::Beanstalk::Job object.
Peek at the first job that is in the delayed queue of the tube currently being used.
The response value for peek_delayed is an AnyEvent::Beanstalk::Job object.
Peek at the first job that is in the buried queue of the tube currently being used.
The response value for peek_buried is an AnyEvent::Beanstalk::Job object.
The kick command applies only to the currently used tube. It moves jobs into the ready queue. If there are any buried jobs, it will only kick buried jobs. Otherwise it will kick delayed jobs. The server will not kick more than $bound jobs.
The response value is the number of jobs kicked.
Kick the specified job $id.
Return stats for the specified job $id.
The response value for stats_job is an AnyEvent::Beanstalk::Stats object with the following methods.
id - The job id
tube - The name of the tube that contains this job
state - is "ready" or "delayed" or "reserved" or "buried"
pri - The priority value set by the put, release, or bury commands.
age - The time in seconds since the put command that created this job.
time_left - The number of seconds left until the server puts this job into the ready queue. This number is only meaningful if the job is reserved or delayed. If the job is reserved and this amount of time elapses before its state changes, it is considered to have timed out.
reserves - The number of times this job has been reserved
timeouts - The number of times this job has timed out during a reservation.
releases - The number of times a client has released this job from a reservation.
buries - The number of times this job has been buried.
kicks - The number of times this job has been kicked.
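The accessors listed above can be combined into a one-line job summary. This is a minimal sketch: `describe_job` is a hypothetical helper, and `$client` is assumed to be a connected AnyEvent::Beanstalk object.

```perl
use strict;
use warnings;

# Fetch the stats for job $id and format a short human-readable summary.
sub describe_job {
    my ($client, $id) = @_;
    my ($stats, $line) = $client->stats_job($id)->recv;

    # On a protocol error the response value is undef; report the raw line.
    return "job $id: " . (defined $line ? $line : 'no response')
        unless defined $stats;

    return sprintf 'job %d in tube %s is %s (pri %d, age %ds, %d reserves, %d timeouts)',
        $stats->id, $stats->tube, $stats->state, $stats->pri,
        $stats->age, $stats->reserves, $stats->timeouts;
}
```

A job whose timeouts count keeps rising is a hint that its ttr is too short for the work being done, or that workers are failing without burying or releasing it.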
Return stats for the specified tube $tube.
The response value for stats_tube is an AnyEvent::Beanstalk::Stats object with the following methods.
name - The tube's name.
current_jobs_urgent - The number of ready jobs with priority < 1024 in this tube.
current_jobs_ready - The number of jobs in the ready queue in this tube.
current_jobs_reserved - The number of jobs reserved by all clients in this tube.
current_jobs_delayed - The number of delayed jobs in this tube.
current_jobs_buried - The number of buried jobs in this tube.
total_jobs - The cumulative count of jobs created in this tube.
current_waiting - The number of open connections that have issued a reserve command while watching this tube but not yet received a response.
pause - The number of seconds the tube has been paused for.
cmd_pause_tube - The cumulative number of pause-tube commands for this tube.
pause_time_left - The number of seconds until the tube is un-paused.
The response value for stats is an AnyEvent::Beanstalk::Stats object with the following methods.
current_jobs_urgent - The number of ready jobs with priority < 1024.
current_jobs_ready - The number of jobs in the ready queue.
current_jobs_reserved - The number of jobs reserved by all clients.
current_jobs_delayed - The number of delayed jobs.
current_jobs_buried - The number of buried jobs.
cmd_put - The cumulative number of put commands.
cmd_peek - The cumulative number of peek commands.
cmd_peek_ready - The cumulative number of peek-ready commands.
cmd_peek_delayed - The cumulative number of peek-delayed commands.
cmd_peek_buried - The cumulative number of peek-buried commands.
cmd_reserve - The cumulative number of reserve commands.
cmd_use - The cumulative number of use commands.
cmd_watch - The cumulative number of watch commands.
cmd_ignore - The cumulative number of ignore commands.
cmd_delete - The cumulative number of delete commands.
cmd_release - The cumulative number of release commands.
cmd_bury - The cumulative number of bury commands.
cmd_kick - The cumulative number of kick commands.
cmd_stats - The cumulative number of stats commands.
cmd_stats_job - The cumulative number of stats-job commands.
cmd_stats_tube - The cumulative number of stats-tube commands.
cmd_list_tubes - The cumulative number of list-tubes commands.
cmd_list_tube_used - The cumulative number of list-tube-used commands.
cmd_list_tubes_watched - The cumulative number of list-tubes-watched commands.
cmd_pause_tube - The cumulative number of pause-tube commands.
job_timeouts - The cumulative count of times a job has timed out.
total_jobs - The cumulative count of jobs created.
max_job_size - The maximum number of bytes in a job.
current_tubes - The number of currently-existing tubes.
current_connections - The number of currently open connections.
current_producers - The number of open connections that have each issued at least one put command.
current_workers - The number of open connections that have each issued at least one reserve command.
current_waiting - The number of open connections that have issued a reserve command but not yet received a response.
total_connections - The cumulative count of connections.
pid - The process id of the server.
version - The version string of the server.
rusage_utime - The accumulated user CPU time of this process in seconds and microseconds.
rusage_stime - The accumulated system CPU time of this process in seconds and microseconds.
uptime - The number of seconds since this server started running.
binlog_oldest_index - The index of the oldest binlog file needed to store the current jobs.
binlog_current_index - The index of the current binlog file being written to. If binlog is not active, this value will be 0.
binlog_max_size - The maximum size in bytes a binlog file is allowed to reach before a new binlog file is opened.
The response value for list_tubes is a reference to an array of the tubes that the server currently has defined.
The response value for list_tube_used is the name of the tube currently being used.
This is the tube into which put would place new jobs and the tube that will be examined by the various peek commands.
The response value for list_tubes_watched is a reference to an array of the tubes that this connection is currently watching.
These are the tubes that reserve will check to find jobs. On error an empty list, or undef in a scalar context, will be returned.
Pause from reserving any jobs in $tube for $delay seconds.
The response value for pause_tube is a true value.
Initiate a connection to the server. Once the connection is established, the on_connect handler will be called.
Will connect to the server, then attempt to restore the tube used and the list of tubes watched. If it is unable to restore the state, the connection will be disconnected and the on_error handler will be called.
Process all pending commands. Will return when there are no pending commands
Returns the number of reserve commands that have been sent and not answered yet.
Disconnect from server. If there are any outstanding commands then the condition variable for each command will be sent the empty list.
Same as disconnect.
More tests
Large parts of this documentation were lifted from the documentation that comes with beanstalkd.
Beanstalk::Client, AnyEvent
Graham Barr <gbarr@pobox.com>
Much of the structure of the code in this module was based on AnyEvent::Redis
Author of beanstalkd
Copyright (C) 2010 by Graham Barr.
This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
To install AnyEvent::Beanstalk, copy and paste the appropriate command into your terminal.
cpanm
cpanm AnyEvent::Beanstalk
CPAN shell
perl -MCPAN -e shell
install AnyEvent::Beanstalk
For more information on module installation, please visit the detailed CPAN module installation guide.