@@ -2,12 +2,12 @@ package RedisDB;
use strict;
use warnings;
-our $VERSION = "2.30";
+our $VERSION = "2.36";
$VERSION = eval $VERSION;
use RedisDB::Error;
use RedisDB::Parser;
-use IO::Socket::INET;
+use IO::Socket::IP;
use IO::Socket::UNIX;
use Socket qw(MSG_DONTWAIT MSG_NOSIGNAL SO_RCVTIMEO SO_SNDTIMEO);
use POSIX qw(:errno_h);
@@ -32,8 +32,7 @@ RedisDB - Perl extension to access redis database
This module provides interface to access redis key-value store, it
transparently handles disconnects and forks, supports transactions,
-pipelining, and subscription mode. Module includes XS and pure Perl
-versions of the parser.
+pipelining, and subscription mode.
=head1 METHODS
@@ -41,7 +40,7 @@ versions of the parser.
=head2 $class->new(%options)
-Creates the new RedisDB object. The following options are accepted:
+Creates a new RedisDB object. The following options are accepted:
=over 4
@@ -61,7 +60,7 @@ L</host> and L</port> you should specify I<path>.
=item password
Password, if redis server requires authentication. Alternatively you can use
-I<auth> method after connection.
+I<auth> method after creating the object.
=item database
@@ -73,18 +72,18 @@ Default value is 0.
=item raise_error
By default if redis-server returned error reply, or there was a connection
-error I<get_reply> method throws an exception of L<RedisDB::Error> type, if
-you set this parameter to false it will return an error object instead. Note,
-that if you have set this to false you must always check if the result you've
-got from RedisDB is a L<RedisDB::Error> object.
+error I<get_reply> method throws an exception of L<RedisDB::Error> type, if you
+set this parameter to false it will return an error object instead. Note, that
+if you set this to false you should always check if the result you've got from
+RedisDB method is a L<RedisDB::Error> object.
=item timeout
-IO timeout. With this option set, if IO operation has taken more than specified
-number of seconds, module will croak or return L<RedisDB::Error::EAGAIN> error
-object depending on L</raise_error> setting. Note, that some OSes do not
-support SO_RCVTIMEO, and SO_SNDTIMEO socket options, in this case timeout will
-not work.
+IO timeout. With this option set, if I/O operation has taken more than
+specified number of seconds, module will croak or return
+L<RedisDB::Error::EAGAIN> error object depending on L</raise_error> setting.
+Note, that some OSes do not support SO_RCVTIMEO, and SO_SNDTIMEO socket
+options, in this case timeout will not work.
=item utf8
@@ -94,39 +93,39 @@ from UTF-8. See L</"UTF-8 SUPPORT">.
=item connection_name
-After establishing connection set its name. See "CLIENT SETNAME" command
-description in the redis documentation.
+After establishing a connection set its name to the specified using "CLIENT
+SETNAME" command.
=item lazy
-by default I<new> establishes connection to the server. If this parameter is
-set, then connection will be established when you will send a command to the
-server.
+by default I<new> establishes a connection to the server. If this parameter is
+set, then connection will be established only when you will send first command
+to the server.
=item reconnect_attempts
this parameter allows you to specify how many attempts to (re)connect to the
-server should be made before returning error. Default value is 1, set to -1 if
-module should try to reconnect indefinitely.
+server should be made before returning an error. Default value is 1, set to -1
+if module should try to reconnect indefinitely.
=item reconnect_delay_max
-module makes a delay before each new attempt to connect. Delay increases with
-each new attempt. This parameter allows you to specify maximum delay between
-attempts to reconnect. Default value is 10.
+module waits some time before every new attempt to connect. Delay increases
+each time. This parameter allows you to specify maximum delay between attempts
+to reconnect. Default value is 10.
=item on_connect_error
-this allows you to specify callback that will be invoked if module could not
-establish connection to the server. First argument to callback is a reference
-to the RedisDB object, and second is the error description. You must not invoke
-any methods on the object inside the callback, but you can change I<port> and
-I<host>, or I<path> attributes of the I<RedisDB> object to point to another
-server. After callback returned, module tries to establish connection again
-using new parameters. To prevent further connection attempts callback should
-throw an exception. Default callback confesses. This may be useful to switch to
-backup server if primary went down. RedisDB distribution includes an example of
-using this callback in eg/server_failover.pl.
+if module failed to establish connection with the server it will invoke this
+callback. First argument to the callback is a reference to the RedisDB object,
+and second is the error description. You must not invoke any methods on the
+object inside the callback, but you can change I<port> and I<host>, or I<path>
+attributes of the I<RedisDB> object to point to another server. After callback
+returned, module tries to establish connection again using new parameters. To
+prevent further connection attempts callback should throw an exception, which
+is done by default callback. This may be useful to switch to backup server if
+primary went down. RedisDB distribution includes an example of using this
+callback in eg/server_failover.pl.
=back
@@ -164,10 +163,11 @@ sub _init_parser {
=head2 $self->execute($command, @arguments)
-send a command to the server and return the result. It will throw an exception
-if the server returns an error or return L<RedisDB::Error> depending on
-L</raise_error> parameter. It may be more convenient to use instead of this
-method wrapper named after the corresponding redis command. E.g.:
+send a command to the server, wait for the result and return it. It will throw
+an exception if the server returns an error or return L<RedisDB::Error> object
+depending on L</raise_error> parameter. It may be more convenient to use
+instead of this method wrapper named after the corresponding redis command.
+E.g.:
$redis->execute('set', key => 'value');
# is the same as
@@ -175,9 +175,9 @@ method wrapper named after the corresponding redis command. E.g.:
See L</"WRAPPER METHODS"> section for the full list of defined aliases.
-Note, that you can't use I<execute> if you have sent some commands using
-I<send_command> method without callback argument and have not yet got all
-replies.
+Note, that you can not use I<execute> if you have sent some commands using
+I<send_command> method without the I<callback> argument and have not yet got
+all replies.
=cut
@@ -205,7 +205,7 @@ sub _on_disconnect {
if ($err) {
$error_obj ||= RedisDB::Error::DISCONNECTED->new(
"Server unexpectedly closed connection. Some data might have been lost.");
- if ( $self->{raise_error} || $self->{_in_multi} ) {
+ if ( $self->{raise_error} or $self->{_in_multi} or $self->{_watching} ) {
$self->reset_connection;
die $error_obj;
}
@@ -272,7 +272,7 @@ sub _connect {
my $delay;
while ( not $self->{_socket} and $attempts ) {
sleep $delay if $delay;
- $self->{_socket} = IO::Socket::INET->new(
+ $self->{_socket} = IO::Socket::IP->new(
PeerAddr => $self->{host},
PeerPort => $self->{port},
Proto => 'tcp',
@@ -413,7 +413,7 @@ sub _recv_data_nb {
else {
delete $self->{_socket};
- if ( $self->{_parser}->callbacks or $self->{_in_multi} ) {
+ if ( $self->{_parser}->callbacks or $self->{_in_multi} or $self->{_watching} ) {
# there are some replies lost
$self->_on_disconnect(1);
@@ -446,13 +446,14 @@ sub _queue {
send a command to the server. If send has failed command will die or return
L<RedisDB::Error> object depending on L<raise_error> parameter. Note, that it
-does not return reply from the server, you should retrieve it using the
-I<get_reply> method, or if I<callback> has been specified, it will be invoked
-upon receiving the reply with two arguments: the RedisDB object, and the reply
-from the server. If the server returns an error, the second argument to the
-callback will be a L<RedisDB::Error> object, you can get description of the
-error using this object in string context. If you are not interested in reply,
-you can use RedisDB::IGNORE_REPLY constant as the last argument.
+does not return reply from the server, if I<callback> was not specified, you
+should retrieve result using I<get_reply> method, otherwise I<callback> will
+be invoked upon receiving the result with two arguments: the RedisDB object,
+and the reply from the server. If the server returned an error, the second
+argument to the callback will be a L<RedisDB::Error> object, you can get
+description of the error using this object in string context. If you are not
+interested in reply, you can use RedisDB::IGNORE_REPLY constant as the last
+argument.
Note, that RedisDB does not run any background threads, so it will not receive
the reply and invoke the callback unless you call some of its methods which
@@ -585,10 +586,9 @@ sub send_command_cb {
=head2 $self->reply_ready
-This method may be used in the pipelining mode to check if there are some
-replies already received from the server. Returns the number of replies
-available for reading. May also return L<RedisDB::Error> object if there was a
-network error and I<raise_error> is disabled.
+this method may be used in the pipelining mode to check if there are some
+replies already received from the server. Returns true if there are replies
+ready to be fetched with I<get_reply> method.
=cut
@@ -649,9 +649,9 @@ sub mainloop {
=head2 $self->get_reply
-Receive and return reply from the server. If the server returned an error,
+receive and return reply from the server. If the server returned an error,
method throws L<RedisDB::Error> exception or returns L<RedisDB::Error> object,
-depending on I<raise_error> parameter, see I<new>.
+depending on the L</raise_error> parameter.
=cut
@@ -690,19 +690,38 @@ sub get_reply {
my $res = shift @{ $self->{_replies} };
if ( ref $res eq 'RedisDB::Error'
- and ( $self->{raise_error} or $self->{_in_multi} ) )
+ and ( $self->{raise_error} or $self->{_in_multi} or $self->{_watching} ) )
{
croak $res;
}
+ if ( $self->{_subscription_loop} ) {
+ confess "Expected multi-bulk reply, but got $res" unless ref $res;
+ if ( $res->[0] eq 'message' ) {
+ $self->{_subscribed}{ $res->[1] }( $self, $res->[1], undef, $res->[2] )
+ if $self->{_subscribed}{ $res->[1] };
+ }
+ elsif ( $res->[0] eq 'pmessage' ) {
+ $self->{_psubscribed}{ $res->[1] }( $self, $res->[2], $res->[1], $res->[3] )
+ if $self->{_psubscribed}{ $res->[1] };
+ }
+ elsif ( $res->[0] =~ /^p?(un)?subscribe/ ) {
+
+ # ignore
+ }
+ else {
+ confess "Got unknown reply $res->[0] in subscription mode";
+ }
+ }
+
return $res;
}
=head2 $self->get_all_replies
-Wait till replies to all the commands without callback set will be received.
-Return a list of replies to these commands. For commands with callback set
-replies are processed as usual. Unlike I<mainloop> this method block only till
+wait till replies to all the commands without callback set will be received.
+Returns a list of replies to these commands. For commands with callback set
+replies are processed as usual. Unlike I<mainloop> this method blocks only till
replies to all commands for which callback was NOT set will be received.
=cut
@@ -718,7 +737,7 @@ sub get_all_replies {
=head2 $self->replies_to_fetch
-Return the number of commands sent to the server replies to which wasn't yet
+return the number of commands sent to the server replies to which were not yet
retrieved with I<get_reply> or I<get_all_replies>. This number only includes
commands for which callback was not set.
@@ -731,7 +750,7 @@ sub replies_to_fetch {
=head2 $self->selected_database
-Get currently selected database.
+get currently selected database.
=cut
@@ -741,9 +760,9 @@ sub selected_database {
=head2 $self->reset_connection
-Resets connection. This closes existing connection and drops all previously
-sent requests. After invoking this method the object returns to the same state
-as it was returned by the constructor.
+reset connection. This method closes existing connection and drops all
+previously sent requests. After invoking this method the object returns to the
+same state as it was returned by the constructor.
=cut
@@ -758,7 +777,7 @@ sub reset_connection {
=head2 $self->version
-Return the version of the server the client is connected to. The version is
+return the version of the server the client is connected to. The version is
returned as a floating point number represented the same way as the perl
versions. E.g. for redis 2.1.12 it will return 2.001012.
@@ -777,23 +796,23 @@ sub version {
my @commands = qw(
append auth bgrewriteaof bgsave bitcount bitop bitpos
blpop brpop brpoplpush client client_kill client_getname client_setname
- config config_get config_set config_resetstat config_rewrite
- dbsize debug_object debug_segfault
+ config config_get config_set config_resetstat config_rewrite
+ dbsize debug_error debug_object debug_segfault
decr decrby del dump echo eval evalsha exists expire expireat flushall
flushdb get getbit getrange getset hdel hexists hget hgetall
hincrby hincrbyfloat hkeys hlen hmget hscan hmset hset hsetnx hvals incr incrby
incrbyfloat keys lastsave lindex linsert llen lpop lpush lpushx
lrange lrem lset ltrim mget migrate move mset msetnx object object_refcount
- object_encoding object_idletime persist pexpire pexpireat ping psetex pttl
+ object_encoding object_idletime persist pexpire pexpireat pfadd pfcount pfmerge ping psetex pttl
pubsub pubsub_channels pubsub_numsub pubsub_numpat
publish quit randomkey rename renamenx restore rpop rpoplpush
rpush rpushx sadd save scan scard script script_exists script_flush script_kill
script_load sdiff sdiffstore select set
setbit setex setnx setrange sinter sinterstore
sismember slaveof slowlog smembers smove sort spop srandmember
- srem sscan strlen sunion sunionstore sync time ttl type unwatch watch
- zadd zcard
- zcount zincrby zinterstore zrange zrangebyscore zrank zrem
+ srem sscan strlen sunion sunionstore sync time ttl type
+ zadd zcard zcount zincrby zinterstore zlexcount zrange zrangebylex
+ zrangebyscore zrank zrem zremrangebylex
zremrangebyrank zremrangebyscore zrevrange zrevrangebyscore zrevrank
zscan zscore zunionstore
);
@@ -822,22 +841,23 @@ I<execute>, waits for the reply from the server, and returns it. E.g.:
$redis->send_command("get", $key, sub { $val = $_[1] });
The following wrapper methods are defined: append, auth, bgrewriteaof, bgsave,
-bitcount, bitop, bitpos, blpop, brpop, brpoplpush, client, client_kill, client_getname,
-client_setname, config, config_get, config_set, config_resetstat, config_rewrite, dbsize,
-debug_object, debug_segfault, decr, decrby, del, dump, echo, eval, evalsha,
-exists, expire, expireat, flushall, flushdb, get, getbit, getrange, getset,
-hdel, hexists, hget, hgetall, hincrby, hincrbyfloat, hkeys, hlen, hmget, hscan, hmset,
-hset, hsetnx, hvals, incr, incrby, incrbyfloat, keys, lastsave, lindex,
-linsert, llen, lpop, lpush, lpushx, lrange, lrem, lset, ltrim, mget, migrate,
-move, mset, msetnx, object, object_refcount, object_encoding, object_idletime,
-persist, pexpire, pexpireat, ping, psetex, pttl, publish,
-pubsub, pubsub_channels, pubsub_numsub, pubsub_numpat, quit, randomkey,
-rename, renamenx, restore, rpop, rpoplpush, rpush, rpushx, sadd, save, scan, scard,
-script, script_exists, script_flush, script_kill, script_load, sdiff,
-sdiffstore, select, set, setbit, setex, setnx, setrange, sinter, sinterstore,
-sismember, slaveof, slowlog, smembers, smove, sort, spop, srandmember, srem,
-sscan strlen, sunion, sunionstore, sync, time, ttl, type, unwatch, watch, zadd,
-zcard, zcount, zincrby, zinterstore, zrange, zrangebyscore, zrank, zrem,
+bitcount, bitop, bitpos, blpop, brpop, brpoplpush, client, client_kill,
+client_getname, client_setname, config, config_get, config_set,
+config_resetstat, config_rewrite, dbsize, debug_error, debug_object, debug_segfault, decr,
+decrby, del, dump, echo, eval, evalsha, exists, expire, expireat, flushall,
+flushdb, get, getbit, getrange, getset, hdel, hexists, hget, hgetall, hincrby,
+hincrbyfloat, hkeys, hlen, hmget, hscan, hmset, hset, hsetnx, hvals, incr,
+incrby, incrbyfloat, keys, lastsave, lindex, linsert, llen, lpop, lpush,
+lpushx, lrange, lrem, lset, ltrim, mget, migrate, move, mset, msetnx, object,
+object_refcount, object_encoding, object_idletime, persist, pexpire, pexpireat,
+pfadd, pfcount, pfmerge, ping, psetex, pttl, publish, pubsub, pubsub_channels, pubsub_numsub,
+pubsub_numpat, quit, randomkey, rename, renamenx, restore, rpop, rpoplpush,
+rpush, rpushx, sadd, save, scan, scard, script, script_exists, script_flush,
+script_kill, script_load, sdiff, sdiffstore, select, set, setbit, setex, setnx,
+setrange, sinter, sinterstore, sismember, slaveof, slowlog, smembers, smove,
+sort, spop, srandmember, srem, sscan strlen, sunion, sunionstore, sync, time,
+ttl, type, unwatch, watch, zadd, zcard, zcount, zincrby, zinterstore,
+zlexcount, zrange, zrangebylex, zrangebyscore, zrank, zrem, zremrangebylex,
zremrangebyrank, zremrangebyscore, zrevrange, zrevrangebyscore, zrevrank,
zscan, zscore, zunionstore.
@@ -866,7 +886,7 @@ The following commands implement some additional postprocessing of the results:
=cut
-=head2 $self->info([$callback])
+=head2 $self->info([\&callback])
return information and statistics about the server. Redis-server returns
information in form of I<field:value>, the I<info> method parses result and
@@ -897,7 +917,7 @@ sub _parse_info {
return \%info;
}
-=head2 $self->client_list([$callback])
+=head2 $self->client_list([\&callback])
return list of clients connected to the server. This method parses server
output and returns result as reference to array of hashes.
@@ -1065,7 +1085,11 @@ L<RedisDB::Error::DISCONNECTED> class, next time you use the object it will
establish a new connection. If L</raise_error> disabled, the module will pass
L<RedisDB::Error::DISCONNECTED> object to all outstanding callbacks and will
try to reconnect to the server; it will also automatically restore
-subscriptions if object was in subscription mode.
+subscriptions if object was in subscription mode. Module never tries to
+reconnect after MULTI or WATCH command was sent to server and before
+corresponding UNWATCH, EXEC or DISCARD was sent as this may cause data
+corruption, so during transaction module behaves like if L</raise_error> is
+set.
Module makes several attempts to reconnect each time increasing interval before
the next attempt, depending on the values of L</reconnect_attempts> and
@@ -1075,7 +1099,7 @@ hostname, so on next attempt module will try to connect to different server.
=cut
-=head1 PIPELINING SUPPORT
+=head1 PIPELINING
You can send commands in the pipelining mode. It means you are sending multiple
commands to the server without waiting for the replies. This is implemented by
@@ -1136,47 +1160,74 @@ or using L</"WRAPPER METHODS"> you can rewrite it as:
=cut
-=head1 SUBSCRIPTIONS SUPPORT
+=head1 PUB/SUB MESSAGING
RedisDB supports subscriptions to redis channels. In the subscription mode you
can subscribe to some channels and receive all the messages sent to these
-channels. Every time RedisDB receives a message for the channel it invokes a
-callback provided by the user. User can specify different callbacks for the
-different channels. When in the subscription mode you can subscribe to
-additional channels, or unsubscribe from the channels you subscribed to, but
-you can't use any other redis commands like set, get, etc. Here is the example
-of running in the subscription mode:
+channels. You can subscribe to channels and then manually check messages using
+I<get_reply> method, or you can invoke I<subscription_loop> method, which will
+block in loop waiting for messages and invoking callback for each received
+message. In the first case you can use I<subscribe> and I<psubscribe> methods
+to subscribe to channels and then you can use I<get_reply> method to get
+messages from the channel:
+
+ $redis->subscribe(
+ foo => sub {
+ my ( $redis, $channel, $patern, $message ) = @_;
+ print "Foo: $message\n";
+ }
+ );
+ # Wait for messages
+ $res = $redis->get_reply;
+
+I<get_reply> method for messages from the channel will invoke callback
+specified as the second optional argument of the I<subscribe> method and will
+also return raw replies from the server, both for messages from the channels
+and for informational messages from the redis server. If you do not want to
+block in I<get_reply> method, you can check if there are any messages using
+I<reply_ready> method.
+
+In the second case you invoke I<subscription_loop> method, it subscribes to
+specified channels and waits for messages, when a message arrived it invokes
+callback defined for the channel from which the message came. Here is an
+example:
my $message_cb = sub {
- my ($redis, $channel, $pattern, $message) = @_;
+ my ( $redis, $channel, $pattern, $message ) = @_;
print "$channel: $message\n";
};
-
+
my $control_cb = sub {
- my ($redis, $channel, $pattern, $message) = @_;
- if ($channel eq 'control.quit') {
+ my ( $redis, $channel, $pattern, $message ) = @_;
+ if ( $channel eq 'control.quit' ) {
$redis->unsubscribe;
$redis->punsubscribe;
}
- elsif ($channel eq 'control.subscribe') {
+ elsif ( $channel eq 'control.subscribe' ) {
$redis->subscribe($message);
}
};
-
- subscription_loop(
- subscribe => [ 'news', ],
- psubscribe => [ 'control.*' => $control_cb ],
+
+ $redis->subscription_loop(
+ subscribe => [ 'news', ],
+ psubscribe => [ 'control.*' => $control_cb ],
default_callback => $message_cb,
);
-subscription_loop will subscribe you to the news channel and control.*
+subscription_loop will subscribe you to the "news" channel and "control.*"
channels. It will call specified callbacks every time a new message received.
-You can subscribe to additional channels sending their names to the
-control.subscribe channel. You can unsubscribe from all the channels by sending
-a message to the control.quit channel. Every callback receives four arguments:
-the RedisDB object, the channel for which the message was received, the pattern
-if you subscribed to this channel using I<psubscribe> method, and the message
-itself.
+When message came from "control.subscribe" channel, callback subscribes to an
+additional channel. When message came from "control.quit" channel, callback
+unsubscribes from all channels.
+
+Callbacks used in subscription mode receive four arguments: the RedisDB object,
+the channel from which the message came, the pattern if you subscribed to this
+channel using I<psubscribe> method, and the message itself.
+
+Once you switched into subscription mode using either I<subscribe> or
+I<psubscribe> command, or by entering I<subscription_loop>, you only can send
+I<subscribe>, I<psubscribe>, I<unsubscribe>, and I<punsubscribe> commands to
+the server, other commands will throw an exception.
You can publish messages into the channels using the I<publish> method. This
method should be called when you in the normal mode, and can't be used while
@@ -1225,11 +1276,11 @@ for every channel you are going to subscribe.
sub subscription_loop {
my ( $self, %args ) = @_;
- croak "Already in subscription loop" if $self->{_subscription_loop};
+ croak "Already in subscription loop" if $self->{_subscription_loop} > 0;
croak "You can't start subscription loop while in pipelining mode."
if $self->replies_to_fetch;
- $self->{_subscribed} = {};
- $self->{_psubscribed} = {};
+ $self->{_subscribed} ||= {};
+ $self->{_psubscribed} ||= {};
$self->{_subscription_cb} = $args{default_callback};
$self->{_subscription_loop} = 1;
$self->{_parser}->set_default_callback( \&_queue );
@@ -1252,32 +1303,16 @@ sub subscription_loop {
unless ( keys %{ $self->{_subscribed} } or keys %{ $self->{_psubscribed} } );
while ( $self->{_subscription_loop} ) {
- my $msg = $self->get_reply;
- confess "Expected multi-bulk reply, but got $msg" unless ref $msg;
- if ( $msg->[0] eq 'message' ) {
- $self->{_subscribed}{ $msg->[1] }( $self, $msg->[1], undef, $msg->[2] )
- if $self->{_subscribed}{ $msg->[1] };
- }
- elsif ( $msg->[0] eq 'pmessage' ) {
- $self->{_psubscribed}{ $msg->[1] }( $self, $msg->[2], $msg->[1], $msg->[3] )
- if $self->{_psubscribed}{ $msg->[1] };
- }
- elsif ( $msg->[0] =~ /^p?(un)?subscribe/ ) {
-
- # ignore
- }
- else {
- confess "Got unknown reply $msg->[0] in subscription mode";
- }
+ $self->get_reply;
}
return;
}
-=head2 $self->subscribe($channel[, $callback])
+=head2 $self->subscribe($channel[, \&callback])
Subscribe to the I<$channel>. If I<$callback> is not specified, default
-callback will be used. If you are invoking I<subscribe> outside of subscription
-loop, I<$callback> is ignored.
+callback will be used in subscription loop, or messages will be returned by
+I<get_reply> if you are not using subscription loop.
=cut
@@ -1294,18 +1329,18 @@ sub subscribe {
or croak "Callback for $channel not specified, neither default callback defined";
}
else {
- $callback = 1;
+ $callback ||= sub { 1 };
}
$self->{_subscribed}{$channel} = $callback;
- $self->send_command( "SUBSCRIBE", $channel );
+ $self->send_command( "SUBSCRIBE", $channel, \&_queue );
return;
}
-=head2 $self->psubscribe($pattern[, $callback])
+=head2 $self->psubscribe($pattern[, \&callback])
Subscribe to channels matching I<$pattern>. If I<$callback> is not specified,
-default callback will be used. If you are invoking I<psubscribe> outside of
-subscription loop, I<$callback> is ignored.
+default callback will be used in subscription loop, or messages will be
+returned by I<get_reply> if you are not using subscription loop.
=cut
@@ -1322,10 +1357,10 @@ sub psubscribe {
or croak "Callback for $channel not specified, neither default callback defined";
}
else {
- $callback = 1;
+ $callback ||= sub { 1 };
}
$self->{_psubscribed}{$channel} = $callback;
- $self->send_command( "PSUBSCRIBE", $channel );
+ $self->send_command( "PSUBSCRIBE", $channel, \&_queue );
return;
}
@@ -1405,17 +1440,17 @@ sub psubscribed {
return keys %{ shift->{_psubscribed} };
}
-=head1 TRANSACTIONS SUPPORT
+=head1 TRANSACTIONS
Transactions allow you to execute a sequence of commands in a single step. In
order to start a transaction you should use the I<multi> method. After you
have entered a transaction all the commands you issue are queued, but not
executed till you call the I<exec> method. Typically these commands return
string "QUEUED" as a result, but if there is an error in e.g. number of
-arguments, they may croak. When you call exec, all the queued commands will be
-executed and exec will return a list of results for every command in the
-transaction. If instead of I<exec> you call I<discard>, all scheduled commands
-will be canceled.
+arguments, they may return an error. When you call exec, all the queued
+commands will be executed and exec will return a list of results for every
+command in the transaction. If instead of I<exec> you call I<discard>, all
+scheduled commands will be canceled.
You can set some keys as watched. If any watched key has been changed by
another client before you called exec, the transaction will be discarded and
@@ -1423,7 +1458,45 @@ exec will return false value.
=cut
-=head2 $self->multi
+=head2 $self->watch(@keys[, \&callback])
+
+mark given keys to be watched
+
+=cut
+
+sub watch {
+ my $self = shift;
+
+ $self->{_watching} = 1;
+ if ( ref $_[-1] eq 'CODE' ) {
+ return $self->send_command( 'WATCH', @_ );
+ }
+ else {
+ return $self->execute( 'WATCH', @_ );
+ }
+}
+
+=head2 $self->unwatch([\&callback])
+
+unwatch all keys
+
+=cut
+
+sub unwatch {
+ my $self = shift;
+
+ my $res;
+ if ( ref $_[-1] eq 'CODE' ) {
+ $res = $self->send_command( 'UNWATCH', @_ );
+ }
+ else {
+ $res = $self->execute( 'UNWATCH', @_ );
+ }
+ $self->{_watching} = undef;
+ return $res;
+}
+
+=head2 $self->multi([\&callback])
Enter the transaction. After this and till I<exec> or I<discard> will be called,
all the commands will be queued but not executed.
@@ -1434,12 +1507,16 @@ sub multi {
my $self = shift;
die "Multi calls can not be nested!" if $self->{_in_multi};
- my $res = $self->execute('MULTI');
$self->{_in_multi} = 1;
- return $res;
+ if ( ref $_[-1] eq 'CODE' ) {
+ return $self->send_command( 'MULTI', @_ );
+ }
+ else {
+ return $self->execute('MULTI');
+ }
}
-=head2 $self->exec
+=head2 $self->exec([\&callback])
Execute all queued commands and finish the transaction. Returns a list of
results for every command. Will croak if some command has failed. Also
@@ -1451,12 +1528,19 @@ client, the transaction will be canceled and I<exec> will return false.
sub exec {
my $self = shift;
- my $res = $self->execute('EXEC');
+ my $res;
+ if ( ref $_[-1] eq 'CODE' ) {
+ $res = $self->send_command( 'EXEC', @_ );
+ }
+ else {
+ $res = $self->execute('EXEC');
+ }
$self->{_in_multi} = undef;
+ $self->{_watching} = undef;
return $res;
}
-=head2 $self->discard
+=head2 $self->discard([\&callback])
Discard all queued commands without executing them and unwatch all keys.
@@ -1465,8 +1549,15 @@ Discard all queued commands without executing them and unwatch all keys.
sub discard {
my $self = shift;
- my $res = $self->execute('DISCARD');
+ my $res;
+ if ( ref $_[-1] eq 'CODE' ) {
+ $res = $self->send_command( 'DISCARD', @_ );
+ }
+ else {
+ $res = $self->execute('DISCARD');
+ }
$self->{_in_multi} = undef;
+ $self->{_watching} = undef;
return $res;
}