NAME

AnyEvent::Beanstalk - Async client to talk to beanstalkd server

VERSION

version 1.170590

SYNOPSIS

  use AnyEvent::Beanstalk;

  my $client = AnyEvent::Beanstalk->new(
     server => "localhost",
  );

  # Send a job with explicit data
  my $job = $client->put(
    { data     => "data",
      priority => 100,
      ttr      => 120,
      delay    => 5,
    }
  )->recv;

  # Send job, data created by encoding @args. By default with YAML
  my $job2 = $client->put(
    { priority => 100,
      ttr      => 120,
      delay    => 5,
      encode   => \@args,
    }
  )->recv;

  # Send job, data created by encoding @args with JSON
  use JSON::XS;
  $client->encoder(\&JSON::XS::encode_json);
  my $job3 = $client->put(
    { priority => 100,
      ttr      => 120,
      delay    => 5,
      encode   => \@args,
    },
  )->recv;

  # fetch a job, with a callback
  $client->reserve( sub { my $job = shift; process_job($job) });

DESCRIPTION

AnyEvent::Beanstalk provides a Perl API, implementing protocol version 1.3, for the beanstalkd server, a fast, general-purpose, in-memory work queue service by Keith Rarick.

See the beanstalkd 1.3 protocol spec for greater detail.

METHODS

Constructor

new (%options)

Any of the attributes with accessor methods described below may be passed to the constructor as key-value pairs.

Attribute Accessor Methods

server ([$hostname])

Get/set the hostname, and optionally the port, to connect to. The port, which defaults to 11300, can be specified by appending it to the hostname with a : (e.g. "localhost:1234"). (Default: localhost:11300)

delay ([$delay])

Set/get a default value, in seconds, for job delay. A job with a delay will be placed into a delayed state and will not be placed into the ready queue until the time period has passed. This value will be used by put and release as a default. (Default: 0)

ttr ([$ttr])

Set/get a default value, in seconds, for job ttr (time to run). This value will be used by put as a default. (Default: 120)

priority ([$priority])

Set/get a default value for job priority. The highest priority job is the job with the lowest priority value (i.e. jobs with a lower priority value are run first). This value will be used by put, release and bury as a default. (Default: 10000)

encoder ([$encoder])

Set/get the serialization encoder. $encoder is a reference to a subroutine that will be called when arguments to put need to be encoded to send to the beanstalkd server. The subroutine should accept a single argument and return a string representation to pass to the server. The default is to encode the argument using YAML.

decoder ([$decoder])

Set/get the serialization decoder. $decoder is a reference to a subroutine that will be called when data from the beanstalkd server needs to be decoded. The subroutine will be passed the data fetched from the beanstalkd server and should return the value the application can use. The default is to decode using YAML.
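As an illustration of the encoder and decoder contract described above, here is a minimal sketch of a matching pair built on the core JSON::PP module. The SYNOPSIS uses JSON::XS; JSON::PP is chosen here only because it ships with Perl. The sub names json_encoder and json_decoder are hypothetical.

```perl
use strict;
use warnings;
use JSON::PP ();

# Sorted keys make the encoded job body deterministic.
my $json = JSON::PP->new->utf8->canonical;

# Encoder: receives the single value passed as "encode", returns a string.
sub json_encoder { return $json->encode( $_[0] ) }

# Decoder: receives the job body from the server, returns a Perl value.
sub json_decoder { return $json->decode( $_[0] ) }

# Round trip, no server required.
my $body = json_encoder( [ 'resize', { width => 640, height => 480 } ] );
my $args = json_decoder($body);
print "$body\n";
```

The pair could then be installed with $client->encoder(\&json_encoder) and $client->decoder(\&json_decoder), or passed to new as the encoder and decoder options.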

debug ([$debug])

Set/get the debug value. If set to a true value then all communication with the server will be output with warn.

on_error ([$callback])

A code reference to call when there is an error communicating with the server, for example an unexpected EOF. A description will be passed as an argument. The default is to call die.

on_connect ([$callback])

A code reference to call when the TCP connection has been established with the server.

Communication Methods

All methods that communicate with the server take an optional code reference as the last argument and return a condition variable.

The condition variable's recv method returns two values. The first is specific to the command being performed and is referred to below as the response value. The second is the first line of the protocol response.

If there is a protocol error the response value will be undef.

If a callback is specified then the callback will be called with the same arguments that the recv method would return.

Calling recv in a scalar context will only return the first of the two values.

If there is a communication error, then all condition variables will be triggered with no values.
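The rules above can be sketched as a small helper that classifies whatever a condition variable returns. The name describe_response is hypothetical, and $cv is assumed to be the condition variable returned by any communication method; the three cases follow the description above rather than the module's internals.

```perl
use strict;
use warnings;

# Hypothetical helper: $cv is assumed to be the condition variable returned
# by a communication method, for example $client->delete($id).
sub describe_response {
    my ($cv) = @_;

    # List context: (response value, first line of the protocol response).
    my ( $value, $line ) = $cv->recv;

    # No values at all: the connection itself failed.
    return 'communication error' unless defined $line;

    # A response line without a response value: the server reported an error.
    return "protocol error: $line" unless defined $value;

    return "ok: $line";
}
```

For example, print describe_response( $client->delete($id) ) would block until the server answers and then print one of the three descriptions.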

Producer Methods

These methods are used by clients that are placing work into the queue.

put ($options, [$callback])

Insert job into the currently used tube.

The response value for a put is an AnyEvent::Beanstalk::Job object.

Options may be

priority

Priority to use to queue the job. Jobs with smaller priority values will be scheduled before jobs with larger priorities. The most urgent priority is 0.

Defaults to the current value of the priority attribute.

delay

An integer number of seconds to wait before putting the job in the ready queue. The job will be in the "delayed" state during this time.

Defaults to the current value of the delay attribute.

ttr

"time to run" - An integer number of seconds to allow a worker to run this job. This time is counted from the moment a worker reserves this job. If the worker does not delete, release, or bury the job within ttr seconds, the job will time out and the server will release the job. The minimum ttr is 1. If the client sends 0, the server will silently increase the ttr to 1.

Defaults to the current value of the ttr attribute.

data

The job body. If not specified, the value of the encode option is used.

encode

Value to encode and pass as the job body if the data option is not passed.

If neither data nor encode is passed, the job body defaults to the empty string.

use ($tube, [$callback])

Change the tube that new jobs are inserted into.

The response value for use is a true value.
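A producer typically combines use and put. The following is a minimal sketch of that flow, not part of the module: enqueue is a hypothetical helper, $client is assumed to be an AnyEvent::Beanstalk object, and the returned Job object is assumed to expose an id method.

```perl
use strict;
use warnings;

# Hypothetical helper: selects $tube, queues $payload there and returns the
# id of the new job. $client is assumed to be an AnyEvent::Beanstalk object.
sub enqueue {
    my ( $client, $tube, $payload, %override ) = @_;

    # "use" yields a true response value on success.
    $client->use($tube)->recv
        or die "could not use tube '$tube'\n";

    # Options missing from %override fall back to the client's
    # priority, delay and ttr attributes.
    my $job = $client->put( { %override, encode => $payload } )->recv
        or die "put into tube '$tube' failed\n";

    # Assumption: the Job object provides an id accessor.
    return $job->id;
}
```

A call such as enqueue( $client, 'reports', \@args, priority => 100 ) blocks on each recv, which keeps the sketch short; passing callbacks instead would keep the event loop free.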

Worker Methods

reserve ([$timeout], [$callback])

Reserve a job from the list of tubes currently being watched.

$timeout is the maximum number of seconds to wait for a job to become ready. If $timeout is not given then the client will wait indefinitely.

The response value for reserve is an AnyEvent::Beanstalk::Job object.

delete ($id, [$callback])

Delete the specified job.

The response value will be true.

release ($id, [$options], [$callback])

Release the specified job.

The response value for release is an AnyEvent::Beanstalk::Job object.

Valid options are

priority

New priority to assign to the job

delay

An integer number of seconds to wait before putting the job in the ready queue. The job will be in the "delayed" state during this time

bury ($id, [$options], [$callback])

The bury command puts a job into the "buried" state. Buried jobs are put into a FIFO linked list and will not be touched by the server again until a client kicks them with the "kick" command.

The response value for bury is an AnyEvent::Beanstalk::Job object.

Valid options are

priority

New priority to assign to the job
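The worker methods described so far (reserve, delete, release and bury) fit together roughly as in the sketch below. It is an illustration under stated assumptions, not the module's prescribed pattern: work_one_job is a hypothetical helper, $handler is assumed to return 'done' or 'retry' (anything else, including an exception, counts as a failure), the 30 second retry delay is arbitrary, and the Job object is assumed to expose an id method.

```perl
use strict;
use warnings;

# Hypothetical helper: reserves one job and settles it according to what
# $handler returns. $client is assumed to be an AnyEvent::Beanstalk object.
sub work_one_job {
    my ( $client, $handler, $timeout ) = @_;

    # Wait at most $timeout seconds for a job from the watched tubes.
    my $job = $client->reserve($timeout)->recv
        or return 'no job';

    # An exception or an undefined return value counts as 'fail'.
    my $outcome = eval { $handler->($job) } // 'fail';

    if ( $outcome eq 'done' ) {
        # Finished: remove the job from the queue.
        $client->delete( $job->id )->recv;
        return 'deleted';
    }
    if ( $outcome eq 'retry' ) {
        # Put the job back, but keep it delayed for 30 seconds.
        $client->release( $job->id, { delay => 30 } )->recv;
        return 'released';
    }

    # Anything else: park the job until a client kicks it.
    $client->bury( $job->id )->recv;
    return 'buried';
}
```

Burying rather than releasing a failed job keeps a job that always fails from being reserved again and again.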

touch ($id, [$callback])

Calling touch with the id of a reserved job will reset the time left for the job to complete back to the original ttr value.

The response value for touch is a true value.

watch ($tube, [$callback])

Specifies a tube to add to the watch list. If the tube doesn't exist, it will be created.

The response value for watch is the number of tubes being watched.

ignore ($tube, [$callback])

Stop watching $tube.

The response value for ignore is the number of tubes being watched.

watch_only (@tubes, [$callback])

watch_only will submit a list_tubes_watching command, then submit the watch and ignore commands needed to make the watch list match @tubes.

The response value for watch_only is the number of tubes being watched
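The reconciliation that watch_only is described as performing can be pictured with plain Perl. The sketch below is an illustration of the idea, not the module's implementation; plan_watch_changes is a hypothetical name and the tube names are made up.

```perl
use strict;
use warnings;

# Given the tubes currently watched and the tubes wanted, work out which
# tubes need a watch command and which need an ignore command.
sub plan_watch_changes {
    my ( $watched, $wanted ) = @_;
    my %watched = map { $_ => 1 } @$watched;
    my %wanted  = map { $_ => 1 } @$wanted;

    # Sorting first keeps the result order stable.
    my @to_watch  = grep { !$watched{$_} } sort keys %wanted;
    my @to_ignore = grep { !$wanted{$_} } sort keys %watched;
    return ( \@to_watch, \@to_ignore );
}

my ( $watch, $ignore ) =
    plan_watch_changes( [ 'default', 'mail' ], [ 'mail', 'video' ] );
print "watch: @$watch; ignore: @$ignore\n";
```

When applying such a plan by hand, the watch commands should go first, because beanstalkd refuses to ignore the last tube on a connection's watch list.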

Other Communication Methods

peek ($id, [$callback])

Peek at the job with the specified id.

The response value for peek is an AnyEvent::Beanstalk::Job object.

peek_ready ([$callback])

Peek at the first job that is in the ready queue of the tube currently being used.

The response value for peek_ready is an AnyEvent::Beanstalk::Job object.

peek_delayed ([$callback])

Peek at the first job that is in the delayed queue of the tube currently being used.

The response value for peek_delayed is an AnyEvent::Beanstalk::Job object.

peek_buried ([$callback])

Peek at the first job that is in the buried queue of the tube currently being used.

The response value for peek_buried is an AnyEvent::Beanstalk::Job object.

kick ($bound, [$callback])

The kick command applies only to the currently used tube. It moves jobs into the ready queue. If there are any buried jobs, it will only kick buried jobs. Otherwise it will kick delayed jobs. The server will not kick more than $bound jobs.

The response value for kick is the number of jobs kicked.

kick_job ($id, [$callback])

Kick the specified job $id.

stats_job ($id, [$callback])

Return stats for the specified job $id.

The response value for stats_job is an AnyEvent::Beanstalk::Stats object with the following methods.

  • id - The job id

  • tube - The name of the tube that contains this job

  • state - One of "ready", "delayed", "reserved" or "buried".

  • pri - The priority value set by the put, release, or bury commands.

  • age - The time in seconds since the put command that created this job.

  • time_left - The number of seconds left until the server puts this job into the ready queue. This number is only meaningful if the job is reserved or delayed. If the job is reserved and this amount of time elapses before its state changes, it is considered to have timed out.

  • reserves - The number of times this job has been reserved

  • timeouts - The number of times this job has timed out during a reservation.

  • releases - The number of times a client has released this job from a reservation.

  • buries - The number of times this job has been buried.

  • kicks - The number of times this job has been kicked.
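As an example of using these methods, the sketch below extends a reservation only when it is about to expire. touch_if_needed is a hypothetical helper, $client is assumed to be an AnyEvent::Beanstalk object, and the threshold is whatever margin the worker considers safe.

```perl
use strict;
use warnings;

# Hypothetical helper: touches job $id when it is reserved and fewer than
# $threshold seconds remain before it would time out.
sub touch_if_needed {
    my ( $client, $id, $threshold ) = @_;

    my $stats = $client->stats_job($id)->recv
        or return 'unknown job';

    return 'not reserved' unless $stats->state eq 'reserved';
    return 'enough time'  unless $stats->time_left < $threshold;

    # touch resets the time left to the job's original ttr.
    return $client->touch($id)->recv ? 'touched' : 'touch failed';
}
```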

stats_tube ($tube, [$callback])

Return stats for the specified tube $tube.

The response value for stats_tube is an AnyEvent::Beanstalk::Stats object with the following methods.

  • name - The tube's name.

  • current_jobs_urgent - The number of ready jobs with priority < 1024 in this tube.

  • current_jobs_ready - The number of jobs in the ready queue in this tube.

  • current_jobs_reserved - The number of jobs reserved by all clients in this tube.

  • current_jobs_delayed - The number of delayed jobs in this tube.

  • current_jobs_buried - The number of buried jobs in this tube.

  • total_jobs - The cumulative count of jobs created in this tube.

  • current_waiting - The number of open connections that have issued a reserve command while watching this tube but not yet received a response.

  • pause - The number of seconds the tube has been paused for.

  • cmd_pause_tube - The cumulative number of pause-tube commands for this tube.

  • pause_time_left - The number of seconds until the tube is un-paused.
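The tube statistics combine naturally with kick. The following sketch returns every buried job in a tube to the ready queue; kick_buried is a hypothetical helper and $client is assumed to be an AnyEvent::Beanstalk object.

```perl
use strict;
use warnings;

# Hypothetical helper: kicks all buried jobs in $tube and returns the number
# of jobs the server reports as kicked.
sub kick_buried {
    my ( $client, $tube ) = @_;

    my $stats = $client->stats_tube($tube)->recv
        or die "no stats for tube '$tube'\n";

    my $buried = $stats->current_jobs_buried
        or return 0;    # nothing to kick

    # kick applies only to the tube currently being used.
    $client->use($tube)->recv
        or die "could not use tube '$tube'\n";

    # Ask for at most $buried jobs, so that delayed jobs are left alone
    # provided no other client changes the tube in between.
    return scalar $client->kick($buried)->recv;
}
```

Note that the helper leaves $tube selected as the tube in use, which also affects later put and peek calls on the same client.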

stats ([$callback])

The response value for stats is an AnyEvent::Beanstalk::Stats object with the following methods.

  • current_jobs_urgent - The number of ready jobs with priority < 1024.

  • current_jobs_ready - The number of jobs in the ready queue.

  • current_jobs_reserved - The number of jobs reserved by all clients.

  • current_jobs_delayed - The number of delayed jobs.

  • current_jobs_buried - The number of buried jobs.

  • cmd_put - The cumulative number of put commands.

  • cmd_peek - The cumulative number of peek commands.

  • cmd_peek_ready - The cumulative number of peek-ready commands.

  • cmd_peek_delayed - The cumulative number of peek-delayed commands.

  • cmd_peek_buried - The cumulative number of peek-buried commands.

  • cmd_reserve - The cumulative number of reserve commands.

  • cmd_use - The cumulative number of use commands.

  • cmd_watch - The cumulative number of watch commands.

  • cmd_ignore - The cumulative number of ignore commands.

  • cmd_delete - The cumulative number of delete commands.

  • cmd_release - The cumulative number of release commands.

  • cmd_bury - The cumulative number of bury commands.

  • cmd_kick - The cumulative number of kick commands.

  • cmd_stats - The cumulative number of stats commands.

  • cmd_stats_job - The cumulative number of stats-job commands.

  • cmd_stats_tube - The cumulative number of stats-tube commands.

  • cmd_list_tubes - The cumulative number of list-tubes commands.

  • cmd_list_tube_used - The cumulative number of list-tube-used commands.

  • cmd_list_tubes_watched - The cumulative number of list-tubes-watched commands.

  • cmd_pause_tube - The cumulative number of pause-tube commands.

  • job_timeouts - The cumulative count of times a job has timed out.

  • total_jobs - The cumulative count of jobs created.

  • max_job_size - The maximum number of bytes in a job.

  • current_tubes - The number of currently-existing tubes.

  • current_connections - The number of currently open connections.

  • current_producers - The number of open connections that have each issued at least one put command.

  • current_workers - The number of open connections that have each issued at least one reserve command.

  • current_waiting - The number of open connections that have issued a reserve command but not yet received a response.

  • total_connections - The cumulative count of connections.

  • pid - The process id of the server.

  • version - The version string of the server.

  • rusage_utime - The accumulated user CPU time of this process in seconds and microseconds.

  • rusage_stime - The accumulated system CPU time of this process in seconds and microseconds.

  • uptime - The number of seconds since this server started running.

  • binlog_oldest_index - The index of the oldest binlog file needed to store the current jobs.

  • binlog_current_index - The index of the current binlog file being written to. If binlog is not active this value will be 0.

  • binlog_max_size - The maximum size in bytes a binlog file is allowed to get before a new binlog file is opened.

list_tubes ([$callback])

The response value for list_tubes is a reference to an array of the tubes that the server currently has defined.

list_tube_used ([$callback])

The response value for list_tube_used is the name of the tube currently being used.

This is the tube into which put would place new jobs and the tube which will be examined by the various peek commands.

list_tubes_watched ([$callback])

The response value for list_tubes_watched is a reference to an array of the tubes that this connection is currently watching.

These are the tubes that reserve will check to find jobs. On error an empty list, or undef in a scalar context, will be returned.

pause_tube ($tube, $delay, [$callback])

Prevent jobs in $tube from being reserved for $delay seconds.

The response value for pause_tube is a true value.

connect

Initiate a connection to the server. Once the connection is established, the on_connect handler will be called.

reconnect

Connect to the server, then attempt to restore the tube used and the list of tubes watched. If the state cannot be restored, the connection will be closed and the on_error handler will be called.

sync

Process all pending commands. Returns when there are no pending commands left.
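One way sync might be used is to queue several commands with callbacks and then wait for all of them at once. The sketch below assumes that behaviour; put_batch is a hypothetical helper, $client is assumed to be an AnyEvent::Beanstalk object, and the Job object is assumed to expose an id method.

```perl
use strict;
use warnings;

# Hypothetical helper: queues one job per body without waiting in between,
# then blocks once until all responses have arrived.
sub put_batch {
    my ( $client, @bodies ) = @_;
    my @ids;

    for my $body (@bodies) {
        # The callback receives the same values recv would return.
        $client->put(
            { data => $body },
            sub {
                my ($job) = @_;
                push @ids, $job->id if $job;
            }
        );
    }

    # Return only once every pending command has been answered.
    $client->sync;

    # Ids are collected in the order the responses arrived.
    return @ids;
}
```

Jobs whose put failed are skipped, so the returned list can be shorter than @bodies.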

reserve_pending

Returns the number of reserve commands that have been sent and not answered yet.

disconnect

Disconnect from server. If there are any outstanding commands then the condition variable for each command will be sent the empty list.

quit

Same as disconnect

TODO

More tests

ACKNOWLEDGEMENTS

Large parts of this documentation were lifted from the documentation that comes with beanstalkd.

SEE ALSO

Beanstalk::Client, AnyEvent

http://kr.github.com/beanstalkd/
beanstalkd 1.3 protocol spec

AUTHOR

Graham Barr <gbarr@pobox.com>

CREDITS

Tatsuhiko Miyagawa

Much of the structure of the code in this module was based on AnyEvent::Redis

Keith Rarick

Author of beanstalkd

COPYRIGHT

Copyright (C) 2010 by Graham Barr.

This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.