The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package RedisDB::Cluster;

use strict;
use warnings;
our $VERSION = "2.43";
$VERSION = eval $VERSION;

use Carp;
use RedisDB;
use Time::HiRes qw(usleep);

our $DEBUG = 0;

# use util/generate_key_positions.pl to generate this
# command / first key position
my %key_pos = (
    append           => 1,
    bitcount         => 1,
    bitop            => 2,
    bitpos           => 1,
    blpop            => 1,
    brpop            => 1,
    brpoplpush       => 1,
    decr             => 1,
    decrby           => 1,
    del              => 1,
    dump             => 1,
    exists           => 1,
    expire           => 1,
    expireat         => 1,
    get              => 1,
    getbit           => 1,
    getrange         => 1,
    getset           => 1,
    hdel             => 1,
    hexists          => 1,
    hget             => 1,
    hgetall          => 1,
    hincrby          => 1,
    hincrbyfloat     => 1,
    hkeys            => 1,
    hlen             => 1,
    hmget            => 1,
    hmset            => 1,
    hscan            => 1,
    hset             => 1,
    hsetnx           => 1,
    hvals            => 1,
    incr             => 1,
    incrby           => 1,
    incrbyfloat      => 1,
    lindex           => 1,
    linsert          => 1,
    llen             => 1,
    lpop             => 1,
    lpush            => 1,
    lpushx           => 1,
    lrange           => 1,
    lrem             => 1,
    lset             => 1,
    ltrim            => 1,
    mget             => 1,
    move             => 1,
    mset             => 1,
    msetnx           => 1,
    object           => 2,
    persist          => 1,
    pexpire          => 1,
    pexpireat        => 1,
    pfadd            => 1,
    pfcount          => 1,
    pfmerge          => 1,
    psetex           => 1,
    pttl             => 1,
    rename           => 1,
    renamenx         => 1,
    restore          => 1,
    'restore-asking' => 1,
    rpop             => 1,
    rpoplpush        => 1,
    rpush            => 1,
    rpushx           => 1,
    sadd             => 1,
    scard            => 1,
    sdiff            => 1,
    sdiffstore       => 1,
    set              => 1,
    setbit           => 1,
    setex            => 1,
    setnx            => 1,
    setrange         => 1,
    sinter           => 1,
    sinterstore      => 1,
    sismember        => 1,
    smembers         => 1,
    smove            => 1,
    sort             => 1,
    spop             => 1,
    srandmember      => 1,
    srem             => 1,
    sscan            => 1,
    strlen           => 1,
    substr           => 1,
    sunion           => 1,
    sunionstore      => 1,
    ttl              => 1,
    type             => 1,
    watch            => 1,
    zadd             => 1,
    zcard            => 1,
    zcount           => 1,
    zincrby          => 1,
    zlexcount        => 1,
    zrange           => 1,
    zrangebylex      => 1,
    zrangebyscore    => 1,
    zrank            => 1,
    zrem             => 1,
    zremrangebylex   => 1,
    zremrangebyrank  => 1,
    zremrangebyscore => 1,
    zrevrange        => 1,
    zrevrangebylex   => 1,
    zrevrangebyscore => 1,
    zrevrank         => 1,
    zscan            => 1,
    zscore           => 1,
);

=head1 NAME

RedisDB::Cluster - client for redis cluster

=head1 SYNOPSIS

    my $cluster = RedisDB::Cluster->new( startup_nodes => \@nodes );
    $cluster->set( 'foo', 'bar' );
    my $res = $cluster->get('foo');

=head1 DESCRIPTION

This module allows you to access redis cluster.

=head1 METHODS

=cut

=head2 $self->new(startup_nodes => \@nodes)

create a new connection to cluster. Startup nodes are used to retrieve
information about all cluster nodes and slots mappings.

=cut

sub new {
    my ( $class, %params ) = @_;

    my $self = {
        _slots       => [],
        _connections => {},
        _nodes       => $params{startup_nodes},
    };
    $self->{no_slots_initialization} = 1 if $params{no_slots_initialization};

    bless $self, $class;
    $self->_initialize_slots;

    return $self;
}

sub _initialize_slots {
    my $self = shift;

    return if $self->{no_slots_initialization};
    unless ( $self->{_nodes} and @{ $self->{_nodes} } ) {
        confess "list of cluster nodes is empty";
    }

    my %new_nodes;
    my $new_nodes;
    for my $node ( @{ $self->{_nodes} } ) {
        my $redis = _connect_to_node( $self, $node );
        next unless $redis;

        my $nodes = $redis->cluster_nodes;
        next if ref $nodes =~ /^RedisDB::Error/;
        $new_nodes = $nodes;
        for (@$nodes) {
            $new_nodes{"$_->{host}:$_->{port}"}++;
        }

        my $slots = $redis->cluster('SLOTS');
        confess "got an error trying retrieve a list of cluster slots: $slots"
          if ref $slots =~ /^RedisDB::Error/;
        for (@$slots) {
            my ( $ip, $port ) = @{ $_->[2] };
            my $node_key = "$ip:$port";
            for ( $_->[0] .. $_->[1] ) {
                $self->{_slots}[$_] = $node_key;
            }
        }
        last;
    }

    unless ( $new_nodes and @$new_nodes ) {
        confess "couldn't get list of cluster nodes";
    }
    $self->{_nodes} = $new_nodes;

    # close connections to nodes that are not in cluster
    for ( keys %{ $self->{_connections} } ) {
        delete $self->{_connections}{$_} unless $new_nodes{$_};
    }

    return;
}

=head2 $self->execute($command, @args)

sends command to redis and returns the reply. It determines the cluster node to
send command to from the first key in I<@args>, sending commands that does not
include key as an argument is not supported. If I<@args> contains several keys,
all of them should belong to the same slot, otherwise redis-server will return
an error if some of the keys are stored on a different node.

Module also defines wrapper methods with names matching corresponding redis
commands, so you can use

    $cluster->set( "foo", "bar" );
    $cluster->inc("baz");

instead of

    $cluster->execute( "set", "foo", "bar" );
    $cluster->execute( "inc", "baz" );

=cut

sub execute {
    my $self = shift;
    my @args = @_;

    my $command = lc $args[0];
    confess "Command $command does not have key" unless $key_pos{$command};
    my $key = $args[ $key_pos{$command} ];
    confess "Key is not specified in: ", join " ", @args unless length $key;

    if ( $self->{_refresh_slots} ) {
        $self->_initialize_slots;
    }
    my $slot     = key_slot($key);
    my $node_key = $self->{_slots}[$slot]
      || "$self->{_nodes}[0]{host}:$self->{_nodes}[0]{port}";
    my $asking;
    my $last_connection;

    my $attempts = 10;
    while ( $attempts-- ) {
        my $redis = $self->{_connections}{$node_key};
        unless ($redis) {
            my ( $host, $port ) = split /:/, $node_key;
            $redis = _connect_to_node(
                $self,
                {
                    host => $host,
                    port => $port
                }
            );
        }

        my $res;
        if ($redis) {
            $redis->asking(RedisDB::IGNORE_REPLY) if $asking;
            $asking = 0;
            $res    = $redis->execute(@args);
        }
        else {
            $res = RedisDB::Error::DISCONNECTED->new(
                "Couldn't connect to redis server at $node_key");
        }

        if ( ref $res eq 'RedisDB::Error::MOVED' ) {
            if ( $res->{slot} ne $slot ) {
                confess
                  "Incorrectly computed slot for key '$key', ours $slot, theirs $res->{slot}";
            }
            warn "slot $slot moved to $res->{host}:$res->{port}" if $DEBUG;
            $node_key = $self->{_slots}[$slot] = "$res->{host}:$res->{port}";
            $self->{_refresh_slots} = 1;
            next;
        }
        elsif ( ref $res eq 'RedisDB::Error::ASK' ) {
            warn "asking $res->{host}:$res->{port} about slot $slot" if $DEBUG;
            $node_key = "$res->{host}:$res->{port}";
            $asking   = 1;
            next;
        }
        elsif ( ref $res eq 'RedisDB::Error::DISCONNECTED' ) {
            warn "$res" if $DEBUG;
            delete $self->{_connections}{$node_key};
            usleep 100_000;
            if ( $last_connection and $last_connection eq $node_key ) {

                # if we couldn't reconnect to host, then refresh slots table
                warn "refreshing slots table" if $DEBUG;
                $self->_initialize_slots;

                # if it's still the same host, then just return the error
                return $res if $self->{_slots}[$slot] eq $node_key;
                warn "got a new host for the slot" if $DEBUG;
            }
            else {
                warn "trying to reconnect" if $DEBUG;
                $last_connection = $node_key;
            }
            next;
        }
        return $res;
    }

    return RedisDB::Error::DISCONNECTED->new(
        "Couldn't send command after 10 attempts");
}

for my $command (keys %key_pos) {
    no strict 'refs';
    *{ __PACKAGE__ . "::$command" } = sub { execute(shift, $command, @_) };
}

=head2 $self->random_connection

return RedisDB object that is connected to some node of the cluster. Note, that
in most cases this method will return the same connection every time.

=cut

sub random_connection {
    my $self = shift;
    my ($connection) = values %{ $self->{_connections} };
    unless ($connection) {
        for ( @{ $self->{_nodes} } ) {
            $connection = _connect_to_node( $self, $_ );
            last if $connection;
        }
    }
    return $connection;
}

=head1 CLUSTER MANAGEMENT METHODS

The following methods can be used for cluster management -- to add or remove a
node, or migrate slot from one node to another.

=cut

=head2 $self->add_new_node($address[, $master_id])

attach node with the specified I<$address> to the cluster. If I<$master_id> is
specified, the new node is configured as a replica of the master with the
specified ID, otherwise it will be a master node itself. Address should be
specified as a hash containing I<host> and I<port> elements.

=cut

sub add_new_node {
    my ( $self, $addr, $master_id ) = @_;
    $addr = _ensure_hash_address($addr);

    my $redis = _connect_to_node( $self, $addr );
    my $ok;
    for my $node ( @{ $self->{_nodes} } ) {
        $redis->cluster( 'MEET', $node->{host}, $node->{port},
            sub { $ok++ if not ref $_[1] and $_[1] eq 'OK'; warn $_[1] if ref $_[1]; }
        );
    }
    $redis->mainloop;
    croak "failed to attach node to cluster" unless $ok;

    if ($master_id) {
        my $attempt = 0;
        my $nodes   = $redis->cluster_nodes;
        while ( not grep { $_->{node_id} eq $master_id } @$nodes ) {
            croak "failed to start replication from $master_id - node is not present"
              if $attempt++ >= 10;
            usleep 100_000 * $attempt;
            $nodes = $redis->cluster_nodes;
        }
        my $res = $redis->cluster( 'REPLICATE', $master_id );
        croak $res if ref $res =~ /^RedisDB::Error/;
    }

    return 'OK';
}

=head2 $self->migrate_slot($slod, $destination_node)

migrates specified slot to the given I<$destination_node> from the current node
responsible for this slot. Destinations node should be specified as a hash
containing I<host> and I<port> elements. For details check "Cluster live
reconfiguration" section in the L<Redis Cluster
Specification|http://redis.io/topics/cluster-spec>.

=cut

sub migrate_slot {
    my ( $self, $slot, $dst ) = @_;

    # make sure we have up to date information about slots mapping
    $self->_initialize_slots;
    my $src_key = $self->{_slots}[$slot];
    confess "mapping for slot $slot is not defined" unless $src_key;

    # destination node should be part of the cluster
    $dst = $self->_get_node_info($dst)
      or confess "destination node is seems not a part of the cluster";
    my $dst_key = "$dst->{host}:$dst->{port}";
    warn "migrating slot $slot from $src_key to $dst_key" if $DEBUG;

    # if slot is already on destination node, just return
    return if $src_key eq $dst_key;
    my $src = $self->_get_node_info($src_key);

    my $dst_redis = _connect_to_node( $self, $dst )
      or confess "couldn't connect to destination node";
    my $src_redis = _connect_to_node( $self, $src )
      or confess "couldn't connect to source node";

    # set importing/migrating state for the slot
    my $res =
      $dst_redis->cluster( 'setslot', $slot, 'importing', $src->{node_id} );
    confess "$res" unless "$res" eq 'OK';
    $res =
      $src_redis->cluster( 'setslot', $slot, 'migrating', $dst->{node_id} );
    confess "$res" unless "$res" eq 'OK';
    warn "set slots on dst/src nodes to importing/migrating state" if $DEBUG;

    # migrate all keys from src to dst
    my $migrated = 0;
    while (1) {
        my $keys = $src_redis->cluster( 'getkeysinslot', $slot, 1000 );
        confess "Migration failed: $keys" if ref $keys =~ /^RedisDB::Error/;
        last unless @$keys;
        for (@$keys) {
            $res = $src_redis->migrate( $dst->{host}, $dst->{port}, $_, 0, 60 );
            confess "Migration failed: $res" unless "$res" eq 'OK';
            $migrated++;
        }
    }
    warn "migrated $migrated keys from the slot" if $DEBUG;

    $res = $dst_redis->cluster( 'setslot', $slot, 'node', $dst->{node_id} );
    confess "$res" unless "$res" eq 'OK';
    $res = $src_redis->cluster( 'setslot', $slot, 'node', $src->{node_id} );
    confess "$res" unless "$res" eq 'OK';
    warn "migration is finished" if $DEBUG;

    return 1;
}

=head2 $self->remove_node($node)

removes node from the cluster. If the node is a slave, it simply shuts the node
down and sends CLUSTER FORGET command to all other cluster nodes. If the node
is a master node, the method first migrates all slots from it to other nodes.

=cut

sub remove_node {
    my ( $self, $node ) = @_;

    $self->_initialize_slots;
    $node = $self->_get_node_info($node);
    my $node_key = "$node->{host}:$node->{port}";
    if ( $node->{flags}{master} ) {
        my @masters;
        my @slaves;
        for ( @{ $self->{_nodes} } ) {
            if ( $_->{flags}{slave} ) {
                push @slaves, $_ if $_->{master_id} eq $node->{node_id};
                next;
            }
            next if $_->{node_id} eq $node->{node_id};
            push @masters, $_;
        }
        my @slots;
        my %slots_at;
        for my $i ( 0 .. 16383 ) {
            push @slots, $i if $self->{_slots}[$i] eq $node_key;
            $slots_at{ $self->{_slots}[$i] }++;
        }
        if ($DEBUG) {
            warn "Node to remove is a master with "
              . scalar(@slaves)
              . "\nIt holds "
              . scalar(@slots)
              . " slots."
              . "\nThere are "
              . scalar(@masters)
              . " other masters in cluster\n";
        }
        my $slots_per_master  = int( 16384 / @masters + 1 );
        my $slaves_per_master = int( @slaves / @masters + 1 );
        for my $master (@masters) {
            my $key = "$master->{host}:$master->{port}";
            for ( $slots_at{$key} + 1 .. $slots_per_master ) {
                my $slot = shift @slots;
                last unless defined $slot;
                $self->migrate_slot( $slot, $master );
            }
            for ( 1 .. $slaves_per_master ) {
                my $slave = shift @slaves or last;
                my $redis = $self->_connect_to_node($slave) or next;
                my $res = $redis->cluster( 'replicate', $master->{node_id} );
                warn "Failed to reconfigure slave $slave->{host}:$slave->{port}"
                  . " to replicate from $master->{node_id}: $res"
                  if ref $res =~ /^RedisDB::Error/;
            }
        }
    }

    my $redis = delete $self->{_connections}{$node_key};
    $redis->shutdown;
    my @nodes;
    for ( @{ $self->{_nodes} } ) {
        next if $_->{node_id} eq $node->{node_id};
        push @nodes, $_;
        my $redis = $self->_connect_to_node($_) or next;
        my $res = $redis->cluster( 'forget', $node->{node_id} );
        warn "$_->{host}:$_->{port} could not forget the node: $res"
          if $res =~ /^RedisDB::Error/;
    }
    $self->{_nodes} = \@nodes;

    return 1;
}

sub _get_node_info {
    my ( $self, $node ) = @_;
    $node = _ensure_hash_address($node);
    for ( @{ $self->{_nodes} } ) {
        return $_ if $node->{host} eq $_->{host} and $node->{port} eq $_->{port};
    }
    return;
}

sub _ensure_hash_address {
    my $addr = shift;
    unless ( ref $addr eq 'HASH' ) {
        my ( $host, $port ) = split /:/, $addr;
        croak "invalid address spec: $addr" unless $host and $port;
        $addr = {
            host => $host,
            port => $port
        };
    }
    return $addr;
}

sub _connect_to_node {
    my ( $self, $node ) = @_;
    my $host_key = "$node->{host}:$node->{port}";
    unless ( $self->{_connections}{$host_key} ) {
        my $redis = RedisDB->new(
            host        => $node->{host},
            port        => $node->{port},
            raise_error => 0,
        );
        $self->{_connections}{$host_key} = $redis->{_socket} ? $redis : undef;
    }
    return $self->{_connections}{$host_key};
}

=head1 SERVICE FUNCTIONS

=cut

my @crc16tab = (
    0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
    0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
    0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
    0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
    0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
    0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
    0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
    0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
    0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
    0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
    0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
    0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
    0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
    0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
    0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
    0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
    0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
    0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
    0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
    0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
    0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
    0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
    0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
    0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
    0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
    0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
    0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
    0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
    0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
    0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
    0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
    0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
);

=head2 crc16($buf)

compute crc16 for the specified buffer as defined in redis cluster
specification

=cut

sub crc16 {
    my $buf = shift;
    if ( utf8::is_utf8($buf) ) {
        die "Can't compute crc16 for string with wide characters.\n"
          . "You should encode strings you pass to redis as bytes";
    }
    my $crc = 0;
    for ( split //, $buf ) {
        $crc =
          ( $crc << 8 & 0xFF00 ) ^ $crc16tab[ ( ( $crc >> 8 ) ^ ord ) & 0x00FF ];
    }
    return $crc;
}

=head2 key_slot($key)

return slot number for the given I<$key>

=cut

sub key_slot {
    my $key = shift;

    if ( $key =~ /\{([^}]+)\}/ ) {
        $key = $1;
    }

    return crc16($key) & 16383;
}

1;

__END__

=head1 AUTHOR

Pavel Shaydo, C<< <zwon at cpan.org> >>

=head1 LICENSE AND COPYRIGHT

Copyright 2011-2015 Pavel Shaydo.

This program is free software; you can redistribute it and/or modify it
under the terms of either: the GNU General Public License as published
by the Free Software Foundation; or the Artistic License.

See http://dev.perl.org/licenses/ for more information.

=cut