The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Redis::RdbParser;

use 5.008008;
use strict;
use warnings;

require Exporter;

our @ISA = qw(Exporter);

our $VERSION = '0.02';

use Carp;

use constant REDIS_RDB_6BITLEN              => 0;
use constant REDIS_RDB_14BITLEN             => 1;
use constant REDIS_RDB_32BITLEN             => 2;
use constant REDIS_RDB_ENCVAL               => 3;
use constant REDIS_RDB_OPCODE_EXPIRETIME_MS => 252;
use constant REDIS_RDB_OPCODE_EXPIRETIME    => 253;
use constant REDIS_RDB_OPCODE_SELECTDB      => 254;
use constant REDIS_RDB_OPCODE_EOF           => 255;
use constant REDIS_RDB_TYPE_STRING          => 0;
use constant REDIS_RDB_TYPE_LIST            => 1;
use constant REDIS_RDB_TYPE_SET             => 2;
use constant REDIS_RDB_TYPE_ZSET            => 3;
use constant REDIS_RDB_TYPE_HASH            => 4;
use constant REDIS_RDB_TYPE_HASH_ZIPMAP     => 9;
use constant REDIS_RDB_TYPE_LIST_ZIPLIST    => 10;
use constant REDIS_RDB_TYPE_SET_INTSET      => 11;
use constant REDIS_RDB_TYPE_ZSET_ZIPLIST    => 12;
use constant REDIS_RDB_TYPE_HASH_ZIPLIST    => 13;
use constant REDIS_RDB_ENC_INT8             => 0;
use constant REDIS_RDB_ENC_INT16            => 1;
use constant REDIS_RDB_ENC_INT32            => 2;
use constant REDIS_RDB_ENC_LZF              => 3;

my %DATA_TYPE_MAPPING = (
    0   => 'string',
    1   => 'list',
    2   => 'set',
    3   => 'sortedset',
    4   => 'hash',
    9   => 'hash',
    10  => 'list',
    11  => 'set',
    12  => 'sortedset',
    13  => 'hash');

my %def_callbacks = (
    "start_rdb"         => \&def_start_rdb,
    "start_database"    => \&def_start_database,
    "key"               => \&def_key,
    "set"               => \&def_set,
    "start_hash"        => \&def_start_hash,
    "hset"              => \&def_hset,
    "end_hash"          => \&def_end_hash,
    "start_set"         => \&def_start_set,
    "sadd"              => \&def_sadd,
    "end_set"           => \&def_end_set,
    "start_list"        => \&def_start_list,
    "rpush"             => \&def_rpush,
    "end_list"          => \&def_end_list,
    "start_sorted_set"  => \&def_start_sorted_set,
    "zadd"              => \&def_zadd,
    "end_sorted_set"    => \&def_end_sorted_set,
    "end_database"      => \&def_end_database,
    "end_rdb"           => \&def_end_rdb,
);

sub new {
    my ($class, $callbacks) = @_;
    $callbacks ||= \%def_callbacks;

    my $self = bless {}, $class;
    $self->{callback} = $callbacks;
    $self->{expiry} = undef;
    $self->{key} = undef;
    $self->{filename} = undef;
    $self;
}

#=================================================================
# Parse a redis rdb dump file, and call methods in the callback
# hash reference during the parsing operation.
#
# filter's structure is a hash, whose member value is of array type,
# the key is processed as regular expression.
# i.e.
#   filter = {
#       "dbs" => [0, 1],
#       "keys" => ["^foo$", "bar"],
#       "types" => ["hash", "set", "sortedset", "list", "string"],
#   }
#   
#   If filter is undef, results will not be filtered.
#   If dbs, keys or type is undef, no filtering will be done on the
#   result.
#   You can also use appropriate callback to filter keys.
#   All the 3 conditions must be satified.
#=================================================================
sub parse {
    my $self = shift;
    my $filename = shift;
    my $filter = shift;

    unless (defined($filename)) {
        croak "Expected a Redis dump file name";
    }
    $self->{filename} = $filename;

    my $buffer;
    open my $INFH, $filename or 
        croak "Open $filename for reading failed: $!";
    binmode $INFH;

    read($INFH, $buffer, 5) or croak "Read $filename failed: $!";
    $self->verify_magic($buffer);

    read($INFH, $buffer, 4) or croak "Read $filename failed: $!";
    $self->verify_version($buffer);

    $self->invoke_callback("start_rdb", $filename);

    my $db_number = 0;
    my $is_first_database = 1;

    while (1) {
        $self->{expiry} = undef;
        my $data_type = &read_unsigned_char($INFH);

        if ($data_type == REDIS_RDB_OPCODE_EXPIRETIME_MS) {
            $self->{expiry} = &read_unsigned_long($INFH);
            $data_type = &read_unsigned_char($INFH);
        } elsif ($data_type == REDIS_RDB_OPCODE_EXPIRETIME) {
            $self->{expiry} = &read_unsigned_int($INFH) * 1000; # change to ms
            $data_type = &read_unsigned_char($INFH);
        } 

        if ($data_type == REDIS_RDB_OPCODE_SELECTDB) {
            unless ($is_first_database) {
                $self->invoke_callback("end_database", $db_number);
            }
            $is_first_database = 0;
            $db_number = &read_length($INFH);

            $self->invoke_callback("start_database", $db_number);
            next;
        }

        if ($data_type == REDIS_RDB_OPCODE_EOF) {
            unless ($is_first_database) {
                $self->invoke_callback("end_database", $db_number);
            }
            $self->invoke_callback("end_rdb", $filename);
            last;
        }

        if (defined($filter)) {
            if (&match_db($filter, $db_number)) {
                my $key = &read_string($INFH);
                $self->{key} = $key;
                $self->invoke_callback("key", $key);
                if (match_filter($filter, $key, $data_type)) {
                    $self->read_object($INFH, $data_type);
                } else {
                    &skip_object($INFH, $data_type);
                }
            } else {
                &skip_key_and_object($INFH, $data_type);
            }
        } else {
            my $key = &read_string($INFH);
            $self->{key} = $key;
            $self->invoke_callback("key", $key);
            $self->read_object($INFH, $data_type);
        }
    }
    
    close $INFH or croak "Close $filename failed: $!";
}

sub match_db {
    my ($filter, $db_number) = @_;
    return 1 unless (defined($filter->{dbs}));
    if (&in_array($db_number, @{$filter->{dbs}})) {
        return 1;
    }
    return 0;
}

sub match_filter {
    my ($filter, $key, $data_type) = @_;
    my $ret1 = 0;
    my $ret2 = 0;
    
    if (defined($filter->{"keys"})) {
        foreach (@{$filter->{"keys"}}) {
            if ($key =~ m/$_/) {
                $ret1 = 1;
                last;
            }
        }
    } else {
        $ret1 = 1;
    }

    if (defined($filter->{"types"})) {
        foreach (@{$filter->{"types"}}) {
            if ($DATA_TYPE_MAPPING{$data_type} eq $_) {
                $ret2 = 1;
                last;
            }
        }
    } else {
        $ret2 = 1;
    }

    return $ret1 & $ret2;
}

sub in_array {
    my ($needle, @haystack) = @_;
    foreach (@haystack) {
        if ($needle == $_) {
            return 1;
        }
    }
    return 0;
}

sub invoke_callback {
    my $self = shift;
    my $method = shift;
    my @args = @_;

    if (defined($self->{callback}->{$method})) {
        my $func = $self->{callback}->{$method};
        &$func(@args);
    }
}

sub verify_magic {
    my ($self, $magic) = @_;

    if ($magic ne 'REDIS') {
        croak "Invalid File Format for file " . $self->{filename};
    }
}

sub verify_version {
    my ($self, $version) = @_;

    $version = int($version);

    if ($version < 1 or $version > 6) {
        croak "Invalid RDB version $version for file " . $self->{filename};
    }
}

sub read_signed_char {
    my ($fh) = @_;
    my $buffer;

    read($fh, $buffer, 1) or croak "read failed: $!";
    return unpack('c', $buffer);
}

sub read_signed_char_str {
    my ($str, $off) = @_;
    my $temp = substr($str, $off, 1);
    return (unpack('c', $temp), $off + 1);
}

sub read_unsigned_char {
    my ($fh) = @_;
    my $buffer;
    read($fh, $buffer, 1) or croak "read failed: $!";
    return unpack('C', $buffer);
}

sub read_unsigned_char_str {
    my ($str, $off) = @_;
    my $temp = substr($str, $off, 1);
    return (unpack('C', $temp), $off + 1);
}

sub read_signed_short {
    my ($fh) = @_;
    my $buffer;
    read($fh, $buffer, 2) or croak "read failed: $!";
    return unpack('s', $buffer);
}

sub read_signed_short_str {
    my ($str, $off) = @_;
    my $temp = substr($str, $off, 2);
    return (unpack('s', $temp), $off + 2);
}

sub read_unsigned_short {
    my ($fh) = @_;
    my $buffer;
    read($fh, $buffer, 2) or croak "read failed: $!";
    return unpack('S', $buffer);
}

sub read_unsigned_short_str {
    my ($str, $off) = @_;
    my $temp = substr($str, $off, 2);
    return (unpack('S', $temp), $off + 2);
}

sub read_signed_int {
    my ($fh) = @_;
    my $buffer;
    read($fh, $buffer, 4) or croak "read failed: $!";
    return unpack('i', $buffer);
}

sub read_signed_int_str {
    my ($str, $off) = @_;
    my $temp = substr($str, $off, 4);
    return (unpack('i', $temp), $off + 4);
}

sub read_unsigned_int {
    my ($fh) = @_;
    my $buffer;
    read($fh, $buffer, 4) or croak "read failed: $!";
    return unpack('I', $buffer);
}

sub read_unsigned_int_str {
    my ($str, $off) = @_;
    my $temp = substr($str, $off, 4);
    return (unpack('i', $temp), $off + 4);
}

sub read_big_endian_unsigned_int {
    my ($fh) = @_;
    my $buffer;
    read($fh, $buffer, 4) or croak "read failed: $!";
    return unpack('N', $buffer);
}

sub read_big_endian_unsigned_int_str {
    my ($str, $off) = @_;
    my $temp = substr($str, $off, 4);
    return (unpack('N', $temp), $off + 4);
}

sub read_24bit_signed_number {
    my ($fh) = @_;
    my $buffer;
    read($fh, $buffer, 3) or croak "read failed: $!";
    $buffer .= '0' . $buffer;
    return unpack('i', $buffer) >> 8;
}

sub read_24bit_signed_number_str {
    my ($str, $off) = @_;
    my $temp = substr($str, $off, 3);
    $temp .= '0' . $temp;
    return (unpack('i', $temp) >> 8, $off + 3);
}

sub read_signed_long {
    my ($fh) = @_;
    my $buffer;
    read($fh, $buffer, 8) or croak "read failed: $!";
    return unpack('q', $buffer);
}

sub read_signed_long_str {
    my ($str, $off) = @_;
    my $temp = substr($str, $off, 8);
    return (unpack('q', $temp), $off + 8);
}

sub read_unsigned_long {
    my ($fh) = @_;
    my $buffer;
    read($fh, $buffer, 8) or croak "read failed: $!";
    return unpack('Q', $buffer);
}

sub read_unsigned_long_str {
    my ($str, $off) = @_;
    my $temp = substr($str, $off, 8);
    return (unpack('Q', $temp), $off + 8);
}

sub skip {
    my ($fh, $free) = @_; 
    my $dummy;
    if ($free > 0) {
        read($fh, $dummy, $free);
    }
}

sub ntohl {
    my ($fh) = @_;
    my $val = &read_unsigned_int($fh);
    my $new_val = 0;
    $new_val |= (($val & 0x000000ff) << 24);
    $new_val |= (($val & 0xff000000) >> 24);
    $new_val |= (($val & 0x0000ff00) << 8);
    $new_val |= (($val & 0x00ff0000) >> 8);
    return $new_val;
}

sub read_length_with_encoding {
    my ($fh) = @_;
    my $length = 0;
    my $is_encoded = 0;
    my $buffer;
    
    my $first = &read_unsigned_char($fh);
    my $enc_type = ($first & 0xc0) >> 6;

    if ($enc_type == REDIS_RDB_ENCVAL) {
        $is_encoded = 1;
        $length = $first & 0x3f;
    } elsif ($enc_type == REDIS_RDB_6BITLEN) {
        $length = $first & 0x3f;
    } elsif ($enc_type == REDIS_RDB_14BITLEN) {
        my $second = &read_unsigned_char($fh);
        $length = (($first & 0x3f) << 8) | $second;
    } else {
        $length = ntohl($fh);
    }
    return ($length, $is_encoded);
}

sub read_length {
    return (&read_length_with_encoding(@_))[0];
}

sub read_string {
    my ($fh) = @_;
    my ($length, $is_encoded) = &read_length_with_encoding($fh);
    my $val;

    if ($is_encoded) {
        if ($length == REDIS_RDB_ENC_INT8) {
            $val = &read_signed_char($fh);
        } elsif ($length == REDIS_RDB_ENC_INT16) {
            $val = &read_signed_short($fh);
        } elsif ($length == REDIS_RDB_ENC_INT32) {
            $val = &read_signed_int($fh);
        } elsif ($length == REDIS_RDB_ENC_LZF) {
            my $clen = &read_length($fh);
            my $len = &read_length($fh);
            my $buffer;
            read($fh, $buffer, $clen);
            $val = &lzf_decompress($buffer, $len);
        }
    } else {
        read($fh, $val, $length);
    }
    return $val;
}

#==================================================================
# Read an object from the stream, and invoke callbacks.
#==================================================================
sub read_object {
    my ($self, $fh, $enc_type) = @_;
    my $val;
    my $length;

    if ($enc_type == REDIS_RDB_TYPE_STRING) {
        $val = &read_string($fh); 
        $self->invoke_callback("set", $self->{key}, $val,  
            $self->{expiry}, {'encoding' => 'string'});
    } elsif ($enc_type == REDIS_RDB_TYPE_LIST) {
        #=================================================================
        # A redis list is just a sequence of strings
        # We successively read strings from the string and create a list
        # from it.
        # The lists are in order i.e. the first string is the head,
        # and the last string is the tail of the list.
        #=================================================================
        $length = &read_length($fh);
        $self->invoke_callback("start_list", 
            $self->{key}, $length, $self->{expiry}, 
            {'encoding' => 'linkedlist'});
        for (my $i = 0; $i < $length; ++$i) {
            $val = &read_string($fh);
            $self->invoke_callback("rpush", $self->{key}, $val);
        }
        $self->invoke_callback("end_list", $self->{key});
    } elsif ($enc_type == REDIS_RDB_TYPE_SET) {
        #================================================================
        # A redis set is just a sequence of strings.
        # We successively read strings from the stream and create a set
        # from it. Note that the order of strings is non-deterministic.
        #================================================================
        $length = &read_length($fh);
        $self->invoke_callback("start_set",
            $self->{key}, $length, $self->{expiry},
            {'encoding' => 'hashtable'});

        for (my $i = 0; $i < $length; ++$i) {
            $val = &read_string($fh);
            $self->invoke_callback("sadd", $self->{key}, $val);
        }
        $self->invoke_callback("end_set", $self->{key});
    } elsif ($enc_type == REDIS_RDB_TYPE_ZSET) {
        $length = &read_length($fh);
        $self->invoke_callback("start_sorted_set", 
            $self->{key}, $length, $self->{expiry},
            {'encoding' => 'skiplist'});
        for (my $i = 0; $i < $length; ++$i) {
            $val = &read_string($fh);
            my $dbl_length = &read_unsigned_char($fh);
            read($fh, my $score, $dbl_length);
            $self->invoke_callback("zadd", $self->{key}, $score, $val);
        }
        $self->invoke_callback("end_sorted_set", $self->{key});
    } elsif ($enc_type == REDIS_RDB_TYPE_HASH) {
        $length = &read_length($fh);

        $self->invoke_callback("start_hash", 
            $self->{key}, $length, $self->{expiry}, 
            {'encoding' => 'hashtable'});
        for (my $i = 0; $i < $length; ++$i) {
            my $field = &read_string($fh);
            my $value = &read_string($fh);
            $self->invoke_callback("hset", $self->{key}, $field, $value);
        }
        $self->invoke_callback("end_hash", $self->{key});
    } elsif ($enc_type == REDIS_RDB_TYPE_HASH_ZIPMAP) {
        $self->read_zipmap($fh);
    } elsif ($enc_type == REDIS_RDB_TYPE_LIST_ZIPLIST) {
        $self->read_ziplist($fh);
    } elsif ($enc_type == REDIS_RDB_TYPE_SET_INTSET) {
        $self->read_intset($fh);
    } elsif ($enc_type == REDIS_RDB_TYPE_ZSET_ZIPLIST) {
        $self->read_zset_from_ziplist($fh);
    } elsif ($enc_type == REDIS_RDB_TYPE_HASH_ZIPLIST) {
        $self->read_hash_from_ziplist($fh);
    } else {
        croak "Invalid object type $enc_type";
    }
}

sub read_intset {
    my ($self, $fh) = @_;
    my $seek = 0;
    my $entry;
    my $encode;
    my $num_entries;

    my $raw_string = &read_string($fh);

    ($encode, $seek) = &read_unsigned_int_str($raw_string, $seek);
    ($num_entries, $seek) = &read_unsigned_int_str($raw_string, $seek);

    $self->invoke_callback("start_set", 
        $self->{key}, $num_entries, $self->{expiry}, 
        {'encoding' => 'intset', 'sizeof_value' => length($raw_string)});

    for (my $i = 0; $i < $num_entries; ++$i) {
        if ($encode == 8) {
            ($entry, $seek) = &read_unsigned_long_str($raw_string, $seek);
        } elsif ($encode == 4) {
            ($entry, $seek) = &read_unsigned_int_str($raw_string, $seek);
        } elsif ($encode == 2) {
            ($entry, $seek) = &read_unsigned_short_str($raw_string, $seek);
        } else {
            croak "Invalid encoding $encode";
        }
        $self->invoke_callback("sadd", $self->{key}, $entry);
    }
    $self->invoke_callback("end_set", $self->{key});
}

sub read_ziplist_entry_str {
    my ($str, $off) = @_;
    my ($length, $value, $prev_length, $entry_header);
    
    ($prev_length, $off) = &read_unsigned_char_str($str, $off);
    if ($prev_length == 254) {
        ($prev_length, $off) = &read_unsigned_int_str($str, $off);
    }

    ($entry_header, $off) = &read_unsigned_char_str($str, $off);

    if ($entry_header >> 6 == 0) {
        $length = $entry_header & 0x3f;
        $value = substr($str, $off, $length);
        $off += $length;
    } elsif ($entry_header >> 6 == 1) {
        ($length, $off) = &read_unsigned_char_str($str, $off);
        $length |= (($entry_header & 0x3f) << 8);
        $value = substr($str, $off, $length);
        $off += $length;
    } elsif ($entry_header >> 6 == 2) {
        ($length, $off) = &read_big_endian_unsigned_int_str($str, $off);
        $value = substr($str, $off, $length);
        $off += $length;
    } elsif ($entry_header >> 4 == 12) {
        ($value, $off) = &read_signed_short_str($str, $off);
    } elsif ($entry_header >> 4 == 13) {
        ($value, $off) = &read_signed_int_str($str, $off);
    } elsif ($entry_header >> 4 == 14) {
        ($value, $off) = &read_signed_long_str($str, $off);
    } elsif ($entry_header == 240) {
        ($value, $off) = &read_24bit_signed_number_str($str, $off);
    } elsif ($entry_header == 254) {
        ($value, $off) = &read_signed_char_str($str, $off);
    } elsif ($entry_header >= 241 and $entry_header <= 253) {
        $value = $entry_header - 241;
    } else {
        croak "Invalid entry_header $entry_header";
    }

    return ($value, $off);
}

sub read_ziplist_entry {
    my ($fh) = @_;
    my ($length, $value);

    my $prev_length = &read_unsigned_char($fh);
    if ($prev_length == 254) {
        $prev_length = &read_unsigned_int($fh);
    }

    my $entry_header = &read_unsigned_char($fh);
    if ($entry_header >> 6 == 0) {
        $length = $entry_header & 0x3f;
        read($fh, $value, $length);
    } elsif ($entry_header >> 6 == 1) {
        $length = (($entry_header & 0x3f) << 8) | &read_unsigned_char($fh);
        read($fh, $value, $length);
    } elsif ($entry_header >> 6 == 2) {
        $length = &read_big_endian_unsigned_int($fh);
        read($fh, $value, $length);
    } elsif ($entry_header >> 4 == 12) {
        $value = &read_signed_short($fh);
    } elsif ($entry_header >> 4 == 13) {
        $value = &read_signed_int($fh);
    } elsif ($entry_header >> 4 == 14) {
        $value = &read_signed_long($fh);
    } elsif ($entry_header == 240) {
        $value = &read_24bit_signed_number($fh);
    } elsif ($entry_header == 254) {
        $value = &read_signed_char($fh);
    } elsif ($entry_header >= 241 and $entry_header <= 253) {
        $value = $entry_header - 241;
    } else {
        croak "Invalid entry_header $entry_header";
    }

    return $value;
}

sub read_ziplist {
    my ($self, $fh) = @_;
    my $seek = 0;
    my ($entry, $zlbytes, $zltail, $num_entries, $value, $zlist_end);
    my $raw_string = &read_string($fh);

    ($zlbytes, $seek) = &read_unsigned_int_str($raw_string, $seek);
    ($zltail, $seek) = &read_unsigned_int_str($raw_string, $seek);
    ($num_entries, $seek) = &read_unsigned_short_str($raw_string, $seek);

    $self->invoke_callback("start_list", 
        $self->{key}, $num_entries, $self->{expiry}, 
        {'encoding' => 'ziplist', 'sizeof_value' => length($raw_string)});
    for (my $i = 0; $i < $num_entries; ++$i) {
        ($value, $seek) = &read_ziplist_entry_str($raw_string, $seek);
        $self->invoke_callback("rpush", $self->{key}, $value);
    }
    ($zlist_end, $seek) = &read_unsigned_char_str($raw_string, $seek);
    if ($zlist_end != 255) {
        croak "Invalid zip list end $zlist_end";
    }
    $self->invoke_callback("end_list", $self->{key});
}

sub read_zset_from_ziplist {
    my ($self, $fh) = @_;
    my $seek = 0;
    my ($entry, $zlbytes, $zltail, $num_entries, $member, $score, $zlist_end);
    my $raw_string = &read_string($fh);

    ($zlbytes, $seek) = &read_unsigned_int_str($raw_string, $seek);
    ($zltail, $seek) = &read_unsigned_int_str($raw_string, $seek);
    ($num_entries, $seek) = &read_unsigned_short_str($raw_string, $seek);

    if ($num_entries % 2) {
        croak "Expected even number of elements but found $num_entries";
    }

    $num_entries /= 2;

    $self->invoke_callback("start_sorted_set", 
        $self->{key}, $num_entries, $self->{expiry}, 
        {'encoding' => 'ziplist', 'sizeof_value' => length($raw_string)});
    for (my $i = 0; $i < $num_entries; ++$i) {
        ($member, $seek) = &read_ziplist_entry_str($raw_string, $seek);
        ($score, $seek) = &read_ziplist_entry_str($raw_string, $seek);
        $self->invoke_callback("zadd", $self->{key}, $score, $member);
    }
    ($zlist_end, $seek) = &read_unsigned_char_str($raw_string, $seek);
    if ($zlist_end != 255) {
        croak "Invalid zip list end $zlist_end";
    }

    $self->invoke_callback("end_sorted_set", $self->{key});
}

sub read_hash_from_ziplist {
    my ($self, $fh) = @_;
    my $seek = 0;
    my ($entry, $zlbytes, $zltail, $num_entries, $field, $value, $zlist_end);
    my $raw_string = &read_string($fh);

    ($zlbytes, $seek) = &read_unsigned_int_str($raw_string, $seek);
    ($zltail, $seek) = &read_unsigned_int_str($raw_string, $seek);
    ($num_entries, $seek) = &read_unsigned_short_str($raw_string, $seek);

    if ($num_entries % 2) {
        croak "Expected even number of elements but found $num_entries";
    }

    $num_entries /= 2;

    $self->invoke_callback("start_hash", 
        $self->{key}, $num_entries, $self->{expiry},
        {'encoding' => 'ziplist', 'sizeof_value' => length($raw_string)});

    for (my $i = 0; $i < $num_entries; ++$i) {
        ($field, $seek) = &read_ziplist_entry_str($raw_string, $seek);
        ($value, $seek) = &read_ziplist_entry_str($raw_string, $seek);
        $self->invoke_callback("hset", $self->{key}, $field, $value);
    }
    ($zlist_end, $seek) = &read_unsigned_char_str($raw_string, $seek);
    if ($zlist_end != 255) {
        croak "Invalid zip list end $zlist_end";
    }
    $self->invoke_callback("end_hash", $self->{key});
}

sub read_zipmap_next_length {
    my ($str, $off) = @_;
    my $length;
    ($length, $off) = &read_unsigned_char_str($str, $off);

    if ($length < 254) {
        return ($length, $off);
    } elsif ($length == 254) {
        return &read_unsigned_int_str($str, $off);
    } else {
        return (undef, $off);
    }
}

sub read_zipmap {
    my ($self, $fh) = @_;
    my $seek = 0;
    my ($num_entries, $next_length, $key, $free, $value);
    my $raw_string = &read_string($fh);

    ($num_entries, $seek) = &read_unsigned_char_str($raw_string, $seek);

    $self->invoke_callback("start_hash", 
        $self->{key}, $num_entries, $self->{expiry}, 
        {'encoding' => 'zipmap', 'sizeof_value' => length($raw_string)});
    while (1) {
        ($next_length, $seek) = &read_zipmap_next_length($raw_string, $seek);
        last unless defined($next_length);
        $key = substr($raw_string, $seek, $next_length);
        $seek += $next_length;

        ($next_length, $seek) = &read_zipmap_next_length($raw_string, $seek);
        unless (defined($next_length)) {
            croak "Unexpected end of zip map";
        }

        ($free, $seek) = &read_unsigned_char_str($raw_string, $seek);
        $value = substr($raw_string, $seek, $next_length);
        $seek += $next_length;

        $seek += $free;

        $self->invoke_callback("hset", $self->{key}, $key, $value);
    }

    $self->invoke_callback("end_hash", $self->{key});
}

sub skip_string {
    my ($fh) = @_;

    my $bytes_to_skip = 0;

    my ($length, $is_encoded) = &read_length_with_encoding($fh);
    if ($is_encoded) {
        if ($length == REDIS_RDB_ENC_INT8) {
            $bytes_to_skip = 1;
        } elsif ($length == REDIS_RDB_ENC_INT16) {
            $bytes_to_skip = 2;
        } elsif ($length == REDIS_RDB_ENC_INT32) {
            $bytes_to_skip = 4;
        } elsif ($length == REDIS_RDB_ENC_LZF) {
            my $clen = &read_length($fh);
            &read_length($fh);
            $bytes_to_skip = $clen;
        } else {
            croak "Never get here";
        }
    } else {
        $bytes_to_skip = $length;
    }

    &skip($fh, $bytes_to_skip);
}

sub skip_object {
    my ($fh, $enc_type) = @_;

    my $skip_strings = 0;
    if ($enc_type == REDIS_RDB_TYPE_STRING) {
        $skip_strings = 1;
    } elsif ($enc_type == REDIS_RDB_TYPE_LIST) {
        $skip_strings = &read_length($fh);
    } elsif ($enc_type == REDIS_RDB_TYPE_SET) {
        $skip_strings = &read_length($fh);
    } elsif ($enc_type == REDIS_RDB_TYPE_ZSET) {
        $skip_strings = &read_length($fh) * 2;
    } elsif ($enc_type == REDIS_RDB_TYPE_HASH) {
        $skip_strings = &read_length($fh) * 2;
    } elsif ($enc_type == REDIS_RDB_TYPE_HASH_ZIPMAP) {
        $skip_strings = 1;
    } elsif ($enc_type == REDIS_RDB_TYPE_LIST_ZIPLIST) {
        $skip_strings = 1;
    } elsif ($enc_type == REDIS_RDB_TYPE_SET_INTSET) {
        $skip_strings = 1;
    } elsif ($enc_type == REDIS_RDB_TYPE_ZSET_ZIPLIST) {
        $skip_strings = 1;
    } elsif ($enc_type == REDIS_RDB_TYPE_HASH_ZIPLIST) {
        $skip_strings = 1;
    } else {
        croak "Invalid object type $enc_type\n";
    }

    for (my $i = 0; $i < $skip_strings; ++$i) {
        &skip_string($fh);
    }
}

sub skip_key_and_object {
    my ($fh, $data_type) = @_;

    &skip_string($fh);
    &skip_object($fh, $data_type);
}

sub lzf_decompress {
    my ($compressed, $expected_length) = @_;

    my $in_len = length($compressed);
    my $in_index = 0;
    my @out_bytes;
    my $out_index = 0;
    my $ref;

    while ($in_index < $in_len) {
        my $ctrl = unpack('C', substr($compressed, $in_index, 1));
        ++$in_index;

        if ($ctrl < 32) {
            for (my $i = 0; $i < $ctrl + 1; ++$i) {
                $out_bytes[$out_index] = substr($compressed, $in_index, 1);
                ++$in_index;
                ++$out_index;
            }
        } else {
            my $length = $ctrl >> 5;

            if ($length == 7) {
                $length += unpack('C', substr($compressed, $in_index, 1));
                ++$in_index;
            }

            $ref = $out_index - (($ctrl & 0x1f) << 8) -
                unpack('C', substr($compressed, $in_index, 1)) - 1;
            ++$in_index;

            for (my $i = 0; $i < $length + 2; ++$i) {
                $out_bytes[$out_index] = $out_bytes[$ref];
                ++$ref;
                ++$out_index;
            }
        }
    }

    if ($out_index != $expected_length) {
        croak "Expected lengths do not match: $out_index != $expected_length\n";
    }
    return join("", @out_bytes);
}

#==========================================
# default callbacks
#==========================================
sub def_start_rdb {
    my $filename = shift;
    print '[';
}

sub def_start_database {
    my $db_number = shift;
    print "{";
}

sub def_key {
    my $key = shift;
    ### do nothing
}

sub def_set {
    my ($key, $value, $expiry) = @_;
    print "\"$key\" : \"$value\", ";
}

sub def_start_hash {
    my ($key, $length, $expiry) = @_;
    print "\"$key\" : {";
}

sub def_hset {
    my ($key, $field, $value) = @_;
    print "\"$field\" : \"$value\", ";
}

sub def_end_hash {
    my $key = shift;
    print '}, ';
}

sub def_start_set {
    my ($key, $cardinality, $expiry) = @_;
    print "\"$key\" : [";
}

sub def_sadd {
    my ($key, $member) = @_;
    print "\"$member\", ";
}

sub def_end_set {
    my ($key) = @_;
    print "], ";
}

sub def_start_list {
    my ($key, $length, $expiry) = @_;
    print "\"$key\" : [";
}

sub def_rpush {
    my ($key, $value) = @_;
    print "\"$value\", ";
}

sub def_end_list {
    my ($key) = @_;
    print "], ";
}

sub def_start_sorted_set {
    my ($key, $length, $expiry) = @_;
    print "\"$key\" : {";
}

sub def_zadd {
    my ($key, $score, $member) = @_;
    print "\"$member\" : \"$score\", ";
}

sub def_end_sorted_set {
    my ($key) = @_;
    print "}, ";
}

sub def_end_database {
    my $db_number = shift;
    print "}, ";
}

sub def_end_rdb {
    my $filename = shift;
    print "], ";
}

1;

__END__

=head1 NAME

Redis::RdbParser - Redis rdb dump file parser

=head1 VERSION

version 0.02

=head1 SYNOPSIS

 use Redis::RdbParser;

 $parser = new Redis::RdbParser;
 # or
 my $callbacks = {
     "start_rdb"         => \&start_rdb,
     "start_database"    => \&start_database,
     "key"               => \&key,
     "set"               => \&set,
     "start_hash"        => \&start_hash,
     "hset"              => \&hset,
     "end_hash"          => \&end_hash,
     "start_set"         => \&start_set,
     "sadd"              => \&sadd,
     "end_set"           => \&end_set,
     "start_list"        => \&start_list,
     "rpush"             => \&rpush,
     "end_list"          => \&end_list,
     "start_sorted_set"  => \&start_sorted_set,
     "zadd"              => \&zadd,
     "end_sorted_set"    => \&end_sorted_set,
     "end_database"      => \&end_database,
     "end_rdb"           => \&end_rdb,
 };
 $parser = new Redis::RdbParser($callback);

 $parser->parse($filename);
 # or 
 my $filter = {
     'dbs' => [0, 1],
     'keys' => ["^foo$", "^bar"],
     'types' => ["hash", "set"],
 };
 $parser->parse($filename, $filter);

=head1 DESCRIPTION

Redis::RdbParser is a parser for Redis' rdb dump files. 
The parser generates events similar to an xml sax parser. 

=head2 callbacks

The dump file is parsed sequentially. As and when
objects are discovered, appropriate callbacks 
would be invoked. You can set the callback item `undef`
if you don't care it.

=over 4

=item start_rdb

    sub start_rdb {
        my $filename = shift;
        # fill your code
    }

Called once we know we are dealing with a valid Redis dump file.

=item start_database

    sub start_database {
        my $db_number = shift;
        # fill your code
    }

Called to indicate the start of database `db_number`.
Once a database starts, another database cannot start 
unless the first one completes and then `end_database` 
callback is called.

=item key

    sub key {
        my $key = shift;
        # fill your code
    }

Called to indicate a key readed in the parsing process.

=item set

    sub set {
        my ($key, $value, $expiry) = @_;
        # fill your code
    }

Callback to handle a key with a string value and an optional expiry.

=item start_hash

    sub start_hash {
        my ($key, $length, $expiry, $info) = @_;
        # fill your code
    }

Callback to handle the start of a hash.

After `start_hash`, the callback `hset` will be called with this `key` exactly
`length` times.

After that, the `end_hash` callback will be called.


=item hset

    sub hset {
        my ($key, $field, $value) = @_;
        # fill your code
    }

Callback to insert a field=value pair in an existing hash.

=item end_hash

    sub end_hash {
        my $key = shift;
        # fill your code
    }

Called when there are no more elements in the hash.

=item start_set

    sub start_set {
        my ($key, $length, $expiry, $info) = @_;
        # fill your code
    }

Callback to handle the start of the set.

After `start_set`, the callback `sadd` will be called with `key` 
exactly `length` times.

After that, the callback `end_set` will be called to indicate the end
of the set.

=item sadd

    sub sadd {
        my ($key, $member) = @_;
        # fill your code
    }

Callback to insert a new member to this set.

=item end_set

    sub end_set {
        my ($key) = @_;
        # fill your code
    }

Called when there are no more elements in this set.

=item start_list

    sub start_list {
        my ($key, $length, $expiry, $info) = @_;
        # fill your code
    }

Callback to handle the start of a list.

After `start_list`, the callback `rpush` will be called with `key` exactly
`length` times.

After that, the `end_list` method will be called to indicate the end of
the list.

=item rpush

    sub rpush {
        my ($key, $value) = @_;
        # fill your code
    }

Callback to insert a new value into this list.

=item end_list

    sub end_list {
        my ($key) = @_;
        # fill your code
    }

Called when there are no more elements in this list.

=item start_sorted_set

    sub start_sorted_set {
        my ($key, $length, $expiry) = @_;
        # fill your code
    }

Callback to handle the start of a sorted set.

After `start_sorted_set`, the callback `zadd' will called with `key` exactly
`length` times.

Also, `zadd` will be called in a sorted order, so as to preserve the ordering
of this sorted set.

After that, the callback `end_sorted_set` will be called to indicate the end
of this sorted set.

=item zadd

    sub zadd {
        my ($key, $score, $member) = @_;
        # fill your code
    }

Callback to insert a new value into this sorted set.

=item end_sorted_set

    sub end_sorted_set {
        my ($key) = @_;
        # fill your code
    }

Called when there are no more elements in this sorted set.

=item end_database

    sub end_database {
        my $db_number = shift;
        # fill your code
    }

Called when the current database ends

After `end_database`, one of the callbacks are called:

1) `start_database` with a new database number

2) `end_rdb` to indicate we have reached the end of the file

=item end_rdb

    sub end_rdb {
        my $filename = shift;
        # fill your code
    }

Called to indicate we have completed parsing of the dump file.

=back

=head2 filter 

filter is a reference with the following keys:

=over 4

    $filter = {
        "dbs"   => [0, 1],              # db number
        "keys"  => ["^foo$", "^bar"],   # keys regular expression
        "types" => ["string", "hash", "list", "set", "sorted set"],
    }

=back


If filter is undef, results will not be filtered.
If dbs, keys or types is undef, no filtering will be done on the axis.

=head1 REPOSITORY

https://github.com/flygoast/Redis-RdbParser

=head1 AUTHOR

fenggu, E<lt>flygoast@126.comE<gt>

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2012 by fenggu

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.8.8 or,
at your option, any later version of Perl 5 you may have available.


=cut