The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Redis::Fast;

BEGIN {
    use XSLoader;
    our $VERSION = '0.05';
    XSLoader::load __PACKAGE__, $VERSION;
}

use warnings;
use strict;

use Data::Dumper;
use Carp qw/confess/;
use Encode;
use Try::Tiny;
use Scalar::Util qw(weaken);

sub _new_on_connect_cb {
    my ($self, $on_conn, $password, $name) = @_;
    weaken $self;
    return sub {
        try {
            $self->auth($password) if defined $password;
        } catch {
            confess("Redis server refused password");
        };
        try {
            my $n = $name;
            $n = $n->($self) if ref($n) eq 'CODE';
            $self->client_setname($n) if defined $n;
        };
        $on_conn->($self) if $on_conn;
    };
}

sub new {
  my $class = shift;
  my %args  = @_;
  my $self  = $class->_new;

  #$self->{debug} = $args{debug} || $ENV{REDIS_DEBUG};

  ## default to lax utf8
  my $encoding = exists $args{encoding} ? $args{encoding} : 'utf8';
  if(!$encoding) {
    $self->__set_utf8(0);
  } elsif($encoding eq 'utf8') {
    $self->__set_utf8(1);
  } else {
    die "encoding $encoding does not support";
  }

  ## Deal with REDIS_SERVER ENV
  if ($ENV{REDIS_SERVER} && !$args{sock} && !$args{server}) {
    if ($ENV{REDIS_SERVER} =~ m!^/!) {
      $args{sock} = $ENV{REDIS_SERVER};
    }
    elsif ($ENV{REDIS_SERVER} =~ m!^unix:(.+)!) {
      $args{sock} = $1;
    }
    elsif ($ENV{REDIS_SERVER} =~ m!^(tcp:)?(.+)!) {
      $args{server} = $2;
    }
  }

  my $on_conn = $args{on_connect};
  my $password = $args{password};
  my $name = $args{name};
  $self->__set_on_connect($self->_new_on_connect_cb($on_conn, $password, $name));
  $self->__set_data({
      subscribers => {},
  });

  if ($args{sock}) {
    $self->__connection_info_unix($args{sock});
  }
  else {
    my ($server, $port) = split /:/, ($args{server} || '127.0.0.1:6379');
    $self->__connection_info($server, $port);
  }

  #$self->{is_subscriber} = 0;
  #$self->{subscribers}   = {};
  $self->__set_reconnect($args{reconnect} || 0);
  $self->__set_every($args{every} || 1000);

  $self->__connect;

  return $self;
}


### Deal with common, general case, Redis commands
our $AUTOLOAD;

sub AUTOLOAD {
  my $command = $AUTOLOAD;
  $command =~ s/.*://;
  my @command = split /_/, uc $command;

  my $method = sub {
      my $self = shift;
      $self->__is_valid_command($command);
      my ($ret, $error) = $self->__std_cmd(@command, @_);
      confess "[$command] $error, " if defined $error;
      return (wantarray && ref $ret eq 'ARRAY') ? @$ret : $ret;
  };

  # Save this method for future calls
  no strict 'refs';
  *$AUTOLOAD = $method;

  goto $method;
}

sub __with_reconnect {
  my ($self, $cb) = @_;

  confess "not implemented";
}


### Commands with extra logic

sub keys {
    my $self = shift;
    $self->__is_valid_command('keys');
    my ($ret, $error) = $self->__keys(@_);
    confess "[keys] $error, " if defined $error;
    return $ret unless ref $ret eq 'ARRAY';
    return @$ret;
}

sub ping {
    my $self = shift;
    $self->__is_valid_command('ping');
    return scalar try {
        my ($ret, $error) = $self->__std_cmd('ping');
        return if defined $error;
        return $ret;
    } catch {
        return ;
    };
}

sub info {
    my $self = shift;
    $self->__is_valid_command('info');
    my ($ret, $error) = $self->__info(@_);
    confess "[keys] $error, " if defined $error;
    return $ret unless ref $ret eq 'ARRAY';
    return @$ret;
}

sub quit {
    my $self = shift;
    $self->__is_valid_command('quit');
    $self->__quit(@_);
}

sub shutdown {
    my $self = shift;
    $self->__is_valid_command('shutdown');
    $self->__shutdown(@_);
}

sub __subscription_cmd {
  my $self    = shift;
  my $pr      = shift;
  my $unsub   = shift;
  my $command = shift;
  my $cb      = pop;
  weaken $self;

  confess("Missing required callback in call to $command(), ")
    unless ref($cb) eq 'CODE';

  $self->wait_all_responses;

  my @subs = @_;
  @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
      if $unsub;
  my %cbs = map { ("${pr}message:$_" => $cb) } @subs;
  if(@subs) {
      return $self->__send_subscription_cmd(
          $command,
          @subs,
          sub {
              return $self->__process_subscription_changes($command, \%cbs, @_);
          },
      );
  } else {
      return 0;
  }
}

sub subscribe    { shift->__subscription_cmd('',  0, subscribe    => @_) }
sub psubscribe   { shift->__subscription_cmd('p', 0, psubscribe   => @_) }
sub unsubscribe  { shift->__subscription_cmd('',  1, unsubscribe  => @_) }
sub punsubscribe { shift->__subscription_cmd('p', 1, punsubscribe => @_) }

sub __process_unsubscribe_requests {
  my ($self, $cb, $pr, @unsubs) = @_;
  my $subs = $self->__get_data->{subscribers};

  my @subs_to_unsubscribe;
  for my $sub (@unsubs) {
    my $key = "${pr}message:$sub";
    next unless $subs->{$key} && @{ $subs->{$key} };
    my $cbs = $subs->{$key} = [grep { $_ ne $cb } @{ $subs->{$key} }];
    next if @$cbs;

    delete $subs->{$key};
    push @subs_to_unsubscribe, $sub;
  }
  return @subs_to_unsubscribe;
}

sub __process_subscription_changes {
  my ($self, $cmd, $expected, $m, $error) = @_;
  my $subs = $self->__get_data->{subscribers};

  confess "[$cmd] $error, " if defined $error;

  ## Deal with pending PUBLISH'ed messages
  if ($m->[0] =~ /^p?message$/) {
    $self->__process_pubsub_msg($m);
    return ;
  }

  my ($key, $unsub) = $m->[0] =~ m/^(p)?(un)?subscribe$/;
  $key .= "message:$m->[1]";
  my $cb = delete $expected->{$key};

  push @{ $subs->{$key} }, $cb unless $unsub;
}

sub __process_pubsub_msg {
  my ($self, $m) = @_;
  my $subs = $self->__get_data->{subscribers};

  my $sub   = $m->[1];
  my $cbid  = "$m->[0]:$sub";
  my $data  = pop @$m;
  my $topic = $m->[2] || $sub;

  if (!exists $subs->{$cbid}) {
    warn "Message for topic '$topic' ($cbid) without expected callback, ";
    return 0;
  }

  $_->($data, $topic, $sub) for @{ $subs->{$cbid} };

  return 1;

}

sub __is_valid_command {
  my ($self, $cmd) = @_;

  confess("Cannot use command '$cmd' while in SUBSCRIBE mode, ")
    if $self->is_subscriber;
}

1;    # End of Redis.pm

__END__

=encoding utf-8

=head1 NAME

Redis::Fast - Perl binding for Redis database

=head1 SYNOPSIS

    ## Defaults to $ENV{REDIS_SERVER} or 127.0.0.1:6379
    my $redis = Redis::Fast->new;

    my $redis = Redis::Fast->new(server => 'redis.example.com:8080');

    ## Set the connection name (requires Redis 2.6.9)
    my $redis = Redis::Fast->new(
      server => 'redis.example.com:8080',
      name => 'my_connection_name',
    );
    my $generation = 0;
    my $redis = Redis::Fast->new(
      server => 'redis.example.com:8080',
      name => sub { "cache-$$-".++$generation },
    );

    ## Use UNIX domain socket
    my $redis = Redis::Fast->new(sock => '/path/to/socket');

    ## Enable auto-reconnect
    ## Try to reconnect every 500ms up to 60 seconds until success
    ## Die if you can't after that
    my $redis = Redis::Fast->new(reconnect => 60);

    ## Try each 100ms upto 2 seconds (every is in milisecs)
    my $redis = Redis::Fast->new(reconnect => 2, every => 100);

    ## Disable the automatic utf8 encoding => much more performance
    ## !!!! This will be the default after 2.000, see ENCODING below
    my $redis = Redis::Fast->new(encoding => undef);

    ## Use all the regular Redis commands, they all accept a list of
    ## arguments
    ## See http://redis.io/commands for full list
    $redis->get('key');
    $redis->set('key' => 'value');
    $redis->sort('list', 'DESC');
    $redis->sort(qw{list LIMIT 0 5 ALPHA DESC});

    ## Add a coderef argument to run a command in the background
    $redis->sort(qw{list LIMIT 0 5 ALPHA DESC}, sub {
      my ($reply, $error) = @_;
      die "Oops, got an error: $error\n" if defined $error;
      print "$_\n" for @$reply;
    });
    long_computation();
    $redis->wait_all_responses;
    ## or
    $redis->wait_one_response();

    ## Or run a large batch of commands in a pipeline
    my %hash = _get_large_batch_of_commands();
    $redis->hset('h', $_, $hash{$_}, sub {}) for keys %hash;
    $redis->wait_all_responses;

    ## Publish/Subscribe
    $redis->subscribe(
      'topic_1',
      'topic_2',
      sub {
        my ($message, $topic, $subscribed_topic) = @_

          ## $subscribed_topic can be different from topic if
          ## you use psubscribe() with wildcards
      }
    );
    $redis->psubscribe('nasdaq.*', sub {...});

    ## Blocks and waits for messages, calls subscribe() callbacks
    ##  ... forever
    my $timeout = 10;
    $redis->wait_for_messages($timeout) while 1;

    ##  ... until some condition
    my $keep_going = 1; ## other code will set to false to quit
    $redis->wait_for_messages($timeout) while $keep_going;

    $redis->publish('topic_1', 'message');


=head1 DESCRIPTION

C<Redis::Fast> is a wrapper around Salvatore Sanfilippo's
L<hiredis|https://github.com/antirez/hiredis> C client.
It is compatible with L<Redis.pm|https://github.com/melo/perl-redis>.

This version supports protocol 2.x (multi-bulk) or later of Redis available at
L<https://github.com/antirez/redis/>.


=head1 PERFORMANCE IN SYNCHRONIZE MODE

=head2 Redis.pm

    Benchmark: running 00_ping, 10_set, 11_set_r, 20_get, 21_get_r, 30_incr, 30_incr_r, 40_lpush, 50_lpop, 90_h_get, 90_h_set for at least 5 CPU seconds...
       00_ping:  8 wallclock secs ( 0.69 usr +  4.77 sys =  5.46 CPU) @ 5538.64/s (n=30241)
        10_set:  8 wallclock secs ( 1.07 usr +  4.01 sys =  5.08 CPU) @ 5794.09/s (n=29434)
      11_set_r:  7 wallclock secs ( 0.42 usr +  4.84 sys =  5.26 CPU) @ 5051.33/s (n=26570)
        20_get:  8 wallclock secs ( 0.69 usr +  4.82 sys =  5.51 CPU) @ 5080.40/s (n=27993)
      21_get_r:  7 wallclock secs ( 2.21 usr +  3.09 sys =  5.30 CPU) @ 5389.06/s (n=28562)
       30_incr:  7 wallclock secs ( 0.69 usr +  4.73 sys =  5.42 CPU) @ 5671.77/s (n=30741)
     30_incr_r:  7 wallclock secs ( 0.85 usr +  4.31 sys =  5.16 CPU) @ 5824.42/s (n=30054)
      40_lpush:  8 wallclock secs ( 0.60 usr +  4.77 sys =  5.37 CPU) @ 5832.59/s (n=31321)
       50_lpop:  7 wallclock secs ( 1.24 usr +  4.17 sys =  5.41 CPU) @ 5112.75/s (n=27660)
      90_h_get:  7 wallclock secs ( 0.63 usr +  4.65 sys =  5.28 CPU) @ 5716.29/s (n=30182)
      90_h_set:  7 wallclock secs ( 0.65 usr +  4.74 sys =  5.39 CPU) @ 5593.14/s (n=30147)

=head2 Redis::Fast

Redis::Fast is 50% faster than Redis.pm.

    Benchmark: running 00_ping, 10_set, 11_set_r, 20_get, 21_get_r, 30_incr, 30_incr_r, 40_lpush, 50_lpop, 90_h_get, 90_h_set for at least 5 CPU seconds...
       00_ping:  9 wallclock secs ( 0.18 usr +  4.84 sys =  5.02 CPU) @ 7939.24/s (n=39855)
        10_set: 10 wallclock secs ( 0.31 usr +  5.40 sys =  5.71 CPU) @ 7454.64/s (n=42566)
      11_set_r:  9 wallclock secs ( 0.31 usr +  4.87 sys =  5.18 CPU) @ 7993.05/s (n=41404)
        20_get: 10 wallclock secs ( 0.27 usr +  4.84 sys =  5.11 CPU) @ 8350.68/s (n=42672)
      21_get_r: 10 wallclock secs ( 0.32 usr +  5.17 sys =  5.49 CPU) @ 8238.62/s (n=45230)
       30_incr:  9 wallclock secs ( 0.23 usr +  5.27 sys =  5.50 CPU) @ 8221.82/s (n=45220)
     30_incr_r:  8 wallclock secs ( 0.28 usr +  4.91 sys =  5.19 CPU) @ 8092.29/s (n=41999)
      40_lpush:  9 wallclock secs ( 0.18 usr +  5.06 sys =  5.24 CPU) @ 8312.02/s (n=43555)
       50_lpop:  9 wallclock secs ( 0.20 usr +  4.84 sys =  5.04 CPU) @ 8010.12/s (n=40371)
      90_h_get:  9 wallclock secs ( 0.19 usr +  5.51 sys =  5.70 CPU) @ 7467.72/s (n=42566)
      90_h_set:  8 wallclock secs ( 0.28 usr +  4.83 sys =  5.11 CPU) @ 7724.07/s (n=39470)o


=head1 PERFORMANCE IN PIPELINE MODE

    #!/usr/bin/perl
    use warnings;
    use strict;
    use Time::HiRes qw/time/;
    use Redis;
    
    my $count = 100000;
    {
        my $r = Redis->new;
        my $start = time;
        for(1..$count) {
            $r->set('hoge', 'fuga', sub{});
        }
        $r->wait_all_responses;
        printf "Redis.pm:\n%.2f/s\n", $count / (time - $start);
    }
    
    {
        my $r = Redis::Fast->new;
        my $start = time;
        for(1..$count) {
            $r->set('hoge', 'fuga', sub{});
        }
        $r->wait_all_responses;
        printf "Redis::Fast:\n%.2f/s\n", $count / (time - $start);
    }


Redis::Fast is 4x faster than Redis.pm in pipeline mode.

    Redis.pm:
    22588.95/s
    Redis::Fast:
    81098.01/s

=head1 AUTHOR

Ichinose Shogo E<lt>shogo82148@gmail.comE<gt>

=head1 SEE ALSO

=over 4

=item *

L<Redis.pm|https://github.com/melo/perl-redis>

=back

=cut