The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
Changes 019
META.json 23
META.yml 1415
Makefile.PL 01
lib/RedisDB/Error.pm 11
lib/RedisDB.pm 159250
t/auth.t 11
t/network.t 1237
t/no-leak.t 11
t/redis_commands.t 733
t/restore_subscriptions.t 11
t/subscribe.t 3867
t/transactions.t 879
xt/podspell.t 01
14 files changed (This is a version diff) 244509
@@ -1,5 +1,24 @@
 Revision history for RedisDB
 
+2.36 Fri May  9 2014 Pavel Shaydo <zwon@cpan.org>
+    - do not reconnect when watching some keys
+
+2.35 Wed May  7 2014 Pavel Shaydo <zwon@cpan.org>
+    - documentation fixes and improvements
+    - allow perform transactions in pipelining mode
+    - fix test failing if IPv6 is not supported
+
+2.33 Thu May  1 2014 Pavel Shaydo <zwon@cpan.org>
+    - fixed randomly failing auth.t test.
+      Reported by @scripter-v (Vadim Vlasov), see #19.
+    - added debug_error method
+
+2.32 Sat Apr 26 2014 Pavel Shaydo <zwon@cpan.org>
+    - add IPv6 support
+    - improve subscriptions handling outside of subscription loop
+    - update documentation for subscription mode
+    - add new commands introduced in redis 2.8.9
+
 2.30 Mon Mar 10 2014 Pavel Shaydo <zwon@cpan.org>
     - fix problem with unsubscribe if there are no psubscriptions.
       See #18, reported by @cameronpm
@@ -4,7 +4,7 @@
       "Pavel Shaydo <zwon@cpan.org>"
    ],
    "dynamic_config" : 1,
-   "generated_by" : "ExtUtils::MakeMaker version 6.86, CPAN::Meta::Converter version 2.133380",
+   "generated_by" : "ExtUtils::MakeMaker version 6.96, CPAN::Meta::Converter version 2.140640",
    "keywords" : [
       "redis"
    ],
@@ -41,6 +41,7 @@
       "runtime" : {
          "requires" : {
             "Encode" : "2.1",
+            "IO::Socket::IP" : "0",
             "RedisDB::Parser" : "2.2",
             "Try::Tiny" : "0",
             "perl" : "5.008004"
@@ -60,7 +61,7 @@
          "url" : "git://github.com/trinitum/RedisDB"
       }
    },
-   "version" : "2.30",
+   "version" : "2.36",
    "x_contributors" : [
       "Pavel Shaydo <zwon@cpan.org>",
       "HIROSE Masaaki <hirose31@gmail.com>",
@@ -3,38 +3,39 @@ abstract: 'Perl extension to access redis database'
 author:
   - 'Pavel Shaydo <zwon@cpan.org>'
 build_requires:
-  Digest::SHA: 0
-  Test::Differences: 0.61
-  Test::FailWarnings: 0
-  Test::More: 0.96
-  Test::Most: 0.22
-  Test::TCP: 1.17
+  Digest::SHA: '0'
+  Test::Differences: '0.61'
+  Test::FailWarnings: '0'
+  Test::More: '0.96'
+  Test::Most: '0.22'
+  Test::TCP: '1.17'
 configure_requires:
-  ExtUtils::MakeMaker: 6.3002
+  ExtUtils::MakeMaker: '6.3002'
 dynamic_config: 1
-generated_by: 'ExtUtils::MakeMaker version 6.86, CPAN::Meta::Converter version 2.133380'
+generated_by: 'ExtUtils::MakeMaker version 6.96, CPAN::Meta::Converter version 2.140640'
 keywords:
   - redis
 license: perl
 meta-spec:
   url: http://module-build.sourceforge.net/META-spec-v1.4.html
-  version: 1.4
+  version: '1.4'
 name: RedisDB
 no_index:
   directory:
     - t
     - inc
 requires:
-  Encode: 2.1
-  RedisDB::Parser: 2.2
-  Try::Tiny: 0
-  perl: 5.008004
+  Encode: '2.1'
+  IO::Socket::IP: '0'
+  RedisDB::Parser: '2.2'
+  Try::Tiny: '0'
+  perl: '5.008004'
 resources:
   bugtracker: https://github.com/trinitum/RedisDB/issues
   homepage: https://github.com/trinitum/RedisDB
   license: http://dev.perl.org/licenses/
   repository: git://github.com/trinitum/RedisDB
-version: 2.30
+version: '2.36'
 x_contributors:
   - 'Pavel Shaydo <zwon@cpan.org>'
   - 'HIROSE Masaaki <hirose31@gmail.com>'
@@ -17,6 +17,7 @@ WriteMakefile(
     PREREQ_PM     => {
         'Try::Tiny'       => 0,
         Encode            => 2.10,
+        'IO::Socket::IP'  => 0,
         'RedisDB::Parser' => 2.20,
     },
     CONFIGURE_REQUIRES => {
@@ -2,7 +2,7 @@ package RedisDB::Error;
 
 use strict;
 use warnings;
-our $VERSION = "2.30";
+our $VERSION = "2.36";
 $VERSION = eval $VERSION;
 
 =head1 NAME
@@ -2,12 +2,12 @@ package RedisDB;
 
 use strict;
 use warnings;
-our $VERSION = "2.30";
+our $VERSION = "2.36";
 $VERSION = eval $VERSION;
 
 use RedisDB::Error;
 use RedisDB::Parser;
-use IO::Socket::INET;
+use IO::Socket::IP;
 use IO::Socket::UNIX;
 use Socket qw(MSG_DONTWAIT MSG_NOSIGNAL SO_RCVTIMEO SO_SNDTIMEO);
 use POSIX qw(:errno_h);
@@ -32,8 +32,7 @@ RedisDB - Perl extension to access redis database
 
 This module provides interface to access redis key-value store, it
 transparently handles disconnects and forks, supports transactions,
-pipelining, and subscription mode. Module includes XS and pure Perl
-versions of the parser.
+pipelining, and subscription mode.
 
 =head1 METHODS
 
@@ -41,7 +40,7 @@ versions of the parser.
 
 =head2 $class->new(%options)
 
-Creates the new RedisDB object. The following options are accepted:
+Creates a new RedisDB object. The following options are accepted:
 
 =over 4
 
@@ -61,7 +60,7 @@ L</host> and L</port> you should specify I<path>.
 =item password
 
 Password, if redis server requires authentication. Alternatively you can use
-I<auth> method after connection.
+I<auth> method after creating the object.
 
 =item database
 
@@ -73,18 +72,18 @@ Default value is 0.
 =item raise_error
 
 By default if redis-server returned error reply, or there was a connection
-error I<get_reply> method throws an exception of L<RedisDB::Error> type, if
-you set this parameter to false it will return an error object instead. Note,
-that if you have set this to false you must always check if the result you've
-got from RedisDB is a L<RedisDB::Error> object.
+error I<get_reply> method throws an exception of L<RedisDB::Error> type, if you
+set this parameter to false it will return an error object instead. Note, that
+if you set this to false you should always check if the result you've got from
+RedisDB method is a L<RedisDB::Error> object.
 
 =item timeout
 
-IO timeout. With this option set, if IO operation has taken more than specified
-number of seconds, module will croak or return L<RedisDB::Error::EAGAIN> error
-object depending on L</raise_error> setting. Note, that some OSes do not
-support SO_RCVTIMEO, and SO_SNDTIMEO socket options, in this case timeout will
-not work.
+IO timeout. With this option set, if I/O operation has taken more than
+specified number of seconds, module will croak or return
+L<RedisDB::Error::EAGAIN> error object depending on L</raise_error> setting.
+Note, that some OSes do not support SO_RCVTIMEO, and SO_SNDTIMEO socket
+options, in this case timeout will not work.
 
 =item utf8
 
@@ -94,39 +93,39 @@ from UTF-8. See L</"UTF-8 SUPPORT">.
 
 =item connection_name
 
-After establishing connection set its name. See "CLIENT SETNAME" command
-description in the redis documentation.
+After establishing a connection set its name to the specified using "CLIENT
+SETNAME" command.
 
 =item lazy
 
-by default I<new> establishes connection to the server. If this parameter is
-set, then connection will be established when you will send a command to the
-server.
+by default I<new> establishes a connection to the server. If this parameter is
+set, then connection will be established only when you will send first command
+to the server.
 
 =item reconnect_attempts
 
 this parameter allows you to specify how many attempts to (re)connect to the
-server should be made before returning error. Default value is 1, set to -1 if
-module should try to reconnect indefinitely.
+server should be made before returning an error. Default value is 1, set to -1
+if module should try to reconnect indefinitely.
 
 =item reconnect_delay_max
 
-module makes a delay before each new attempt to connect. Delay increases with
-each new attempt. This parameter allows you to specify maximum delay between
-attempts to reconnect. Default value is 10.
+module waits some time before every new attempt to connect. Delay increases
+each time. This parameter allows you to specify maximum delay between attempts
+to reconnect. Default value is 10.
 
 =item on_connect_error
 
-this allows you to specify callback that will be invoked if module could not
-establish connection to the server.  First argument to callback is a reference
-to the RedisDB object, and second is the error description. You must not invoke
-any methods on the object inside the callback, but you can change I<port> and
-I<host>, or I<path> attributes of the I<RedisDB> object to point to another
-server.  After callback returned, module tries to establish connection again
-using new parameters. To prevent further connection attempts callback should
-throw an exception. Default callback confesses. This may be useful to switch to
-backup server if primary went down. RedisDB distribution includes an example of
-using this callback in eg/server_failover.pl.
+if module failed to establish connection with the server it will invoke this
+callback.  First argument to the callback is a reference to the RedisDB object,
+and second is the error description. You must not invoke any methods on the
+object inside the callback, but you can change I<port> and I<host>, or I<path>
+attributes of the I<RedisDB> object to point to another server.  After callback
+returned, module tries to establish connection again using new parameters. To
+prevent further connection attempts callback should throw an exception, which
+is done by default callback. This may be useful to switch to backup server if
+primary went down. RedisDB distribution includes an example of using this
+callback in eg/server_failover.pl.
 
 =back
 
@@ -164,10 +163,11 @@ sub _init_parser {
 
 =head2 $self->execute($command, @arguments)
 
-send a command to the server and return the result. It will throw an exception
-if the server returns an error or return L<RedisDB::Error> depending on
-L</raise_error> parameter. It may be more convenient to use instead of this
-method wrapper named after the corresponding redis command. E.g.:
+send a command to the server, wait for the result and return it. It will throw
+an exception if the server returns an error or return L<RedisDB::Error> object
+depending on L</raise_error> parameter. It may be more convenient to use
+instead of this method wrapper named after the corresponding redis command.
+E.g.:
 
     $redis->execute('set', key => 'value');
     # is the same as
@@ -175,9 +175,9 @@ method wrapper named after the corresponding redis command. E.g.:
 
 See L</"WRAPPER METHODS"> section for the full list of defined aliases.
 
-Note, that you can't use I<execute> if you have sent some commands using
-I<send_command> method without callback argument and have not yet got all
-replies.
+Note, that you can not use I<execute> if you have sent some commands using
+I<send_command> method without the I<callback> argument and have not yet got
+all replies.
 
 =cut
 
@@ -205,7 +205,7 @@ sub _on_disconnect {
     if ($err) {
         $error_obj ||= RedisDB::Error::DISCONNECTED->new(
             "Server unexpectedly closed connection. Some data might have been lost.");
-        if ( $self->{raise_error} || $self->{_in_multi} ) {
+        if ( $self->{raise_error} or $self->{_in_multi} or $self->{_watching} ) {
             $self->reset_connection;
             die $error_obj;
         }
@@ -272,7 +272,7 @@ sub _connect {
             my $delay;
             while ( not $self->{_socket} and $attempts ) {
                 sleep $delay if $delay;
-                $self->{_socket} = IO::Socket::INET->new(
+                $self->{_socket} = IO::Socket::IP->new(
                     PeerAddr => $self->{host},
                     PeerPort => $self->{port},
                     Proto    => 'tcp',
@@ -413,7 +413,7 @@ sub _recv_data_nb {
         else {
             delete $self->{_socket};
 
-            if ( $self->{_parser}->callbacks or $self->{_in_multi} ) {
+            if ( $self->{_parser}->callbacks or $self->{_in_multi} or $self->{_watching} ) {
 
                 # there are some replies lost
                 $self->_on_disconnect(1);
@@ -446,13 +446,14 @@ sub _queue {
 
 send a command to the server. If send has failed command will die or return
 L<RedisDB::Error> object depending on L<raise_error> parameter.  Note, that it
-does not return reply from the server, you should retrieve it using the
-I<get_reply> method, or if I<callback> has been specified, it will be invoked
-upon receiving the reply with two arguments: the RedisDB object, and the reply
-from the server.  If the server returns an error, the second argument to the
-callback will be a L<RedisDB::Error> object, you can get description of the
-error using this object in string context.  If you are not interested in reply,
-you can use RedisDB::IGNORE_REPLY constant as the last argument.
+does not return reply from the server, if I<callback> was not specified, you
+should retrieve result using I<get_reply> method, otherwise I<callback>  will
+be invoked upon receiving the result with two arguments: the RedisDB object,
+and the reply from the server. If the server returned an error, the second
+argument to the callback will be a L<RedisDB::Error> object, you can get
+description of the error using this object in string context.  If you are not
+interested in reply, you can use RedisDB::IGNORE_REPLY constant as the last
+argument.
 
 Note, that RedisDB does not run any background threads, so it will not receive
 the reply and invoke the callback unless you call some of its methods which
@@ -585,10 +586,9 @@ sub send_command_cb {
 
 =head2 $self->reply_ready
 
-This method may be used in the pipelining mode to check if there are some
-replies already received from the server. Returns the number of replies
-available for reading. May also return L<RedisDB::Error> object if there was a
-network error and I<raise_error> is disabled.
+this method may be used in the pipelining mode to check if there are some
+replies already received from the server. Returns true if there are replies
+ready to be fetched with I<get_reply> method.
 
 =cut
 
@@ -649,9 +649,9 @@ sub mainloop {
 
 =head2 $self->get_reply
 
-Receive and return reply from the server. If the server returned an error,
+receive and return reply from the server. If the server returned an error,
 method throws L<RedisDB::Error> exception or returns L<RedisDB::Error> object,
-depending on I<raise_error> parameter, see I<new>.
+depending on the L</raise_error> parameter.
 
 =cut
 
@@ -690,19 +690,38 @@ sub get_reply {
 
     my $res = shift @{ $self->{_replies} };
     if ( ref $res eq 'RedisDB::Error'
-        and ( $self->{raise_error} or $self->{_in_multi} ) )
+        and ( $self->{raise_error} or $self->{_in_multi} or $self->{_watching} ) )
     {
         croak $res;
     }
 
+    if ( $self->{_subscription_loop} ) {
+        confess "Expected multi-bulk reply, but got $res" unless ref $res;
+        if ( $res->[0] eq 'message' ) {
+            $self->{_subscribed}{ $res->[1] }( $self, $res->[1], undef, $res->[2] )
+              if $self->{_subscribed}{ $res->[1] };
+        }
+        elsif ( $res->[0] eq 'pmessage' ) {
+            $self->{_psubscribed}{ $res->[1] }( $self, $res->[2], $res->[1], $res->[3] )
+              if $self->{_psubscribed}{ $res->[1] };
+        }
+        elsif ( $res->[0] =~ /^p?(un)?subscribe/ ) {
+
+            # ignore
+        }
+        else {
+            confess "Got unknown reply $res->[0] in subscription mode";
+        }
+    }
+
     return $res;
 }
 
 =head2 $self->get_all_replies
 
-Wait till replies to all the commands without callback set will be received.
-Return a list of replies to these commands. For commands with callback set
-replies are processed as usual. Unlike I<mainloop> this method block only till
+wait till replies to all the commands without callback set will be received.
+Returns a list of replies to these commands. For commands with callback set
+replies are processed as usual. Unlike I<mainloop> this method blocks only till
 replies to all commands for which callback was NOT set will be received.
 
 =cut
@@ -718,7 +737,7 @@ sub get_all_replies {
 
 =head2 $self->replies_to_fetch
 
-Return the number of commands sent to the server replies to which wasn't yet
+return the number of commands sent to the server replies to which were not yet
 retrieved with I<get_reply> or I<get_all_replies>. This number only includes
 commands for which callback was not set.
 
@@ -731,7 +750,7 @@ sub replies_to_fetch {
 
 =head2 $self->selected_database
 
-Get currently selected database.
+get currently selected database.
 
 =cut
 
@@ -741,9 +760,9 @@ sub selected_database {
 
 =head2 $self->reset_connection
 
-Resets connection. This closes existing connection and drops all previously
-sent requests. After invoking this method the object returns to the same state
-as it was returned by the constructor.
+reset connection. This method closes existing connection and drops all
+previously sent requests. After invoking this method the object returns to the
+same state as it was returned by the constructor.
 
 =cut
 
@@ -758,7 +777,7 @@ sub reset_connection {
 
 =head2 $self->version
 
-Return the version of the server the client is connected to. The version is
+return the version of the server the client is connected to. The version is
 returned as a floating point number represented the same way as the perl
 versions. E.g. for redis 2.1.12 it will return 2.001012.
 
@@ -777,23 +796,23 @@ sub version {
 my @commands = qw(
   append	auth	bgrewriteaof	bgsave	bitcount	bitop	bitpos
   blpop	brpop   brpoplpush	client	client_kill	client_getname	client_setname
-  config	config_get	config_set	config_resetstat config_rewrite
-  dbsize	debug_object	debug_segfault
+  config	config_get	config_set	config_resetstat	config_rewrite
+  dbsize	debug_error	debug_object	debug_segfault
   decr	decrby	del	dump	echo	eval    evalsha exists	expire	expireat	flushall
   flushdb	get	getbit	getrange	getset	hdel	hexists	hget	hgetall
   hincrby	hincrbyfloat	hkeys	hlen	hmget	hscan	hmset	hset	hsetnx	hvals	incr	incrby
   incrbyfloat	keys	lastsave	lindex	linsert	llen	lpop	lpush	lpushx
   lrange	lrem	lset	ltrim	mget	migrate	move	mset	msetnx	object	object_refcount
-  object_encoding	object_idletime	persist	pexpire	pexpireat	ping	psetex	pttl
+  object_encoding	object_idletime	persist	pexpire	pexpireat	pfadd	pfcount	pfmerge	ping	psetex	pttl
   pubsub	pubsub_channels	pubsub_numsub	pubsub_numpat
   publish	quit	randomkey	rename	renamenx	restore	rpop	rpoplpush
   rpush	rpushx	sadd	save	scan	scard	script	script_exists   script_flush    script_kill
   script_load   sdiff	sdiffstore	select	set
   setbit	setex	setnx	setrange	sinter	sinterstore
   sismember	slaveof	slowlog smembers	smove	sort	spop	srandmember
-  srem	sscan	strlen	sunion	sunionstore	sync	time    ttl	type	unwatch watch
-  zadd	zcard
-  zcount	zincrby	zinterstore	zrange	zrangebyscore	zrank	zrem
+  srem	sscan	strlen	sunion	sunionstore	sync	time    ttl	type
+  zadd	zcard	zcount	zincrby	zinterstore	zlexcount	zrange	zrangebylex
+  zrangebyscore	zrank	zrem	zremrangebylex
   zremrangebyrank   zremrangebyscore	zrevrange	zrevrangebyscore	zrevrank
   zscan	zscore	zunionstore
 );
@@ -822,22 +841,23 @@ I<execute>, waits for the reply from the server, and returns it. E.g.:
     $redis->send_command("get", $key, sub { $val = $_[1] });
 
 The following wrapper methods are defined: append, auth, bgrewriteaof, bgsave,
-bitcount, bitop, bitpos, blpop, brpop, brpoplpush, client, client_kill, client_getname,
-client_setname, config, config_get, config_set, config_resetstat, config_rewrite, dbsize,
-debug_object, debug_segfault, decr, decrby, del, dump, echo, eval, evalsha,
-exists, expire, expireat, flushall, flushdb, get, getbit, getrange, getset,
-hdel, hexists, hget, hgetall, hincrby, hincrbyfloat, hkeys, hlen, hmget, hscan, hmset,
-hset, hsetnx, hvals, incr, incrby, incrbyfloat, keys, lastsave, lindex,
-linsert, llen, lpop, lpush, lpushx, lrange, lrem, lset, ltrim, mget, migrate,
-move, mset, msetnx, object, object_refcount, object_encoding, object_idletime,
-persist, pexpire, pexpireat, ping, psetex, pttl, publish,
-pubsub, pubsub_channels, pubsub_numsub, pubsub_numpat, quit, randomkey,
-rename, renamenx, restore, rpop, rpoplpush, rpush, rpushx, sadd, save, scan, scard,
-script, script_exists, script_flush, script_kill, script_load, sdiff,
-sdiffstore, select, set, setbit, setex, setnx, setrange, sinter, sinterstore,
-sismember, slaveof, slowlog, smembers, smove, sort, spop, srandmember, srem,
-sscan strlen, sunion, sunionstore, sync, time, ttl, type, unwatch, watch, zadd,
-zcard, zcount, zincrby, zinterstore, zrange, zrangebyscore, zrank, zrem,
+bitcount, bitop, bitpos, blpop, brpop, brpoplpush, client, client_kill,
+client_getname, client_setname, config, config_get, config_set,
+config_resetstat, config_rewrite, dbsize, debug_error, debug_object, debug_segfault, decr,
+decrby, del, dump, echo, eval, evalsha, exists, expire, expireat, flushall,
+flushdb, get, getbit, getrange, getset, hdel, hexists, hget, hgetall, hincrby,
+hincrbyfloat, hkeys, hlen, hmget, hscan, hmset, hset, hsetnx, hvals, incr,
+incrby, incrbyfloat, keys, lastsave, lindex, linsert, llen, lpop, lpush,
+lpushx, lrange, lrem, lset, ltrim, mget, migrate, move, mset, msetnx, object,
+object_refcount, object_encoding, object_idletime, persist, pexpire, pexpireat,
+pfadd, pfcount, pfmerge, ping, psetex, pttl, publish, pubsub, pubsub_channels, pubsub_numsub,
+pubsub_numpat, quit, randomkey, rename, renamenx, restore, rpop, rpoplpush,
+rpush, rpushx, sadd, save, scan, scard, script, script_exists, script_flush,
+script_kill, script_load, sdiff, sdiffstore, select, set, setbit, setex, setnx,
+setrange, sinter, sinterstore, sismember, slaveof, slowlog, smembers, smove,
+sort, spop, srandmember, srem, sscan strlen, sunion, sunionstore, sync, time,
+ttl, type, unwatch, watch, zadd, zcard, zcount, zincrby, zinterstore,
+zlexcount, zrange, zrangebylex, zrangebyscore, zrank, zrem, zremrangebylex,
 zremrangebyrank, zremrangebyscore, zrevrange, zrevrangebyscore, zrevrank,
 zscan, zscore, zunionstore.
 
@@ -866,7 +886,7 @@ The following commands implement some additional postprocessing of the results:
 
 =cut
 
-=head2 $self->info([$callback])
+=head2 $self->info([\&callback])
 
 return information and statistics about the server. Redis-server returns
 information in form of I<field:value>, the I<info> method parses result and
@@ -897,7 +917,7 @@ sub _parse_info {
     return \%info;
 }
 
-=head2 $self->client_list([$callback])
+=head2 $self->client_list([\&callback])
 
 return list of clients connected to the server. This method parses server
 output and returns result as reference to array of hashes.
@@ -1065,7 +1085,11 @@ L<RedisDB::Error::DISCONNECTED> class, next time you use the object it will
 establish a new connection. If L</raise_error> disabled, the module will pass
 L<RedisDB::Error::DISCONNECTED> object to all outstanding callbacks and will
 try to reconnect to the server; it will also automatically restore
-subscriptions if object was in subscription mode.
+subscriptions if object was in subscription mode. Module never tries to
+reconnect after MULTI or WATCH command was sent to server and before
+corresponding UNWATCH, EXEC or DISCARD was sent as this may cause data
+corruption, so during transaction module behaves like if L</raise_error> is
+set.
 
 Module makes several attempts to reconnect each time increasing interval before
 the next attempt, depending on the values of L</reconnect_attempts> and
@@ -1075,7 +1099,7 @@ hostname, so on next attempt module will try to connect to different server.
 
 =cut
 
-=head1 PIPELINING SUPPORT
+=head1 PIPELINING
 
 You can send commands in the pipelining mode. It means you are sending multiple
 commands to the server without waiting for the replies.  This is implemented by
@@ -1136,47 +1160,74 @@ or using L</"WRAPPER METHODS"> you can rewrite it as:
 
 =cut
 
-=head1 SUBSCRIPTIONS SUPPORT
+=head1 PUB/SUB MESSAGING
 
 RedisDB supports subscriptions to redis channels. In the subscription mode you
 can subscribe to some channels and receive all the messages sent to these
-channels.  Every time RedisDB receives a message for the channel it invokes a
-callback provided by the user. User can specify different callbacks for the
-different channels.  When in the subscription mode you can subscribe to
-additional channels, or unsubscribe from the channels you subscribed to, but
-you can't use any other redis commands like set, get, etc. Here is the example
-of running in the subscription mode:
+channels. You can subscribe to channels and then manually check messages using
+I<get_reply> method, or you can invoke I<subscription_loop> method, which will
+block in loop waiting for messages and invoking callback for each received
+message.  In the first case you can use I<subscribe> and I<psubscribe> methods
+to subscribe to channels and then you can use I<get_reply> method to get
+messages from the channel:
+
+    $redis->subscribe(
+        foo => sub {
+            my ( $redis, $channel, $patern, $message ) = @_;
+            print "Foo: $message\n";
+        }
+    );
+    # Wait for messages
+    $res = $redis->get_reply;
+
+I<get_reply> method for messages from the channel will invoke callback
+specified as the second optional argument of the I<subscribe> method and will
+also return raw replies from the server, both for messages from the channels
+and for informational messages from the redis server. If you do not want to
+block in I<get_reply> method, you can check if there are any messages using
+I<reply_ready> method.
+
+In the second case you invoke I<subscription_loop> method, it subscribes to
+specified channels and waits for messages, when a message arrived it invokes
+callback defined for the channel from which the message came. Here is an
+example:
 
     my $message_cb = sub {
-        my ($redis, $channel, $pattern, $message) = @_;
+        my ( $redis, $channel, $pattern, $message ) = @_;
         print "$channel: $message\n";
     };
-    
+
     my $control_cb = sub {
-        my ($redis, $channel, $pattern, $message) = @_;
-        if ($channel eq 'control.quit') {
+        my ( $redis, $channel, $pattern, $message ) = @_;
+        if ( $channel eq 'control.quit' ) {
             $redis->unsubscribe;
             $redis->punsubscribe;
         }
-        elsif ($channel eq 'control.subscribe') {
+        elsif ( $channel eq 'control.subscribe' ) {
             $redis->subscribe($message);
         }
     };
-    
-    subscription_loop(
-        subscribe => [ 'news',  ],
-        psubscribe => [ 'control.*' => $control_cb ],
+
+    $redis->subscription_loop(
+        subscribe        => [ 'news', ],
+        psubscribe       => [ 'control.*' => $control_cb ],
         default_callback => $message_cb,
     );
 
-subscription_loop will subscribe you to the news channel and control.*
+subscription_loop will subscribe you to the "news" channel and "control.*"
 channels. It will call specified callbacks every time a new message received.
-You can subscribe to additional channels sending their names to the
-control.subscribe channel. You can unsubscribe from all the channels by sending
-a message to the control.quit channel. Every callback receives four arguments:
-the RedisDB object, the channel for which the message was received, the pattern
-if you subscribed to this channel using I<psubscribe> method, and the message
-itself.
+When message came from "control.subscribe" channel, callback subscribes to an
+additional channel. When message came from "control.quit" channel, callback
+unsubscribes from all channels.
+
+Callbacks used in subscription mode receive four arguments: the RedisDB object,
+the channel from which the message came, the pattern if you subscribed to this
+channel using I<psubscribe> method, and the message itself.
+
+Once you switched into subscription mode using either I<subscribe> or
+I<psubscribe> command, or by entering I<subscription_loop>, you only can send
+I<subscribe>, I<psubscribe>, I<unsubscribe>, and I<punsubscribe> commands to
+the server, other commands will throw an exception.
 
 You can publish messages into the channels using the I<publish> method. This
 method should be called when you in the normal mode, and can't be used while
@@ -1225,11 +1276,11 @@ for every channel you are going to subscribe.
 
 sub subscription_loop {
     my ( $self, %args ) = @_;
-    croak "Already in subscription loop" if $self->{_subscription_loop};
+    croak "Already in subscription loop" if $self->{_subscription_loop} > 0;
     croak "You can't start subscription loop while in pipelining mode."
       if $self->replies_to_fetch;
-    $self->{_subscribed}        = {};
-    $self->{_psubscribed}       = {};
+    $self->{_subscribed}  ||= {};
+    $self->{_psubscribed} ||= {};
     $self->{_subscription_cb}   = $args{default_callback};
     $self->{_subscription_loop} = 1;
     $self->{_parser}->set_default_callback( \&_queue );
@@ -1252,32 +1303,16 @@ sub subscription_loop {
       unless ( keys %{ $self->{_subscribed} } or keys %{ $self->{_psubscribed} } );
 
     while ( $self->{_subscription_loop} ) {
-        my $msg = $self->get_reply;
-        confess "Expected multi-bulk reply, but got $msg" unless ref $msg;
-        if ( $msg->[0] eq 'message' ) {
-            $self->{_subscribed}{ $msg->[1] }( $self, $msg->[1], undef, $msg->[2] )
-              if $self->{_subscribed}{ $msg->[1] };
-        }
-        elsif ( $msg->[0] eq 'pmessage' ) {
-            $self->{_psubscribed}{ $msg->[1] }( $self, $msg->[2], $msg->[1], $msg->[3] )
-              if $self->{_psubscribed}{ $msg->[1] };
-        }
-        elsif ( $msg->[0] =~ /^p?(un)?subscribe/ ) {
-
-            # ignore
-        }
-        else {
-            confess "Got unknown reply $msg->[0] in subscription mode";
-        }
+        $self->get_reply;
     }
     return;
 }
 
-=head2 $self->subscribe($channel[, $callback])
+=head2 $self->subscribe($channel[, \&callback])
 
 Subscribe to the I<$channel>. If I<$callback> is not specified, default
-callback will be used. If you are invoking I<subscribe> outside of subscription
-loop, I<$callback> is ignored.
+callback will be used in subscription loop, or messages will be returned by
+I<get_reply> if you are not using subscription loop.
 
 =cut
 
@@ -1294,18 +1329,18 @@ sub subscribe {
           or croak "Callback for $channel not specified, neither default callback defined";
     }
     else {
-        $callback = 1;
+        $callback ||= sub { 1 };
     }
     $self->{_subscribed}{$channel} = $callback;
-    $self->send_command( "SUBSCRIBE", $channel );
+    $self->send_command( "SUBSCRIBE", $channel, \&_queue );
     return;
 }
 
-=head2 $self->psubscribe($pattern[, $callback])
+=head2 $self->psubscribe($pattern[, \&callback])
 
 Subscribe to channels matching I<$pattern>. If I<$callback> is not specified,
-default callback will be used. If you are invoking I<psubscribe> outside of
-subscription loop, I<$callback> is ignored.
+default callback will be used in subscription loop, or messages will be
+returned by I<get_reply> if you are not using subscription loop.
 
 =cut
 
@@ -1322,10 +1357,10 @@ sub psubscribe {
           or croak "Callback for $channel not specified, neither default callback defined";
     }
     else {
-        $callback = 1;
+        $callback ||= sub { 1 };
     }
     $self->{_psubscribed}{$channel} = $callback;
-    $self->send_command( "PSUBSCRIBE", $channel );
+    $self->send_command( "PSUBSCRIBE", $channel, \&_queue );
     return;
 }
 
@@ -1405,17 +1440,17 @@ sub psubscribed {
     return keys %{ shift->{_psubscribed} };
 }
 
-=head1 TRANSACTIONS SUPPORT
+=head1 TRANSACTIONS
 
 Transactions allow you to execute a sequence of commands in a single step. In
 order to start a transaction you should use the I<multi> method.  After you
 have entered a transaction all the commands you issue are queued, but not
 executed till you call the I<exec> method. Typically these commands return
 string "QUEUED" as a result, but if there is an error in e.g. number of
-arguments, they may croak. When you call exec, all the queued commands will be
-executed and exec will return a list of results for every command in the
-transaction. If instead of I<exec> you call I<discard>, all scheduled commands
-will be canceled.
+arguments, they may return an error. When you call exec, all the queued
+commands will be executed and exec will return a list of results for every
+command in the transaction. If instead of I<exec> you call I<discard>, all
+scheduled commands will be canceled.
 
 You can set some keys as watched. If any watched key has been changed by
 another client before you called exec, the transaction will be discarded and
@@ -1423,7 +1458,45 @@ exec will return false value.
 
 =cut
 
-=head2 $self->multi
+=head2 $self->watch(@keys[, \&callback])
+
+mark given keys to be watched
+
+=cut
+
+sub watch {
+    my $self = shift;
+
+    $self->{_watching} = 1;
+    if ( ref $_[-1] eq 'CODE' ) {
+        return $self->send_command( 'WATCH', @_ );
+    }
+    else {
+        return $self->execute( 'WATCH', @_ );
+    }
+}
+
+=head2 $self->unwatch([\&callback])
+
+unwatch all keys
+
+=cut
+
+sub unwatch {
+    my $self = shift;
+
+    my $res;
+    if ( ref $_[-1] eq 'CODE' ) {
+        $res = $self->send_command( 'UNWATCH', @_ );
+    }
+    else {
+        $res = $self->execute( 'UNWATCH', @_ );
+    }
+    $self->{_watching} = undef;
+    return $res;
+}
+
+=head2 $self->multi([\&callback])
 
 Enter the transaction. After this and till I<exec> or I<discard> will be called,
 all the commands will be queued but not executed.
@@ -1434,12 +1507,16 @@ sub multi {
     my $self = shift;
 
     die "Multi calls can not be nested!" if $self->{_in_multi};
-    my $res = $self->execute('MULTI');
     $self->{_in_multi} = 1;
-    return $res;
+    if ( ref $_[-1] eq 'CODE' ) {
+        return $self->send_command( 'MULTI', @_ );
+    }
+    else {
+        return $self->execute('MULTI');
+    }
 }
 
-=head2 $self->exec
+=head2 $self->exec([\&callback])
 
 Execute all queued commands and finish the transaction. Returns a list of
 results for every command. Will croak if some command has failed.  Also
@@ -1451,12 +1528,19 @@ client, the transaction will be canceled and I<exec> will return false.
 sub exec {
     my $self = shift;
 
-    my $res = $self->execute('EXEC');
+    my $res;
+    if ( ref $_[-1] eq 'CODE' ) {
+        $res = $self->send_command( 'EXEC', @_ );
+    }
+    else {
+        $res = $self->execute('EXEC');
+    }
     $self->{_in_multi} = undef;
+    $self->{_watching} = undef;
     return $res;
 }
 
-=head2 $self->discard
+=head2 $self->discard([\&callback])
 
 Discard all queued commands without executing them and unwatch all keys.
 
@@ -1465,8 +1549,15 @@ Discard all queued commands without executing them and unwatch all keys.
 sub discard {
     my $self = shift;
 
-    my $res = $self->execute('DISCARD');
+    my $res;
+    if ( ref $_[-1] eq 'CODE' ) {
+        $res = $self->send_command( 'DISCARD', @_ );
+    }
+    else {
+        $res = $self->execute('DISCARD');
+    }
     $self->{_in_multi} = undef;
+    $self->{_watching} = undef;
     return $res;
 }
 
@@ -18,7 +18,7 @@ if ( $redis->version >= 2 ) {
     is $redis->ping, 'PONG', "Still can ping server after reconnecting";
     is $redis->get("Database"), 1, "Selected database 1";
     $redis->{password} = 'wrong';
-    $redis->quit;
+    delete $redis->{_socket};
     throws_ok { $redis->ping } qr/invalid password/i, "dies on reconnect if password is wrong";
 }
 else {
@@ -1,6 +1,6 @@
 use Test::Most 0.22;
 use RedisDB;
-use IO::Socket::INET;
+use IO::Socket::IP;
 use IO::Socket::UNIX;
 use File::Temp qw(tempdir);
 use File::Spec;
@@ -10,14 +10,14 @@ use Test::FailWarnings;
 
 # Check that module is able to restore connection
 subtest "Restore connection" => sub {
-    my $srv = IO::Socket::INET->new(
+    my $srv = IO::Socket::IP->new(
         LocalAddr => '127.0.0.1',
         Proto     => 'tcp',
         Listen    => 1,
         ReuseAddr => 1,
     );
     plan skip_all => "Can't start server" unless $srv;
-    my $empty_port = IO::Socket::INET->new(
+    my $empty_port = IO::Socket::IP->new(
         LocalAddr => '127.0.0.1',
         Proto     => 'tcp',
         Listen    => 1,
@@ -37,7 +37,7 @@ subtest "Restore connection" => sub {
 
         # simulate restart of the redis-server
         usleep 100_000;
-        $srv = IO::Socket::INET->new(
+        $srv = IO::Socket::IP->new(
             LocalAddr => '127.0.0.1',
             LocalPort => $port,
             Proto     => 'tcp',
@@ -94,7 +94,7 @@ subtest "Restore connection" => sub {
 
 # Check functionality if raise_error is disabled
 subtest "Restore connection without raise_error" => sub {
-    my $srv = IO::Socket::INET->new(
+    my $srv = IO::Socket::IP->new(
         LocalAddr => '127.0.0.1',
         Proto     => 'tcp',
         Listen    => 1,
@@ -117,7 +117,7 @@ subtest "Restore connection without raise_error" => sub {
         $cli->close;
 
         usleep 100_000;
-        $srv = IO::Socket::INET->new(
+        $srv = IO::Socket::IP->new(
             LocalAddr => '127.0.0.1',
             LocalPort => $port,
             Proto     => 'tcp',
@@ -153,15 +153,17 @@ subtest "Restore connection without raise_error" => sub {
 
     # now server disconnects again, so send will start failing also
     $redis->{reconect_attempts} = 1;
-    $res = $redis->set("key", "value");
-    isa_ok $res, "RedisDB::Error::DISCONNECTED", "got an error when server closed connection without sending reply";
-    $res = $redis->set("key", "value");
-    isa_ok $res, "RedisDB::Error::DISCONNECTED", "got an error when module couldn't establish connection with the server";
+    $res = $redis->set( "key", "value" );
+    isa_ok $res, "RedisDB::Error::DISCONNECTED",
+      "got an error when server closed connection without sending reply";
+    $res = $redis->set( "key", "value" );
+    isa_ok $res, "RedisDB::Error::DISCONNECTED",
+      "got an error when module couldn't establish connection with the server";
 };
 
 # Check what will happen if server immediately closes connection
 subtest "No _connect recursion" => sub {
-    my $srv = IO::Socket::INET->new( LocalAddr => '127.0.0.1', Proto => 'tcp', Listen => 1 );
+    my $srv = IO::Socket::IP->new( LocalAddr => '127.0.0.1', Proto => 'tcp', Listen => 1 );
     plan skip_all => "Can't start server" unless $srv;
 
     my $pid = fork;
@@ -183,7 +185,7 @@ subtest "No _connect recursion" => sub {
 subtest "socket timeout" => sub {
     plan skip_all => "OS $^O doesn't support timeout on sockets" if $^O =~ /solaris|MSWin32|cygwin/;
 
-    my $srv = IO::Socket::INET->new( LocalAddr => '127.0.0.1', Proto => 'tcp', Listen => 1 );
+    my $srv = IO::Socket::IP->new( LocalAddr => '127.0.0.1', Proto => 'tcp', Listen => 1 );
 
     my $pid = fork;
     if ( $pid == 0 ) {
@@ -235,4 +237,27 @@ subtest "UNIX socket" => sub {
     is $redis->get("ping"), "PONG", "Got PONG via UNIX socket";
 };
 
+subtest "IPv6" => sub {
+    my $srv = try {
+        IO::Socket::IP->new(
+            V6Only    => 1,
+            LocalHost => '::1',
+            Listen    => 1,
+        );
+    };
+    plan skip_all => "Can't create IPv6 socket" unless $srv;
+    my $pid = fork;
+    if ( $pid == 0 ) {
+        $SIG{ALRM} = sub { exit 0 };
+        alarm 10;
+        my $cli = $srv->accept;
+        defined $cli->recv( my $buf, 1024 ) or die "recv filed: $!";
+        defined $cli->send("+PONG\r\n") or die "send filed: $!";
+        $cli->close;
+        exit 0;
+    }
+    my $redis = RedisDB->new( host => '::1', port => $srv->sockport );
+    is $redis->get("ping"), "PONG", "Got PONG via IPv6 socket";
+};
+
 done_testing;
@@ -10,7 +10,7 @@ BEGIN {
 
 use RedisDB;
 
-my $srv = IO::Socket::INET->new( LocalAddr => '127.0.0.1', Proto => 'tcp', Listen => 1 );
+my $srv = IO::Socket::IP->new( LocalAddr => '127.0.0.1', Proto => 'tcp', Listen => 1 );
 plan skip_all => "Can't start server" unless $srv;
 my $pid = fork;
 
@@ -19,13 +19,14 @@ subtest "Hashes commands"           => \&cmd_hashes;
 subtest "Server info commands"      => \&cmd_server;
 subtest "Sets commands"             => \&cmd_sets;
 subtest "Ordered sets commands"     => \&cmd_zsets;
+subtest "HyperLogLog commands"      => \&cmd_hyperloglog;
 subtest "Scripts"                   => \&cmd_scripts;
 
 sub group_pairs {
     my $ref = shift;
     my @res;
     while (@$ref) {
-        push @res, [ shift @$ref, shift @$ref];
+        push @res, [ shift @$ref, shift @$ref ];
     }
     return @res;
 }
@@ -81,8 +82,9 @@ sub cmd_keys_strings {
             is $redis->get("bits5"), "\x70\x50", "bits5 == bits3 & bits4";
             is $redis->bitop( "XOR", "bits6", "bits3", "bits4" ), 2, "BITOP XOR";
             is $redis->get("bits6"), "\x85\xa5", "bits6 == bits3 ^ bits4";
+
             if ( $redis->version >= 2.008007 ) {
-                $redis->set("bits7", "\x01\xfe\x01");
+                $redis->set( "bits7", "\x01\xfe\x01" );
                 is $redis->bitpos( "bits7", 0, 1 ), 15, "BITPOS";
             }
             else {
@@ -298,14 +300,10 @@ sub cmd_server {
     is ref($info2), "HASH", "Got hashref in info callback";
     is $info2->{redis_version}, $info->{redis_version}, "Same info as from synchronous call";
 
-    if ( $redis->version >= 2.0 ) {
+    if ( $redis->version ge 2.006009 ) {
         eq_or_diff $redis->config_get("dbfilename"), [qw(dbfilename dump_test.rdb)], "CONFIG GET";
-    }
-    if ( $redis->version >= 2.005 ) {
         my ( $sec, $ms ) = @{ $redis->time };
         ok time - $sec < 2, "Server time is correct";
-    }
-    if ( $redis->version ge 2.006009 ) {
         my $redis2 =
           RedisDB->new( host => 'localhost', port => $server->{port}, connection_name => 'bar', );
         is $redis->client_getname, undef, "Name for connection is not set";
@@ -313,6 +311,7 @@ sub cmd_server {
         is $redis->client_getname, "foo", "Now connection name is 'foo'";
         my $clients = $redis->client_list;
         is 0 + @$clients, 2, "Two clients connected to the server";
+
         unless ( $clients->[0]{name} eq 'foo' ) {
             @$clients = reverse @$clients;
         }
@@ -328,6 +327,15 @@ sub cmd_server {
     else {
         diag "Skipped tests for redis >= 2.6.9";
     }
+    if ( $redis->version ge 2.008008 ) {
+        throws_ok {
+            $redis->debug_error("Boo!");
+        }
+        qr/Boo!/, "DEBUG ERROR";
+    }
+    else {
+        diag "Skipped tests for redis >= 2.8.8";
+    }
 }
 
 sub cmd_sets {
@@ -422,6 +430,15 @@ sub cmd_zsets {
         eq_or_diff cut_precision( $redis->zrange( "zunion", 0, -1, "WITHSCORES" ) ),
           [qw(A 1.5 B 2.4 C 3.3 D 4.2 E 5.1 F 6)], "ZUNIONSTORE result is correct";
     }
+
+    if ( $redis->version >= 2.008009 ) {
+        is $redis->zadd( 'zlex', qw(0 a 0 b 0 c 0 d 0 e 0 f 0 g) ), 7, "added 7 elements to zlex";
+        eq_or_diff $redis->zrangebylex( 'zlex', '-', '[c' ), [qw( a b c )], "ZRAGEBYLEX";
+        is $redis->zlexcount( 'zlex', '(b', '[e' ), 3, "ZLEXCOUNT";
+        is $redis->zremrangebylex( 'zlex', '(b', '[e' ), 3, "ZREMRANGEBYLEX";
+        eq_or_diff $redis->zrange( 'zlex', 0, -1 ), [qw(a b f g)],
+          "correct set after ZREMRANGEBYLEX";
+    }
 }
 
 sub cmd_scripts {
@@ -444,4 +461,13 @@ sub cmd_scripts {
     eq_or_diff $redis->evalsha( $sha3, 1, 'eval' ), "passed", "EVALSHA";
 }
 
+sub cmd_hyperloglog {
+    plan skip_all => "This test requires redis-server >= 2.8.9" unless $redis->version >= 2.008009;
+    $redis->flushdb;
+    is $redis->pfadd( 'hll1', qw(a b c d) ), 1, "PFADD";
+    is $redis->pfcount('hll1'), 4, "PFCOUNT";
+    is $redis->pfadd( 'hll2', qw(a b e f) ), 1, "PFADD";
+    is $redis->pfmerge( 'hll3', 'hll1', 'hll2' ), 'OK', "PFMERGE";
+}
+
 done_testing;
@@ -1,8 +1,8 @@
 use Test::Most 0.22;
-use Test::FailWarnings;
 use lib 't';
 use RedisServer;
 use RedisDB;
+use Test::FailWarnings;
 
 my $server = RedisServer->start;
 plan( skip_all => "Can't start redis-server" ) unless $server;
@@ -114,48 +114,57 @@ sub def_cb {
     return;
 }
 
-my $pub = RedisDB->new( host => 'localhost', port => $server->{port} );
-my $sub = RedisDB->new( host => 'localhost', port => $server->{port} );
-$sub->subscribe('baz', sub { $_[0]->unsubscribe('baz') });
-$sub->psubscribe('un*', sub { $_[0]->punsubscribe });
-my $rep =$sub->get_reply;
-is $rep->[0], 'subscribe', "got subscribe reply";
-$rep =$sub->get_reply;
-is $rep->[0], 'psubscribe', "got psubscribe reply";
-dies_ok { $sub->get('key') } "get is not allowed in subscription mode";
-
-if ( $pub->version >= 2.008 ) {
-    subtest "PUBSUB" => sub {
-        eq_or_diff $pub->pubsub('CHANNELS'), ['baz'], "Only baz channel is active";
-        eq_or_diff $pub->pubsub_channels,    ['baz'], "Only baz channel is active";
-        eq_or_diff $pub->pubsub_numsub( 'baz', 'boo' ), [ 'baz', 1, 'boo', 0 ],
-          "baz has one subscriber, boo has none";
-        eq_or_diff $pub->pubsub_numpat, 1, "one pattern subscriber";
-    };
-}
-else {
-    diag "Not testing PUBSUB command. Requires redis-server >= 2.8.0";
-}
-
-$pub->publish('unexpected', 'msg 1');
-$pub->publish('baz', 'msg 2');
+subtest "subscriptions outside of subscription_loop" => sub {
+    my $pub = RedisDB->new( host => 'localhost', port => $server->{port} );
+    my $sub = RedisDB->new( host => 'localhost', port => $server->{port} );
+    my $received;
+    my $cb = sub { $received->{ $_[1] } = $_[3] };
+    $sub->subscribe( 'baz', $cb );
+    $sub->psubscribe( 'un*', $cb );
+    my $rep = $sub->get_reply;
+    is $rep->[0], 'subscribe', "got subscribe reply";
+    $rep = $sub->get_reply;
+    is $rep->[0], 'psubscribe', "got psubscribe reply";
+    ok !$received, "p?subscribe messages didn't invoke callback";
+    dies_ok { $sub->get('key') } "get is not allowed in subscription mode";
+
+    if ( $pub->version >= 2.008 ) {
+        subtest "PUBSUB" => sub {
+            eq_or_diff $pub->pubsub('CHANNELS'), ['baz'], "Only baz channel is active";
+            eq_or_diff $pub->pubsub_channels,    ['baz'], "Only baz channel is active";
+            eq_or_diff $pub->pubsub_numsub( 'baz', 'boo' ), [ 'baz', 1, 'boo', 0 ],
+              "baz has one subscriber, boo has none";
+            eq_or_diff $pub->pubsub_numpat, 1, "one pattern subscriber";
+        };
+    }
+    else {
+        diag "Not testing PUBSUB command. Requires redis-server >= 2.8.0";
+    }
 
-$rep = $sub->get_reply;
-eq_or_diff $rep, ['pmessage', 'un*', 'unexpected', 'msg 1'], "got msg 1 from the unexpected channel";
-$rep = $sub->get_reply;
-eq_or_diff $rep, ['message', 'baz', 'msg 2'], "got msg 2 from the baz channel";
+    $pub->publish( 'unexpected', 'msg 1' );
+    $pub->publish( 'baz',        'msg 2' );
 
-{
-    # this also checks how recv in get_reply deals with interrupts
-    local $SIG{ALRM} = sub { $pub->publish( 'baz', 'msg 3' ); delete $SIG{ALRM}; alarm 3; };
-    alarm 1;
     $rep = $sub->get_reply;
-    alarm 0;
-    eq_or_diff $rep, [ 'message', 'baz', 'msg 3' ], "got msg 3 from the baz channel";
-}
+    eq_or_diff $rep, [ 'pmessage', 'un*', 'unexpected', 'msg 1' ],
+      "got msg 1 from the unexpected channel";
+    is $received->{unexpected}, 'msg 1', "callback for unexpected channel was invoked";
+    $rep = $sub->get_reply;
+    eq_or_diff $rep, [ 'message', 'baz', 'msg 2' ], "got msg 2 from the baz channel";
+    is $received->{baz}, 'msg 2', "callback for baz channel was invoked";
+
+    {
+        # this also checks how recv in get_reply deals with interrupts
+        local $SIG{ALRM} = sub { $pub->publish( 'baz', 'msg 3' ); delete $SIG{ALRM}; alarm 3; };
+        alarm 1;
+        $rep = $sub->get_reply;
+        alarm 0;
+        eq_or_diff $rep, [ 'message', 'baz', 'msg 3' ], "got msg 3 from the baz channel";
+        is $received->{baz}, 'msg 3', "callback for baz channel was invoked";
+    }
 
-$sub->unsubscribe;
-$sub->punsubscribe;
+    $sub->unsubscribe;
+    $sub->punsubscribe;
+};
 
 subtest "unsubscribe without psubscriptions (issue #18)" => sub {
     my $sub = RedisDB->new( host => 'localhost', port => $server->{port} );
@@ -168,4 +177,24 @@ subtest "unsubscribe without psubscriptions (issue #18)" => sub {
     pass "Punsubscribed";
 };
 
+subtest "subscribe before starting subscription loop" => sub {
+    unless ( my $pid = fork ) {
+        die "Couldn't fork: $!" unless defined $pid;
+        sleep 1;
+        $redis->publish( bar          => 'bar message' );
+        $redis->publish( 'other.quit' => 'quit' );
+        exit 0;
+    }
+    my $sub = RedisDB->new( host => 'localhost', port => $server->{port} );
+    $sub->subscribe( 'bar' => \&def_cb );
+    $counts{bar}   = 0;
+    $counts{other} = 0;
+    $sub->subscription_loop(
+        psubscribe       => ['other.*'],
+        default_callback => \&def_cb,
+    );
+    is $counts{bar},   1, "got one message for bar";
+    is $counts{other}, 1, "got one message for other";
+};
+
 done_testing;
@@ -2,6 +2,7 @@ use Test::Most 0.22;
 use lib 't';
 use RedisServer;
 use RedisDB;
+use IO::Select;
 use Time::HiRes qw(usleep);
 
 my $server = RedisServer->start;
@@ -42,11 +43,26 @@ subtest "With raise error" => sub {
         is $redis->get("key"), "value", "key wasn't changed";
     }
 
-    # must not reconnect while in multi
+    note "must not reconnect while in multi";
     is $redis->multi, "OK", "Entered transaction";
     is $redis->set( "key", "42" ), "QUEUED", "QUEUED set";
     is $redis->quit, "OK", "QUIT";
     dies_ok { $redis->set( "key2", "43" ) } "Not reconnecting when in transaction";
+
+    ok $redis->multi(RedisDB::IGNORE_REPLY), "Entered transaction (async)";
+    is $redis->set( "key", "42" ), "QUEUED", "QUEUED set";
+    is $redis->quit, "OK", "QUIT";
+    dies_ok { $redis->set( "key2", "43" ) } "Not reconnecting when in transaction";
+
+    note "must not reconnect when watching key";
+    is $redis->watch("key"), "OK", "Watching key";
+    is $redis->quit, "OK", "QUIT";
+    dies_ok { $redis->set( "key2", "43" ) } "Not reconnecting when watching";
+
+    ok $redis->watch( "key", RedisDB::IGNORE_REPLY ), "Watching key (async)";
+    is $redis->quit, "OK", "QUIT";
+    dies_ok { $redis->set( "key2", "43" ) } "Not reconnecting when watching";
+
     $redis = undef;
 };
 
@@ -62,6 +78,14 @@ subtest "multi/exec without raise_error" => sub {
         connection_name => 'test_connection_3',
     );
 
+    my $kill_connection = sub {
+        my ($r3) =
+          map { $_->{addr} } grep { $_->{name} eq 'test_connection_3' } @{ $redis2->client_list };
+        ok $r3, "Got address for test_connection_3";
+        $redis2->client_kill($r3);
+        IO::Select->new( $redis->{_socket} )->can_read;
+    };
+
     note "inside transaction raise_error is always on";
     is $redis3->hset( "hash", "key", "value" ), 1, "Set hash key";
     my $res = $redis3->get("hash");
@@ -73,14 +97,23 @@ subtest "multi/exec without raise_error" => sub {
     note "redis will not reconnect in the middle of transaction";
     $redis3->reset_connection;
     is $redis3->multi, "OK", "Entered transaction";
-    my ($r3) =
-      map { $_->{addr} } grep { $_->{name} eq 'test_connection_3' } @{ $redis2->client_list };
-    ok $r3, "Got address for test_connection_3";
-    $redis3->set( "test3", "test3", RedisDB::IGNORE_REPLY );
-    $redis2->client_kill($r3);
-    usleep 100_000;
+    $redis3->set( "test3", "test3" );
+    $kill_connection->();
     throws_ok { $redis3->set( "test3", "42" ) } "RedisDB::Error::DISCONNECTED",
-      "on disconnect throws exception";
+      "not reconnecting when in multi";
+
+    note "must not reconnect when watching key";
+    $redis3->reset_connection;
+    is $redis3->watch("key"), "OK", "Watching key";
+    $kill_connection->();
+    throws_ok { $redis3->set( "test3", "42" ) } "RedisDB::Error::DISCONNECTED",
+      "not reconnecting when watching";
+
+    $redis3->reset_connection;
+    is $redis3->watch("key"), "OK", "Watching key";
+    is $redis3->unwatch, "OK", "Unwatched";
+    $kill_connection->();
+    is $redis3->ping, "PONG", "After unwatch it reconnects";
 
     note "exec should restore raise_error";
     $redis3->reset_connection;
@@ -96,4 +129,42 @@ subtest "multi/exec without raise_error" => sub {
     isa_ok $res, "RedisDB::Error", "raise_error unset after transaction finished";
 };
 
+subtest "Reconnecting if disconnected after EXEC" => sub {
+    my $server = Test::TCP->new(
+        code => sub {
+            my $port = shift;
+            my $sock = IO::Socket::IP->new(
+                LocalPort => $port,
+                Listen    => 1,
+            );
+            while ( my $cli = $sock->accept ) {
+                my $line;
+                while ( defined( $line = <$cli> ) ) {
+                    if ( $line =~ /MULTI/ ) {
+                        $cli->send( "+OK\r\n", 0 );
+                    }
+                    elsif ( $line =~ /SET/ ) {
+                        $cli->send( "+QUEUED\r\n", 0 );
+                    }
+                    elsif ( $line =~ /GET/ ) {
+                        $cli->send( ":42\r\n", 0 );
+                    }
+                    elsif ( $line =~ /EXEC/ ) {
+                        $cli->close;
+                        last;
+                    }
+                }
+            }
+        }
+    );
+    my $redis = RedisDB->new( port => $server->port, raise_error => undef, );
+    ok $redis->multi(RedisDB::IGNORE_REPLY), "Entered transaction (async)";
+    is $redis->set( "key", "42" ), "QUEUED", "QUEUED set";
+    my $repl;
+    ok $redis->exec( sub { $repl = $_[1] } ), "Sent EXEC (async)";
+    IO::Select->new( $redis->{_socket} )->can_read;
+    is $redis->get("key"), 42, "Reconnected after EXEC";
+    isa_ok $repl, "RedisDB::Error::DISCONNECTED", "EXEC callback received error";
+};
+
 done_testing;
@@ -26,6 +26,7 @@ SETNAME
 mainloop
 unwatch
 unsubscribe
+punsubscribe
 Unsubscribe
 auth
 callback