The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/env perl
package Queue::Q::ReliableFIFO::CLI;
use strict;
use Redis;
use Term::ReadKey;
use Data::Dumper;
use JSON::XS;
use File::Slurp;
use Carp qw(croak);

use Queue::Q::ReliableFIFO::Redis;
use Class::XSAccessor {
    constructor => 'new',
    getters => [qw(conn prompt path redis server port db)],
    setters => { 
        set_conn => 'conn',
        set_prompt => 'prompt',
        set_path => 'path',
        set_redis => 'redis',
        set_server => 'server',
        set_port => 'port',
        set_db => 'db',
    },
};

# The path property has the following meaning:
#
# $path== undef -> not connected
# @$path==0 -> /
# @$path==1 -> /queue
# @$path==2 -> /queue/type

my @all_types = (qw(main busy failed ));
my %type = map { $_ => undef } @all_types;

sub open {
    my ($self, %params) = @_;
    $params{server} || die "missing server\n";
    $params{port}   ||= 6379;
    $params{db}     ||= 0;
    $self->set_conn( Redis->new(
        reconnect => 60,
        server => $params{server} . ':' . $params{port}));
    $self->conn->select($params{db})
        if $self->conn && exists $params{db} && $params{db};
    $self->set_server($params{server});
    $self->set_port($params{port});
    $self->set_db($params{db});
    $self->set_path($self->conn ? [] : undef);
}
sub close {
    my $self = shift;
    $self->set_conn(undef);
    $self->set_path(undef);
}
sub newpath {
    my ($self, $arg) = @_;
    my @args = defined $arg ? split(/\//, $arg) : ();
    if (! defined $self->path) {
        croak("not connected");
    }
    my @path;
    if (substr($arg,0,1) eq '/') {
        @path = @args;
        shift @path;
    }
    else {
        @path = @{$self->path};
        while (my $v = shift @args) {
            if ($v eq '..') {
                pop @path;
            }
            elsif ($v ne '.') {
                push @path, $v;
            }
        }
    }
    return @path;
}
sub ls {
    my $self = shift;
    my $arg  = shift;
    my @path = $self->newpath($arg);
    my $level= @path;
    my ($queue, $type) = @path;

    if ($level == 0) {
        my %q = map { $_ => undef } 
                map { s/_(?:main|busy|failed|time)$//; $_ } 
                grep{ /_/ }
                map { $self->conn->keys('*' . '_' . $_) }
                @all_types;
        my @q = sort keys %q;
        return @q;
    }
    elsif ($level == 1) {
        return 
            map {  sprintf "%7s: %d items", 
                        $_, int($self->conn->llen($queue . '_' . $_)) 
            }
            @all_types;
    }
    elsif ($level == 2) {
        # show 10 items at the consumer side of the queue
        my $start = -10;
        my $n = 9;
        return reverse $self->conn->lrange(
            $queue . "_$type", $start, $start + $n);
    }
    else {
        print Dumper(\@path);
        croak("never thought of this...");
    }
}
sub cd {
    my $self = shift;
    my $arg  = shift;
    my @path = $self->newpath($arg);
    my @oldpath = @{$self->path};

    if (defined $path[0] && $path[0] ne $oldpath[0]) {
        $self->set_redis(Queue::Q::ReliableFIFO::Redis->new(
            redis_conn => $self->conn,
            server      => $self->server,
            port        => $self->port,
            queue_name  => $path[0]));
    }
    $self->set_path(\@path);
}
sub change_db {
    my ($self, $db) = @_;
    $db =  int($db);
    my $ret = $self->conn->select($db);
    if ($ret) {
        $self->set_db($db);
        $self->set_path([]);
    }
}
sub mv {
    my ($self, $from, $to, $limit) = @_;
    my @from = $self->newpath($from);
    my @to   = $self->newpath($to);
    die "$from[1]? expected main|busy|time|failed" if !exists $type{$from[1]};
    die "$to[1]? expected main|busy|time|failed" if !exists $type{$to[1]};
    die "Path length should be 2: " . join('/',@from) if @from != 2;
    die "Path length should be 2: " . join('/',@to) if @to != 2;
    die "from and to are same" if join('',@from) eq join('',@to);

    my $conn = $self->conn;
    my $count = 0;
    my $redis_from = join('_', @from);
    my $redis_to = join('_', @to);
    while ($conn->rpoplpush($redis_from, $redis_to)) {
        $count++;
        last if ($limit && $count >= $limit);
    }
    return $count;
}
sub rm {
    my ($self, $dir, $limit) = @_;
    my @dir = $self->newpath($dir);
    die "not a complete path" if (@dir != 2);
    my $redisname = join '_', @dir;
    my $conn = $self->conn;
    my $count = 0;
    if ($limit) {
        while ($conn->rpop($redisname)) {
            $count++;
            last if ($count >= $limit);
        }
    }
    else {
        $self->conn->multi;
        $self->conn->llen($redisname);
        $self->conn->del($redisname);
        ($count) = $self->conn->exec;
    }
    return $count;
}
sub who {
    my $self = shift;
    return $self->conn->client_list();
}
sub cleanup {
    my ($self, $timeout, $action) = @_;
    die "not connected" if !defined $self->path;
    die "command not available here" if @{$self->path} < 1;
    my @items = $self->{redis}->handle_expired_items($timeout, $action);
    return scalar @items;
}
sub show_prompt {
    my $self = shift;
    my $s = $self->server;
    my $p = $self->port;
    my $d = $self->db;
    my $path = $self->path;
    my $pathstr = defined $path ? '/' . join('/', @$path) : '';
    my $prompt;
    if (! defined $path) { $prompt = ''; }
    else { $prompt = "$s:$p (db=$d) \[ $pathstr \]" }

    return 'FIFO:', $prompt, "-> ";
}

sub run {
    my $self = shift;
    $| = 1;

    my @history;
    my $his_pos = 0;

    # take settings from previous session
    my $conf_file = "$ENV{HOME}/.reliablefifo";
    my %conf = ();
    if (-f $conf_file) {
        %conf = %{decode_json(read_file($conf_file))};
        $self->open(server => $conf{server},  port => $conf{port})
            if exists $conf{server} && exists $conf{port};
        push(@history, @{$conf{history}}) if exists $conf{history};
        my ($queue, $type) = @{$conf{path}||[]};
        $self->cd($queue) if $queue;
        $self->cd($type) if $type;
    }
    my $quit = sub {
        ReadMode 0;

        # save setting for next session
        my %conf = ();
        if (defined $self->path) {
            $conf{server} = $self->server;
            $conf{port} = $self->port;
            $conf{db} = $self->db;
            $conf{path} = $self->path;
        }
        splice(@history, 0, $#history-20) if $#history > 20; # limit history
        pop @history;   # remove empty item
        $conf{history} = \@history;
        write_file($conf_file, encode_json(\%conf));

        exit 0;
    };

    my %commands = map { $_ => undef } (qw(
        open
        ls
        cd
        close
        db
        mv
        rm
        who
        quit
        exit
        hist
        cleanup
        ?
    ));
    my %help = (
        0 => ["open <server> [<port> [<db>]]"],
        1 => [
                "ls [<path>]",
                "cd <path>",
                "mv <path-from> <path-to> [<limit>]",
                "cleanup <timeout> <(requeue|fail|drop)>", 
                'rm <path> [<limit>]',
                'db <db>',
                'close',
             ],
    );
    push(@{$help{$_}}, ("?", "who", "hist", "quit")) for (0 .. 3);

    ReadMode 4;
    print "Type '?' for help\n";
    print $self->show_prompt;

    while(1) {
        my $line;
        push @history, '';
        $his_pos = $#history;

        # deal with the keyboard
        while (1) {
            my $c = ReadKey(1);
            next if ! defined $c;
            my $ord = ord $c;

            if ($ord == 3 || $ord == 4) {   # ctrl-c, ctrl-d
                print "\n";
                $quit->();
            }
            elsif ($ord == 127) {
                print "\r" , $self->show_prompt , ' ' x length($line);
                $line = substr($line, 0, length($line)-1);
                print "\r", $self->show_prompt, $line;
            }
            elsif ($ord == 27) {
                my $a = ord(ReadKey (1));
                if ($a == 91) {
                    my $b = ord(ReadKey (1));
                    if ($b == 65 || $b == 66) {
                        # arrow keys up/down
                        print "\r" , $self->show_prompt , ' ' x length($line);
                        if ($b == 65) {  # up
                            $history[$his_pos] = $line if $his_pos == $#history;
                            $his_pos-- if $his_pos > 0;
                        }
                        else {
                            $his_pos++ if $his_pos < $#history;;
                        }
                        $line = $history[$his_pos];
                        print "\r" ,  $self->show_prompt , $line;
                    }
                } # else ignore
            }
            elsif ($ord == 10) { #LF
                $his_pos = $#history;
                $history[$his_pos] = $line;
                print $c;
                last;
            }
            else {
                $line .= $c;
                print $c;
            }
        }

        # deal with the command
        my ($cmd, @args) = split /\s+/, $line;
        if ($cmd) {
            if (exists $commands{$cmd}) {
                eval {
                    if ($cmd eq "open") {
                        $self->open(server => $args[0],
                                   port => $args[1],
                                   db => $args[2]);
                    }
                    elsif ($cmd eq "cd") {
                        $self->cd(@args);
                    }
                    elsif ($cmd eq "db") {
                        $self->change_db(@args);
                    }
                    elsif ($cmd eq "rm") {
                        printf "%d items removed\n", $self->rm(@args);
                    }
                    elsif ($cmd eq "mv") {
                        printf "%d items moved\n", $self->mv(@args);
                    }
                    elsif ($cmd eq 'ls') {
                        print join("\n", $self->ls(@args)), "\n";
                    }
                    elsif ($cmd eq 'who') {
                        print join("\n", $self->who(@args)), "\n";
                    }
                    elsif ($cmd eq 'cleanup') {
                        my $n = $self->cleanup(@args);
                        printf "%d items affected\n", $n;
                        print "Try again after <timeout> seconds\n" if ($n ==0);
                    }
                    elsif ($cmd eq 'close') {
                        $self->close(), "\n";
                    }
                    elsif ($cmd eq 'quit' || $cmd eq 'exit') {
                        $quit->();
                    }
                    elsif ($cmd eq 'hist') {
                        print "\t", join("\n\t", @history), "\n";
                    }
                    elsif ($cmd eq '?') {
                        my $connected = defined $self->path ? 1 : 0;
                        print "available commands at this level:\n\t",
                            join("\n\t", @{$help{$connected}}), "\n";
                    }
                    1;
                }
                or do {
                    print "$@\n";
                }
            }
            else {
                print "unknown command $cmd\n";
            }
        }
        print $self->show_prompt;
    }
    $quit->();
}

1;

__END__

=head1 NAME

Queue::Q::ReliableFIFO::CLI - Command line interface to queues in Redis

=head1 DESCRIPTION

A command line interface can be started by

    perl -MQueue::Q::ReliableFIFO::CLI \
        -e 'Queue::Q::ReliableFIFO::CLI->new->run()'

You can put that in a file e.g. called 'fifo-cli' and put that file
in your PATH.

The last state of the cli session is saved in $ENV{HOME}/.reliablefifo
(as JSON). When restarting the cli, the previous context will be
reloaded, i.e. connection, directory and history

The command a in unix style, like "ls", "cd", "rm" and "mv". Press "?"
to get a list of available commands. A directory structure is emulated
in which the highest level is the queue name and the second level is
a type of queue, i.e. queue with busy items, queue with failed items and
main queue with waiting items.

The "mv" and "rm" resp. move and remove items. The commands accept an
extra parameter to limit the number of items they handle.

Example:

    [herald@aap redis]$ fifo-cli
    Type '?' for help
    FIFO:-> open localhost
    FIFO:localhost:6379 (db=0) [ / ]-> ls
    mytest
    FIFO:localhost:6379 (db=0) [ / ]-> cd mytest
    FIFO:localhost:6379 (db=0) [ /mytest ]-> ls
       main: 4054 items
       busy: 0 items
     failed: 2 items
    FIFO:localhost:6379 (db=0) [ /mytest ]-> ls failed
    {"b":{"k":9,"x":783348.75271916,"s":"bgky"},"t":1352319296}
    {"b":{"k":10,"x":574480.695396417,"s":"irzd"},"t":1352319296}
    FIFO:localhost:6379 (db=0) [ /mytest ]-> mv failed main  
    2 items moved
    FIFO:localhost:6379 (db=0) [ /mytest ]-> ls
       main: 4056 items
       busy: 0 items
     failed: 0 items
    FIFO:localhost:6379 (db=0) [ /mytest ]-> cd main
    FIFO:localhost:6379 (db=0) [ /mytest/main ]-> ls
    {"b":{"k":11,"x":664047.321063155,"s":"ptkx"},"t":1352319296}
    {"b":{"k":12,"x":502226.272384469,"s":"yrqb"},"t":1352319296}
    ...
    FIFO:localhost:6379 (db=0) [ /mytest/main ]-> mv . ../failed 1
    1 items moved
    FIFO:localhost:6379 (db=0) [ /mytest/main ]-> cd ..
    FIFO:localhost:6379 (db=0) [ /mytest ]-> ls
       main: 4055 items
       busy: 0 items
     failed: 1 items


=head1 AUTHOR

Herald van der Breggen, E<lt>herald.vanderbreggen@booking.comE<gt>

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2012 by Herald van der Breggen

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.8.1 or,
at your option, any later version of Perl 5 you may have available.

=cut