The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Ubic::Service::ZooKeeper;

use strict;
use warnings;

# ABSTRACT: running ZooKeeper as Ubic service

=head1 SYNOPSIS

  use Ubic::Service::ZooKeeper;
  return Ubic::Service::ZooKeeper->new({
      clientPort => 2181,
      dataDir    => '/var/lib/zookeeper',
      tickTime   => 2000,
      initLimit  => 10,
      syncLimit  => 5,
      servers    => {
          1 => { server => "host1:2888:3888" },
          2 => { server => "host2:2888:3888" },
          3 => { server => "host3:2888:3888" },
      },
      myid => 1,

      ubic_log => '/var/log/zookeeper/ubic.log',
      stdout   => '/var/log/zookeeper/stdout.log',
      stderr   => '/var/log/zookeeper/stderr.log',
      user     => 'zookeeper',
      gen_cfg  => '/etc/zookeeper/conf/zoo.cfg',
      pidfile  => '/tmp/zookeeper.pid',

      java_cp => '/usr/share/java/jline.jar:/usr/share/java/log4j-1.2.jar:' .
                 '/usr/share/java/xercesImpl.jar:' .
                 '/usr/share/java/xmlParserAPIs.jar:/usr/share/java/zookeeper.jar',
});

=head1 DESCRIPTION

This module intends for running/shutdown ZooKeeper as a L<Ubic> service. It accepts all config options of ZooKeeper (and some other options), generates config and myid file and run it.


=head1 METHODS

Typically you need to use only C<new()> method, but you can find below description of some another methods also.

=over

=cut

use parent qw(Ubic::Service::Common);

use File::Copy qw(move);
use File::Spec::Functions qw(catfile);
use IO::Socket::INET;
use Params::Validate qw(:all);
use Time::HiRes qw();
use Ubic::Daemon qw(:all);
use Ubic::Result qw(:all);

=item C<new($params)>

Creates new ZooKeeper service. C<$params> is hashref with different ZooKeeper and Ubic params.

ZooKeeper config params: C<clientPort> (default is C<2181>), C<dataDir> (default is C</var/lib/zookeeper>), C<tickTime> (default is C<2000>), C<dataLogDir>, C<globalOutstandingLimit>, C<preAllocSize>, C<snapCount>, C<traceFile>, C<maxClientCnxns>, C<clientPortAddress>, C<minSessionTimeout>, C<maxSessionTimeout>, C<electionAlg>, C<initLimit>,
C<leaderServer>, C<servers>, C<groups>, C<syncLimit>, C<cnxTimeout>, C<forceSync>, C<skipACL>.

You can find description for all of this params on L<http://zookeeper.apache.org/doc/trunk/zookeeperAdmin.html#sc_configuration>.

Two exceptions here.

The first is a C<servers> param. It combines C<server.x> and C<weight.x> params from ZooKeeper config. C<servers> is a hashref where key is a number of server and the values is a hashref with keys C<server> and C<weight>.

The second is a C<groups> param. It is a hashref, where the key is a number of group and the value is arrayref with server numbers in this group.

All of these params are optional.

Remain params are:

=over

=item I<myid> (optional)

Id of the current server in ZooKeeper cluster. Based on this value Ubic::Service::Zookeeper will create C<myid> file in C<dataDir>.

Default is C<1>.

=item I<status> (optional)

Coderef for checking ZooKeeper status. Takes current instance of C<Ubic::Service::ZooKeeper> as a first param.

Default implemetation uses C<ruok> ZooKeeper command.

=item I<user> (optional)

User name that will be used as real and effective user identifier during exec of ZooKeeper.

=item I<ubic_log> (optional)

Path to ubic log.

=item I<stdout> (optional)

Path to stdout log.

ZooKeeper supports custom logging setup, so in most cases this param is meaningless.

=item I<stderr> (optional)

Path to stderr log.

=item I<pidfile> (optional)

Pidfile for C<Ubic::Daemon> module.

If not specified it is a /tmp/zookeeper.<clientPort>.pid.

=item I<gen_cfg> (optional)

Generated ZooKeeper config file name.

If not specified it is a /tmp/zoo.<clientPort>.cfg.

=item I<java> (optional)

Path to C<java> binary. Default is just "java", so your C<PATH> should be setted properly in default case.

=item I<java_cp> (optional)

Java classpath. See ZooKeeper administration guide for more information.

It should be something like this: /usr/share/java/jline.jar:/usr/share/java/log4j-1.2.jar:/usr/share/java/xercesImpl.jar:/usr/share/java/xmlParserAPIs.jar:/usr/share/java/zookeeper.jar

=item I<jmx_enable> (optional)

Enable JMX. Default is C<1>

=item I<jmx_local_only> (optional)

Enable JMX only locally. Default is C<0>.

=item I<zoo_log_dir> (optional)

Where zookeeper will place own logs. Default is C<var/log/zookeeper>.

=item I<zoo_log4j_prop> (optional)

Log4j properties for ZooKeeper. Default is C<INFO,ROLLINGFILE>.

=item I<zoo_main_class> (optional)

Main ZooKeeper class. Default is C<org.apache.zookeeper.server.quorum.QuorumPeerMain>. Typically you don't need to redefine this param.

=item I<java_opts> (optional)

Some additional options that you want to pass to C<java>.

=back

=cut

sub new {
    my $class = shift;

    my $opt_num = { type => SCALAR, regex => qr/^\d+$/o, optional => 1 };
    my $opt_str = { type => SCALAR, optional => 1 };

    my $params = validate(@_, {
        # zookeeper config options
        ### minimum zookeeper config
        clientPort => { %$opt_num, default => 2181 },
        dataDir    => { type => SCALAR, default => '/var/lib/zookeeper' },
        tickTime   => { %$opt_num, default => 2000 },

        ### advanced zookeeper config
        dataLogDir             => $opt_str,
        globalOutstandingLimit => $opt_num,
        preAllocSize           => $opt_num,
        snapCount              => $opt_num,
        traceFile              => $opt_str,
        maxClientCnxns         => $opt_num,
        clientPortAddress      => $opt_str,
        minSessionTimeout      => $opt_num,
        maxSessionTimeout      => $opt_num,

        ### zookeeper cluster options
        electionAlg  => $opt_num,
        initLimit    => $opt_num,
        leaderServes => $opt_str,
        # num, hostname, port and weight of each server
        servers      => { type => HASHREF, default => {} },
        groups       => { type => HASHREF, default => {} },
        syncLimit    => $opt_num,
        cnxTimeout   => $opt_num,

        ### unsafe zookeeper options
        forceSync        => $opt_str,
        skipACL          => $opt_str,


        # zookeeper myid
        myid => { %$opt_num, default => 1 },


        # ubic specific options
        status        => { type => CODEREF, optional => 1 },
        user          => $opt_str,
        ubic_log      => $opt_str,
        stdout        => $opt_str,
        stderr        => $opt_str,
        pidfile       => $opt_str,
        port          => $opt_num,

        gen_cfg        => $opt_str,
        java           => { %$opt_str, default => 'java' },
        java_cp        => { %$opt_str, default => '' },
        jmx_enable     => { type => BOOLEAN, default => 1 },
        jmx_local_only => { type => BOOLEAN, default => 0 },
        zoo_log_dir    => { %$opt_str, default => '/var/log/zookeeper' },
        zoo_log4j_prop => { %$opt_str, default => 'INFO,ROLLINGFILE' },
        zoo_main_class => {
            %$opt_str,
            default => 'org.apache.zookeeper.server.quorum.QuorumPeerMain'
        },
        java_opts      => { type => SCALAR, default => '' },
    });

    if (!$params->{pidfile}) {
        $params->{pidfile} = '/tmp/zookeeper.' . $params->{clientPort} . '.pid';
    }
    if (!$params->{gen_cfg}) {
        $params->{gen_cfg} = '/tmp/zoo.' . $params->{clientPort} . '.cfg';
    }

    return bless $params => $class;
}


sub bin {
    my $self = shift;

    my $cmd = '';
    $cmd = $self->{java} . " " . $self->{java_opts} . " " .
           "-cp " . $self->{java_cp} . " ";
    if ($self->{jmx_enable}) {
        $cmd .= "-Dcom.sun.management.jmxremote ";
        unless ($self->{jmx_local_only}) {
            $cmd .= "-Dcom.sun.management.jmxremote.local.only=false ";
        }
    }
    $cmd .= "-Dzookeeper.log.dir=$self->{zoo_log_dir} ";
    $cmd .= "-Dzookeeper.root.logger=$self->{zoo_log4j_prop} ";
    $cmd .= $self->{zoo_main_class} . " ";
    $cmd .= $self->gen_cfg;

    return [ $cmd ];
}

sub _zookeeper_cfg_params_list {
    return qw/clientPort dataDir tickTime dataLogDir globalOutstandingLimit
              preAllocSize snapCount traceFile maxClientCnxns clientPortAddress
              minSessionTimeout maxSessionTimeout electionAlg initLimit
              leaderServes servers groups syncLimit cnxTimeout forceSync skipACL/;
}

=item C<create_cfg_file()>

Generates .cfg file basing on constuctor params.

=cut

sub create_cfg_file {
    my $self = shift;

    my $fname = $self->gen_cfg;
    my $tmp_fname = $fname . ".tmp";

    my %params;
    foreach (_zookeeper_cfg_params_list()) {
        $params{$_} = $self->{$_} if defined($self->{$_});
    }
    my $servers = delete $params{servers};
    my $groups = delete $params{groups};

    open(my $tmp_fh, '>', $tmp_fname) or die "Can't open file [$tmp_fname]: $!";

    foreach my $p (sort keys %params) {
        my $v = $params{$p};
        print $tmp_fh "$p=$v\n";
    }
    print $tmp_fh "\n";

    foreach my $server_num (sort {$a <=> $b} keys %$servers) {
        my $s = $servers->{$server_num};
        my $server = $s->{server};
        print $tmp_fh "server.${server_num}=$server\n";

        if ($s->{weight}) {
            print $tmp_fh "weight.${server_num}=$s->{weight}\n";
        }
    }
    print $tmp_fh "\n";

    foreach my $group_num (sort {$a <=> $b} keys %$groups) {
        my $group_servers = $groups->{$group_num};
        print $tmp_fh "group.${group_num}=" . join(":", @$group_servers) . "\n";
    }

    close($tmp_fh) or die "Can't close file [$tmp_fname]: $!";
    move($tmp_fname, $fname) or die "Can't move file [${tmp_fname}] to [$fname]: $!";
}


=item C<create_myid_file()>

Generates C<myid> file basing on C<myid> and C<dataDir> params in constructor.

=cut

sub create_myid_file {
    my $self = shift;

    my $fname = catfile($self->{dataDir}, 'myid');
    my $tmp_fname = $fname . ".tmp";

    open(my $tmp_fh, '>', $tmp_fname) or die "Can't open file [$tmp_fname]: $!";
    print $tmp_fh $self->{myid}, "\n";
    close($tmp_fh) or die "Can't close file [$tmp_fname]: $!";
    move($tmp_fname, $fname) or die "Can't move file [${tmp_fname} to [$fname]: $!";
}

sub start_impl {
    my $self = shift;

    $self->create_cfg_file;
    $self->create_myid_file;

    my $daemon_opts = { bin => $self->bin, pidfile => $self->pidfile, term_timeout => 5 };
    for (qw/ubic_log stdout stderr/) {
        $daemon_opts->{$_} = $self->{$_} if defined $self->{$_};
    }
    start_daemon($daemon_opts);

    return;
}

sub stop_impl {
    my $self = shift;

    return stop_daemon($self->pidfile, { timeout => 7 });
}

sub status_impl {
    my $self = shift;

    my $running = check_daemon($self->pidfile);
    return result('not running') unless ($running);

    if ($self->{status}) {
        return $self->{status}->($self);
    }

    my $sock = IO::Socket::INET->new(
        PeerAddr => "localhost",
        PeerPort => $self->port,
        Proto    => "tcp",
        Timeout  => 1,
        Blocking => 0,
    );
    return result('broken') unless ($sock);

    $sock->print('ruok');
    my $resp = '';
    for (1..10) {
        my $buff;
        $sock->sysread($buff, 4);
        $resp .= $buff if defined($buff);
        last if (length($resp) >= 4);
        Time::HiRes::sleep(0.1);
    }

    if ($resp eq 'imok') {
        return result('running');
    } else {
        return result('broken');
    }
}

sub user {
    my $self = shift;

    return $self->{user} if defined $self->{user};
    return $self->SUPER::user;
}

=item C<pidfile()>

Get pidfile name.

=cut

sub pidfile {
    my $self = shift;

    return $self->{pidfile};
}

sub gen_cfg {
    my $self = shift;

    return $self->{gen_cfg};
}

sub port {
    my $self = shift;

    return $self->{port} if defined $self->{port};
    return $self->{clientPort};
}

sub timeout_options {
    return {
        start => { trials => 15, step => 0.1 },
        stop  => { trials => 15, step => 0.1 }
    };
}

=back

=head1 SEE ALSO

L<http://zookeeper.apache.org/doc/trunk/zookeeperAdmin.html>

L<Ubic>

=cut

1;