The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Danga::Socket::Redis;
use strict;
use IO::Socket;
use Danga::Socket::Callback;

=head1 NAME

Danga::Socket::Redis - An asynchronous redis client.

=head1 SYNOPSIS

    use Danga::Socket::Redis;

    my $rs = Danga::Socket::Redis->new ( connected => \&redis_connected );
 
    sub redis_connected {
        $rs->set ( "key", "value" );
        $rs->get ( "key", sub { my ( $self, $value ) = @_; print "$key = $value\n" } );
        $rs->publish ( "newsfeed", "Twitter is down" );
        $rs->hset ( "hkey", "field", "value" );
        $rs->hget ( "hkey", "field", sub { my ( $self, $value ) = @_ } );
        $rs->subscribe ( "newsfeed", sub { my ( $self, $msg ) = @_ } );
    }
    
Danga::Socket->EventLoop;


=head1 DESCRIPTION

An asynchronous client for the key/value store redis. Asynchronous
basically means a method does not block. A supplied callback will be
called with the results when they are ready.

=head1 USAGE



=head1 BUGS

Only started, a lot of redis functions need to be added.


=head1 SUPPORT

dm @martinredmond
martin @ tinychat.com

=head1 AUTHOR

    Martin Redmond
    CPAN ID: REDS
    Tinychat.com
    @martinredmond
    http://Tinychat.com/about.php

=head1 COPYRIGHT

This program is free software; you can redistribute
it and/or modify it under the same terms as Perl itself.

The full text of the license can be found in the
LICENSE file included with this module.


=head1 SEE ALSO

perl(1).

=cut


BEGIN {
    use Exporter ();
    use vars qw($VERSION @ISA @EXPORT @EXPORT_OK %EXPORT_TAGS);
    $VERSION     = '0.06';
    @ISA         = qw(Exporter);
    @EXPORT      = qw();
    @EXPORT_OK   = qw(set get 
		      hset hget
		      publish subscribe);
    %EXPORT_TAGS = ();
}

our $AUTOLOAD;

our %cmds = (
	     exists => { args => 1 },
	     del => { args => 1 },
	     type => { args => 1 },
	     keys => { args => 1 },
	     randomkey => { args => 0 },
	     rename => { args => 2 },
	     renamenx => { args => 2 },
	     dbsize => { args => 0 },
	     expire => { args => 2 },
	     ttl => { args => 2 },
	     select => { args => 1 },
	     move => { args => 2 },
	     flushdb => { args => 0 },
	     flushall => { args => 0 },

	     set => { args => 2 }, 
	     get => { args => 1 }, 
	     getset => { args => 2 }, 
	     mget => { margs => 1 }, 
	     setnx => { args => 2 }, 
	     setex => { args => 3 }, 
	     mset => { margs => 1 }, 
	     msetnx => { margs => 1 }, 
	     incr => { args => 1 }, 
	     incrby => { args => 1 }, 
	     decr => { args => 1 }, 
	     decrby => { args => 1 }, 
	     append => { args => 2 }, 
	     substr => { args => 3 }, 

	     rpush => { args => 2 }, 
	     lpush => { args => 2 }, 
	     llen => { args => 1 }, 
	     lrange => { args => 2 }, 
	     ltrim => { args => 3 }, 
	     lindex => { args => 2 }, 
	     lset => { args => 3 }, 
	     lrem => { args => 3 }, 
	     lpop => { args => 1 }, 
	     rpop => { args => 1 }, 
	     blpop => { margs => 1 }, 
	     brpop => { margs => 1 }, 
	     rpoplpush => { args => 2 }, 

	     sadd => { args => 2 }, 
	     srem => { args => 2 }, 
	     spop => { args => 1 }, 
	     smove => { args => 3 }, 
	     scard => { args => 1 }, 
	     sismember => { args => 2 }, 
	     sinter => { margs => 1 }, 
	     sinterstore => { margs => 1 }, 
	     sunion => { margs => 1 }, 
	     sunionstore => { margs => 1 }, 
	     sdiff => { margs => 1 },

	     smembers => { args => 1 }, 
	     srandmember => { args => 1 }, 
	     sdiffstore => { margs => 1 }, 
	     
	     zadd => { args => 3 }, 
	     zrem => { args => 2 }, 
	     zincrby => { args => 3 }, 
	     zrank => { args => 2 }, 
	     zrevrank => { args => 2 }, 
	     zrange => { args => 3 }, 
	     zrevrange => { args => 3 }, 
	     zrangebyscore => { args => 3 }, 
	     zcount => { args => 4 }, 
	     zcard => { args => 1 }, 
	     zscore => { args => 0 }, 
	     zremrangebyrank => { args => 0 }, 
	     zremrangebyscore => { args => 0 }, 
	     zunionstore => { args => 0 }, 

	     hset => { args => 3 }, 
	     hget => { args => 2 }, 
	     hmget => { margs => 1 }, 
	     hmset => { margs => 1 }, 
	     hincrby => { args => 0 }, 
	     hexists => { args => 2 }, 
	     hdel => { args => 2 }, 
	     hlen => { args => 1 }, 
	     hkeys => { args => 1 }, 
	     hvals => { args => 1 }, 
	     hgetall => { args => 1 }, 

	     subscribe => { args => 1 },
	     unsubscribe => { args => 1 },
	     publish => { args => 2 },

	     # * MULTI/EXEC/DISCARD/WATCH/UNWATCH  Redis atomic transactions 
	     sort => { args => 0 }, 
	     save => { args => 0 }, 
	     bgsave => { args => 0 }, 
	     lastsave => { args => 0 }, 
	     shutdown => { args => 0 }, 
	     bgrewriteaof => { args => 0 }, 

	     info => { args => 0 }, 
	     monitor => { args => 0 }, 
	     slaveof => { args => 0 }, 
	     config => { args => 0 }, 
	    );

1;

sub new {
  my ($class, %args) = @_;
  my $self = bless ({}, ref ($class) || $class);
  my $peeraddr = "localhost:6379";
  $peeraddr = "$args{host}:6379" if $args{host};
  $peeraddr = "localhost:$args{port}" if $args{port};
  $peeraddr = "$args{host}:$args{port}" if $args{host} && $args{port};
  my $sock = IO::Socket::INET->new (
				    PeerAddr => $peeraddr,
				    Blocking => 0,
				   );
  $self->{connected_cb} = $args{connected} if $args{connected};
  my $a = '';
  $self->{rs} = Danga::Socket::Callback->new
    (
     handle => $sock,
     context => { buf => \$a, rs => $self },
     on_read_ready => sub {
       my $self = shift;
       my $bref = $self->read ( 1024 * 8 );
       my $buf = $self->{context}->{buf};
       if ( $bref ) {
	 $buf = length ( $$buf ) > 0 ? 
	   \ ($$buf . $$bref) :
	     $bref;
	 $self->{context}->{buf} = $self->{context}->{rs}->do_buf ( $buf );
       } else {
	 $self->close ( 'read' );
	 die "reading from redis";
       }
     },
     on_write_ready => sub {
       my $self = shift;
       $self->watch_write ( 0 );
       my $cb = delete $self->{context}->{rs}->{connected_cb};
       &$cb ( $self->{context}->{rs} ) if $cb;
     }
    );
  return bless $self;
}

sub do_buf {
  my ( $self, $buf ) = @_;
  my $o;
  while ( 1 ) {
    ( $buf, $o ) =
      $self->redis_read ( $buf );
    last unless $o;
    $self->redis_process ( $o );
  }
  return $buf;  # there may be some stuff left over from this read
}

sub redis_read {
  my ( $self, $bref ) = @_;
  return ( $bref, undef ) if length ( $$bref ) == 0;
  my $nlpos = index ( $$bref, "\n" );
  return ( $bref, undef ) if $nlpos == -1;
  my $tok = substr ( $$bref, 0, 1 );
  if ( $tok eq ':' ) {
    my $n = substr ( $$bref, 1, $nlpos - 2 );
    my $r = substr ( $$bref, $nlpos + 1 );
    return ( \$r, { type => 'int', value => $n } );
  } elsif ( $tok eq '-' ) {
    my $e = substr ( $$bref, 1, $nlpos - 2 );
    my $r = substr ( $$bref, $nlpos + 1 );
    return ( \$r, { type => 'error', value => $e } );
  } elsif ( $tok eq '+' ) {
    my $l = substr ( $$bref, 1, $nlpos - 2 );
    my $r = substr ( $$bref, $nlpos + 1 );
    return ( \$r, { type => 'line', value => $l } );
  } elsif ( $tok eq '$' ) {
    my $l = substr ( $$bref, 1, $nlpos - 2 );
    if ( $l == -1 ) {
      my $r = substr ( $$bref, $nlpos + 1 );
      return ( \$r, { type => 'bulkerror' } );
    }
    #	    warn "better check this" if length ( $$bref ) < $nlpos + 1 + $l + 2;
    return ( $bref, undef ) if length ( $$bref ) < $nlpos + 1 + $l + 2;  # need more data
    my $v = substr ( $$bref, $nlpos + 1, $l );
    my $r = substr ( $$bref, $nlpos + $l + 1 + 2 );
    return ( \$r, { type => 'bulk', value => $v } );
  } elsif ( $tok eq '*' ) {
    my $l = substr ( $$bref, 1, $nlpos - 2 );
    if ( $l == -1 ) {
      my $r = substr ( $$bref, $nlpos + 1 );
      return ( \$r, { type => 'multibulkerror' } );
    }
    my $obref = $bref;
    my $r = substr ( $$bref, $nlpos + 1 );
    $bref = \$r;
    my @res;
    while ( $l-- ) {
      my $o;
      ( $bref, $o ) = $self->redis_read ( $bref );
      return $obref unless $o;    # read more?
      push @res, $o;
    }
    return ( $bref, { type => 'bulkmulti', values => \@res } );
  } else {
    die "Danga::Socket::Redis bref", $$bref;
  }
}

sub redis_process {
    my ( $self, $o ) = @_;
    my $v = $o->{values};
    if ( $v && $v->[0]->{value} eq 'message' ) {
      if ( my $cb = $self->{subscribe}->{callback}->{$v->[1]->{value}} ) {
	&$cb ( $self, $v->[2]->{value}, $o );
      }
      return;
    }
    my $cmd = shift @{$self->{cmdqueue}};
    if ( my $cb = $cmd->{callback} ) {
	if ( $o->{type} eq 'bulkerror' ) {
	    &$cb ( $self, $o );
	} else {
	    if ( $o->{type} eq 'bulkmulti' ) {
		my  @vs = map { $_->{value} } @{$o->{values}};
		&$cb ( $self, \@vs, $o );
	    } else {
		&$cb ( $self, $o->{value}, $o );
	    }
	}
    }
}

sub DESTROY {}

sub AUTOLOAD {
  my $self = shift;
  my $cc = $AUTOLOAD;
  $cc =~ s/.*:://;
  $cc = lc $cc;

  my $opts = $Danga::Socket::Redis::cmds{$cc};
  return undef unless $opts;

  my $cmd = { type => $cc };
  if ( $opts->{args} > 0 ) {
    push @{$cmd->{args}}, shift for 1 .. $opts->{args};
    $cmd->{callback} = shift;
    $cmd->{options} = shift;
  } elsif ( $opts->{margs} == 1 ) {
    my $last = pop @_;
    if ( ref $last eq 'HASH' ) {
      $cmd->{options} = $last;
      $last = pop @_;
    }
    if ( ref $last eq 'CODE' ) {
      $cmd->{callback} = $last;
    } else {
      push @_, $last;
    }
    @{$cmd->{args}} = @_;
  }
  if ( $cc eq 'subscribe' && $cmd->{callback} && $cmd->{args} &&
       scalar @{$cmd->{args}} == 1 ) {
    $self->{subscribe}->{callback}->{$cmd->{args}->[0]} = $cmd->{callback};
  }
  $self->redis_send ( $cmd );
}

sub redis_send {
  my ( $self, $cmd ) = @_;
  $cmd->{args} = [] unless ref $cmd->{args} eq 'ARRAY';
  unless ( $cmd->{type} eq 'subscribe' ) {
    push @{$self->{cmdqueue}}, $cmd;
  }
  my $send = "*" . ( scalar ( @{$cmd->{args}} ) + 1 ) . "\r\n" .
    "\$" . length ( $cmd->{type} ) . "\r\n" .
      $cmd->{type} . "\r\n";
  foreach ( @{$cmd->{args}} ) {
    $send .= "\$" . length ($_) . "\r\n$_\r\n";
  }
  $self->{rs}->write ( $send );
}