#!/usr/bin/perl
#
# snaked: cool cron replacement.
#
#
# petya@kohts.ru
#
#
# Compile-time bootstrap: works out where this script lives and publishes the
# derived directories through %ENV, so that the `use lib` statements that
# follow this block can locate the bundled modules.
#
# Exports: MY_BIN, MY_LIB, MY_ETC, MY_ROOT and (only if not already set)
# PS_SNAKED_LIB.
BEGIN {
# unbuffered output
$| = 1;
use Cwd;
use FindBin;
# without this chroot abs_path (below)
# returns empty string if daemon is called
# from some shell script and current directory
# is a home directory of a user (permissions?)
# NOTE(review): the return value of chroot is not checked here.
chroot('/');
# directory containing this script
$ENV{'MY_BIN'} = "$FindBin::Bin";
# library directory: prefer <bin>/../lib, fall back to <bin>/lib
$ENV{'MY_LIB'} = Cwd::abs_path("$ENV{'MY_BIN'}/../lib");
if (! -d $ENV{'MY_LIB'}) {
$ENV{'MY_LIB'} = Cwd::abs_path("$ENV{'MY_BIN'}/lib");
}
# default configuration directory
$ENV{'MY_ETC'} = "/etc/snaked";
# installation root: four directory levels above the script directory
$ENV{'MY_ROOT'} = Cwd::abs_path("$ENV{'MY_BIN'}/../../../..");
# an externally supplied PS_SNAKED_LIB takes precedence over the computed one
if (!$ENV{'PS_SNAKED_LIB'}) {
$ENV{'PS_SNAKED_LIB'} = $ENV{'MY_LIB'};
}
};
use strict;
use warnings;
use lib "$ENV{'MY_LIB'}";
use lib "$ENV{'PS_SNAKED_LIB'}";
use snaked;
use Yandex::Tools;
use Yandex::Tools::ProcessList;
package snaked::Daemon;
use Schedule::Cron::Events;
use Time::Local;
use POSIX;
use IO::Handle; # autoflush
use Socket; # socketpair
use Fcntl;
use Time::HiRes;
use File::Path;
# daemon version as published by the snaked module
my $version = $snaked::VERSION;

# process-table matching patterns; they are filled in by get_cfg_path()
my (
    $daemon_regexp_configured,
    $daemon_match_cfg,
    $daemon_match_cfg1,
    $daemon_match_nocfg,
    $watchdog_match,
    $watchdog_match1,
);

# sender address for notification mails (user@host); stays undefined
# when either the user name or the host name could not be determined
my $current_user = getpwuid($<);
my $current_host = `hostname -f` || "";
my $from_address;
if ($current_user && $current_host) {
    $current_host =~ tr/\r\n//d;
    $from_address = join('@', $current_user, $current_host);
}

# path and command line of this process
my $my_path;
my $my_command_line;

# number of watchdog processes the master keeps alive
my $watchdogs2maintain = 1;

# sleeping strategy used by my_usleep(): Time::HiRes::usleep on linux,
# an external sleep(1) elsewhere when it is available
my $use_usleep;
my $have_sleep_external;
if ($^O eq 'linux') {
    $use_usleep = 1;
}
elsif (system("sleep 0") eq 0) {
    $have_sleep_external = 1;
}
else {
    write_spool("external_sleep_error", "no external sleep: code [$?]; err: [$!]\n", {'mode' => "append"});
}
# Formats the wall-clock part of a clock sample and reports clock adjustments.
#
# $clock -- hashref with 'real' and 'mono' readings
# $opts  -- optional hashref:
#   start             -- reference sample (defaults to the daemon start time)
#   threshold_left    -- lower bound of tolerated drift (default -2)
#   threshold_right   -- upper bound of tolerated drift (default 2)
#   return_adjustment -- return the numeric drift (0 when within thresholds)
#                        instead of a formatted string
#
# The drift is (monotonic elapsed time) - (wall-clock elapsed time) measured
# from the reference sample. Without return_adjustment the result is the
# localtime string of $clock->{'real'}, with an "(adj. ...)" suffix appended
# when the drift is outside the thresholds.
sub clock_adjusted {
    my ($clock, $opts) = @_;

    $opts->{'start'}           ||= $snaked::Daemon::runtime->{'start_time'};
    $opts->{'threshold_left'}  ||= -2;
    $opts->{'threshold_right'} ||= 2;

    my $elapsed_real = $clock->{'real'} - $opts->{'start'}->{'real'};
    my $elapsed_mono = $clock->{'mono'} - $opts->{'start'}->{'mono'};
    my $drift        = $elapsed_mono - $elapsed_real;

    # were there any adjustments (respecting threshold)
    my $adjusted = ($drift < $opts->{'threshold_left'} || $drift > $opts->{'threshold_right'});

    if ($opts->{'return_adjustment'}) {
        return $adjusted ? $drift : 0;
    }

    my $formatted = localtime($clock->{'real'});
    if ($adjusted) {
        $formatted .= " (adj. $drift: " . localtime($clock->{'real'} + $drift) . ")";
    }
    return $formatted;
}
# Sleeps for (approximately) $usec microseconds and returns the number of
# microseconds that was requested from the underlying sleep primitive.
#
# With $use_usleep set, Time::HiRes::usleep is used. Otherwise the request is
# rounded to whole seconds (at least one) and served either by an external
# "sleep" command or by the builtin sleep; in that case the rounded value is
# what gets returned.
#
# Suspicious requests (more than 60 seconds) and sleeps that lasted more than
# 20 times longer than requested are recorded in the spool directory.
sub my_usleep {
    my ($usec) = @_;
    $usec ||= 1;

    # spool file name depends on the kind of process which is sleeping;
    # evaluated lazily, only when something has to be recorded
    my $spool_name = sub {
        return $snaked::Daemon::runtime->{'type'} eq 'master'
            ? "my_usleep_master"
            : "my_usleep_watchdog";
    };

    if ($usec > 60_000_000) {
        write_spool($spool_name->(), "[$$] my_usleep got $usec to sleep\n", {'mode' => "append"});
    }

    my $before = snaked::my_clock();

    if ($use_usleep) {
        # Time::HiRes::setitimer($Time::HiRes::ITIMER_REAL, $usec / 1_000_000);
        # Time::HiRes::nanosleep($usec * 1000);
        Time::HiRes::usleep($usec);
    }
    else {
        # whole seconds only, never less than one
        my $sec = int(($usec / 1_000_000) + 0.5) || 1;

        if (!$have_sleep_external) {
            sleep $sec;
        }
        else {
            my $ret = system("sleep $sec");
            my $sig = $ret & 127;

            # http://perldoc.perl.org/functions/system.html
            #
            # Since SIGINT and SIGQUIT are ignored during the execution of system,
            # if you expect your program to terminate on receipt of these signals
            # you will need to arrange to do so yourself based on the return value.
            #
            if ($ret == -1) {
                write_spool("external_sleep_error", "code [$?]; err: [$!]", {'mode' => "append"});
            }
            elsif ($sig) {
                # forward SIGINT and SIGQUIT to the main process
                #
                if ($sig eq 2 || $sig eq 3) {
                    sigTERM_handler();
                }
                else {
                    write_spool("external_sleep_error", "child died with signal $sig", {'mode' => "append"});
                }
            }
            else {
                my $exit_value = $ret >> 8;
                if ($exit_value ne 0) {
                    write_spool("external_sleep_error", "child exited with value $exit_value", {'mode' => "append"});
                }
            }
        }

        $usec = $sec * 1_000_000;
    }

    my $after = snaked::my_clock();

    # log if slept X times longer than requested
    my $slept_mono = $after->{'mono'} - $before->{'mono'};
    if ($slept_mono > (($usec / 1_000_000) * 20)) {
        my $spool_filename = $spool_name->();
        write_spool($spool_filename, "[$$] my_usleep slept from $before->{'mono'} ($before->{'real'}) " .
            "to $after->{'mono'} ($after->{'real'}); requested [$usec]\n", {'mode' => "append"});
    }

    return $usec;
}
# Computes how long (in microseconds) a watchdog waits between two scans of
# the process table.
#
# $opts -- optional hashref:
#   watchdogs2maintain  -- number of watchdogs being maintained; treated as 0
#                          when omitted or when watchdogs are disabled globally
#   number_of_processes -- size of the process table (defaults to 100)
#
# Result: a base step of (watchdogs2maintain + 1) * 4 seconds, plus
# 2 * watchdogs2maintain seconds for every full 100 processes.
sub watchdog_check_timeout {
    my ($opts) = @_;

    my $average_number_of_processes = 50 * 2; # 50 is average

    $opts = {} unless $opts;

    # An omitted value used to reach the arithmetic below as undef, which
    # produced "uninitialized value" warnings; it is now an explicit 0
    # (numerically the same result).
    $opts->{'watchdogs2maintain'} = 0
        unless $watchdogs2maintain && defined($opts->{'watchdogs2maintain'});
    $opts->{'number_of_processes'} = $average_number_of_processes
        unless $opts->{'number_of_processes'};

    # 8 secs when one watchdog is maintained
    my $timeout_step = ($opts->{'watchdogs2maintain'} + 1) * 2 * 2000000;

    my $k =
        int($opts->{'number_of_processes'} / $average_number_of_processes) *
        2 *
        $opts->{'watchdogs2maintain'} *
        1000000; # microseconds

    return $timeout_step + $k;
}
# TERM/INT handler: the master only raises its 'stop' flag (acted upon
# elsewhere), a watchdog exits right away, any other process type ignores
# the signal.
sub sigTERM_handler {
    # Yandex::Tools::do_log("snaked $$ term: " . $snaked::Daemon::runtime->{'type'});
    my $process_type = $snaked::Daemon::runtime->{'type'};

    if ($process_type eq 'master') {
        $snaked::Daemon::runtime->{'flags'}->{'stop'} = 1;
    }
    elsif ($process_type eq 'watchdog') {
        exit;
    }
}
# HUP handler: asks the master to re-read its configuration by raising the
# 'refresh_configuration' flag; other process types ignore the signal.
sub sigHUP_handler {
    return unless $snaked::Daemon::runtime->{'type'} eq 'master';
    $snaked::Daemon::runtime->{'flags'}->{'refresh_configuration'} = 1;
}
# USR1 handler: asks the master for a detailed status report by raising the
# 'detailed_status' flag; other process types ignore the signal.
sub sigUSR1_handler {
    return unless $snaked::Daemon::runtime->{'type'} eq 'master';
    $snaked::Daemon::runtime->{'flags'}->{'detailed_status'} = 1;
}
# USR2 handler: schedules a restart of the master by raising the 'restart'
# flag, unless a stop has already been requested; other process types ignore
# the signal.
sub sigUSR2_handler {
    return unless $snaked::Daemon::runtime->{'type'} eq 'master';

    # do not restart if already being stopped
    return if $snaked::Daemon::runtime->{'flags'}->{'stop'};

    $snaked::Daemon::runtime->{'flags'}->{'restart'} = 1;
}
# ALRM handler, intentionally a no-op: having a handler installed keeps
# the process from croaking on setitimer signals, while still letting them
# wake it up from a (potentially) indefinite nanosleep.
sub sigALRM_handler {
    return;
}
#Yandex::Tools::disable_all_signals();

# install all signal handlers at once;
# INT (ctrl-c) is served by the TERM handler
@SIG{'TERM', 'HUP', 'USR1', 'USR2', 'ALRM', 'INT'} = (
    \&sigTERM_handler,
    \&sigHUP_handler,
    \&sigUSR1_handler,
    \&sigUSR2_handler,
    \&sigALRM_handler,
    \&sigTERM_handler,
);
# Writes $value into the file $filename inside the configured spool directory.
#
# $filename -- name of the file, relative to the 'spool_directory' option
# $value    -- content to write; undef is written as an empty string
# $opts     -- optional hashref; 'mode' is passed to Yandex::Tools::safe_open
#              and defaults to "overwrite"
#
# Returns a hashref:
#   {}                                  -- no spool directory is configured
#   {'ok' => 1, 'full_pathname' => ...} -- the value was written
#   {'errtext' => ...}                  -- the file could not be opened or
#                                          the write failed
sub write_spool {
    my ($filename, $value, $opts) = @_;

    $opts = {} unless $opts;
    $opts->{'mode'} = "overwrite" unless $opts->{'mode'};

    my $spool_dir = config_value('spool_directory');

    # if spool directory is not available
    # continue silently
    return {} unless $spool_dir;

    my $spool_file = $spool_dir . "/" . $filename;

    # only a missing value becomes an empty string;
    # a defined but false value such as 0 must be written as is
    $value = "" unless defined($value);

    my $fh = Yandex::Tools::safe_open($spool_file, $opts->{'mode'}, {'timeout' => 0});
    if (!$fh) {
        return {
            'errtext' => "unable to write to [$spool_file]",
        };
    }

    # the handle is closed whether or not the write succeeded
    my $written = print $fh $value;
    Yandex::Tools::safe_close($fh);

    if (!$written) {
        return {
            'errtext' => "unable to write to [$spool_file]",
        };
    }

    return {
        'ok' => 1,
        'full_pathname' => $spool_file,
    };
}
# Leaves a time-stamped trace of the main loop position in a debug directory.
#
# $label -- checkpoint name (nothing is written without it); the trace goes
#           into the file "<pid>_<label>" of the debug directory
# $msg   -- optional text appended after the time stamp
# $opts  -- accepted but not used
#
# The debug directory is taken from the 'debug_main_cycle' option for the
# master and from 'debug_watchdog' for every other process type; tracing is
# skipped when the option is not set.
sub debug_main_cycle {
    my ($label, $msg, $opts) = @_;
    return unless $label;

    my $option_name = $snaked::Daemon::runtime->{'type'} eq 'master'
        ? 'debug_main_cycle'
        : 'debug_watchdog';
    my $debug_top_dir = config_value($option_name);
    return unless $debug_top_dir;

    $msg ||= "";
    my $stamped = clock_adjusted(snaked::my_clock()) . ": " . $msg;

    # trying to block as less as possible
    #
    Yandex::Tools::write_file_option($debug_top_dir . "/" . $$ . "_" . $label, $stamped, {'timeout' => 1});
}
# Prints the usage summary (including the daemon version) to standard output
# and terminates the process with exit code 0.
sub help() {
    my $headline = "\n" . 'snaked -- cron as it should be (version ' . $version . ')' . "\n";

    my $options = <<'END_OF_OPTIONS';
command-line options:
start-up type:
--daemon -- run in background
--debug -- run in foreground with debug output
runtime control:
--restart [--wait] -- schedule restart for currently running daemon
(valid only for backgrounded daemon)
--configure -- schedule reread of configuration
--status -- is there daemon running?
--stop [--wait] -- schedule stop for currently running daemon
--detailed-status -- save detailed status into spool directory
configuration:
--add-job JOB --param value -- configure new job
--modify-job JOB --param value -- modify job parameters
--delete-jobs JOBS -- delete listed jobs
--disable-jobs [JOBS] -- disable all (or listed) jobs
--enable-jobs [JOBS] -- enable all (or listed) jobs
--show-config -- show configured daemon jobs
--show-job JOB [--param] -- shows either all parameters of the job
or those specified by the --param
(space separated)
--version -- show daemon version
END_OF_OPTIONS

    print $headline . $options;
    exit 0;
}
# Looks up a daemon option in the runtime configuration.
#
# $option_name -- name of the option
#
# Returns the 'value' field of the option entry, or undef when the option
# is not present in the configuration.
sub config_value {
    my ($option_name) = @_;

    my $config = $snaked::Daemon::runtime->{'config'};

    return $config->{$option_name}
        ? $config->{$option_name}->{'value'}
        : undef;
}
# Writes $msg into the dedicated error log, if one is configured through the
# 'log_errors' option. The regular log file name is restored afterwards,
# even when logging itself fails.
sub do_err_log {
    my ($msg) = @_;

    my $error_log = config_value('log_errors');
    return unless $error_log;

    my $regular_log = Yandex::Tools::get_log_filename();
    Yandex::Tools::set_log_filename($error_log);

    # just in case it fails
    eval {
        Yandex::Tools::do_log($msg);
    };

    Yandex::Tools::set_log_filename($regular_log);
}
# Executes one configured task; runs inside the forked child (see add_child).
#
# $task_name             -- name of the task in the runtime task table
# $parent_control_socket -- handle on which the exact start time is reported
#                           to the parent
#
# Returns an empty string when the task succeeded (or when a delayed
# notification has just been mailed), otherwise the error message of the run.
# Exits the process with code 1 when the parent has died in the meantime.
sub run_task {
    my ($task_name, $parent_control_socket) = @_;

    my $config = $snaked::Daemon::runtime->{'config'};
    my $task = $snaked::Daemon::runtime->{'tasks'}->{$task_name};

    # expose the job name to the command for the duration of the run
    my $old_job_name = $ENV{'JOB_NAME'};
    $ENV{'JOB_NAME'} = $task_name;

    my $start_time = snaked::my_clock();

    # do not forget to modify remove_child which reads this message
    #
    print $parent_control_socket "started at mono [$start_time->{'mono'}] real [$start_time->{'real'}]\n";

    Yandex::Tools::debug("running task [$task_name] timeout [$task->{'execution_timeout'}] kill timeout [$task->{'kill_timeout'}]");

    my $o = Yandex::Tools::run_forked($task->{'cmd'}, {
        'timeout' => $task->{'execution_timeout'},
        'terminate_on_parent_sudden_death' => 1,
        'terminate_on_signal' => 'TERM',
        'terminate_wait_time' => $task->{'kill_timeout'},
        'clean_up_children' => 1,
    });

    Yandex::Tools::debug("finished [$task_name]: " . Yandex::Tools::safe_string($o->{'exit_code'}));

    $ENV{'JOB_NAME'} = $old_job_name if $old_job_name;

    if ($o->{'parent_died'}) {
        do_err_log("[$$] my parent died, exiting");
        Yandex::Tools::do_log("[$$] my parent died, exiting");
        exit 1;
    }

    return "" unless $o->{'err_msg'};

    if (! defined($task->{'disable_notifications'})) {
        # save first failure time (this is only valid during
        # child life, parent will set this again in its
        # memory space after child returns $o->{'err_msg'})
        # (!) reading this exactly time from the child_control_socket
        #
        $task->{'runtime'}->{'first_failure_time'} ||= $start_time->{'mono'};

        my $end_time = snaked::my_clock();

        my $task_info =
            "\n\n" .
            "task summary\n" .
            "------------\n" .
            "pid: $$\n" .
            "cmdline: $task->{'cmd'}\n" .
            "start time: " . clock_adjusted($start_time) . "\n" .
            "end time: " . clock_adjusted($end_time) . "\n"
            ;

        # the very same warning mail is used by both notification policies
        my $send_warning = sub {
            Yandex::Tools::send_mail({
                'from' => $from_address,
                'to' => ($task->{'admin_email'} ? $task->{'admin_email'} : $config->{'admin_email'}->{'value'}),
                'subject' => $0 . ": $task_name warning",
                'body' => $o->{'err_msg'} . $task_info,
                'no_cc_all' => 1,
            });
        };

        # do not notify more often than once
        # each $task->{'notification_interval'} seconds
        # (notify after each failure if not defined)
        if (!$task->{'notification_interval'}) {
            $send_warning->();

            # if we have no notification interval
            # do not mask errors from parent --
            # so it can log them (!)
            #
        }
        elsif ($task->{'runtime'}->{'first_failure_time'} + $task->{'notification_interval'} < $start_time->{'mono'}) {
            $send_warning->();

            # pretend that everything went fine
            # (this will make parent reset first_failure_time)
            #
            # old logic, when no child_control_socket was available,
            # could be rewritten to use it (!)
            #
            return "";
        }
    }

    # if notification was not sent --
    # let parent know that we had problem
    # and it should set first_failure_time
    # (if not set)
    return $o->{'err_msg'};
}
# Switches a handle to non-blocking mode while keeping its other status flags.
# Dies when either fcntl call fails.
sub _set_nonblocking {
    my ($handle) = @_;

    # F_GETFL hands the flags back as the return value of fcntl
    # ("0 but true" for zero), it does not store them in the third argument
    my $flags = fcntl($handle, F_GETFL, 0) || die "can't fnctl F_GETFL: $!";
    fcntl($handle, F_SETFL, $flags | O_NONBLOCK) || die "can't fnctl F_SETFL: $!";
}

# Forks a child process which runs the task $task_name.
#
# $task_name -- name of the task in the runtime task table (mandatory)
# $opts      -- optional hashref, currently not used
#
# Two socket pairs connect parent and child: one carries the task result
# (error text), the other carries control messages such as the start time.
#
# In the parent the child is registered in the runtime tables (by pid and by
# name) and child_started() is called. When fork fails the failure is logged
# and 'usec_2wait_before_fork' is set to 30 seconds. The child never returns
# from this function: it runs the task, relays the result and exits.
sub add_child {
    my ($task_name, $opts) = @_;

    $opts = {} unless $opts;

    Yandex::Tools::die ("Programmer error: add_child expects at least child name")
        unless $task_name;

    my $child_socket;
    my $parent_socket;
    my $child_control_socket;
    my $parent_control_socket;

    socketpair($child_socket, $parent_socket, AF_UNIX, SOCK_STREAM, PF_UNSPEC) ||
        Yandex::Tools::die ("socketpair: $!");
    socketpair($child_control_socket, $parent_control_socket, AF_UNIX, SOCK_STREAM, PF_UNSPEC) ||
        Yandex::Tools::die ("socketpair: $!");

    $child_socket->autoflush(1);
    $parent_socket->autoflush(1);
    $child_control_socket->autoflush(1);
    $parent_control_socket->autoflush(1);

    my $pid = fork;

    # this is the parent code which is unable to fork (!);
    # logging the state, waiting for a while and retrying
    #
    if (! defined($pid)) {
        Yandex::Tools::do_log("cannot fork: $!");
        $snaked::Daemon::runtime->{'usec_2wait_before_fork'} = 30_000_000;
        return;
    }

    if ($pid) {
        # we are a parent
        close $parent_socket;
        close $parent_control_socket;

        # the parent polls its ends of the sockets and must never block on them
        _set_nonblocking($child_socket);
        _set_nonblocking($child_control_socket);

        my $child = {
            'pid' => $pid,
            'name' => $task_name,
            'borntime' => snaked::clock_mono(),
            'killtime' => 0,
            'child_socket' => $child_socket,
            'child_control_socket' => $child_control_socket,
            'output' => '',
            'control_output' => '',
        };

        $snaked::Daemon::runtime->{'children'}->{'by_pid'}->{$pid} = $child;
        $snaked::Daemon::runtime->{'children'}->{'by_name'}->{$task_name}->{$pid} = $child;

        child_started($task_name);
    }
    else {
        # now finally here is the child code
        #
        $snaked::Daemon::runtime->{'type'} = "child";

        close $child_socket;
        close $child_control_socket;

        my $r = run_task($task_name, $parent_control_socket);

        # relay the result line by line, skipping empty lines; the amount
        # of relayed output is capped (the counter is checked after a line
        # has been printed)
        my $lines = 0;
        OUTPUT_BY_LINE: while ($r =~ /([^\r\n]+?)([\r\n]|$)/sg) {
            my $s = $1;
            print $parent_socket "$s\n";
            $lines++;
            if ($lines > 1024) {
                print $parent_socket "more than 1024 lines, output truncated\n";
                last OUTPUT_BY_LINE;
            }
        }

        close($parent_socket);
        close($parent_control_socket);
        exit 0;
    }
}
# Tells whether (and how many) children are registered for the task $name.
#
# Returns undef when the task has no entry in the by-name table at all,
# otherwise the number of child processes registered under it (possibly 0).
sub find_child {
    my ($name) = @_;

    my $registered = $snaked::Daemon::runtime->{'children'}->{'by_name'}->{$name};
    return undef unless $registered;

    # future use: several children for one task (not implemented now)
    return scalar(keys %{$registered});
}
# Bookkeeping after a child has been spawned for the task $name: remembers
# the start time and, for cron-scheduled tasks, computes the next run time
# from the task's cron object.
sub child_started {
    my ($name) = @_;

    my $task = $snaked::Daemon::runtime->{'tasks'}->{$name};
    $task->{'last_start'} = snaked::my_clock();

    return unless $task->{'cron'};
    $task->{'next_run'} = Time::Local::timelocal($task->{'cron'}->nextEvent);
}
# Bookkeeping after a child of the task $name has finished: remembers the
# finish time and, when the child produced output, records it in both the
# error log and the regular log.
sub child_finished {
    my ($name, $output) = @_;

    my $task = $snaked::Daemon::runtime->{'tasks'}->{$name};
    $task->{'last_finish'} = snaked::my_clock();

    return unless $output;

    do_err_log("[$name]: $output");
    Yandex::Tools::do_log("[$name]: $output");
}
# reads output from child if any
# (so it can't overflow IPC buffer)
#
# Drains whatever the child $pid has written so far: lines read from the
# result socket are appended to the child's 'output', lines read from the
# control socket to its 'control_output'.
sub manage_child {
    my ($pid) = @_;

    my $child = $snaked::Daemon::runtime->{'children'}->{'by_pid'}->{$pid};

    # (socket, accumulator) pairs, result channel first
    my @channels = (
        ['child_socket', 'output'],
        ['child_control_socket', 'control_output'],
    );

    foreach my $channel (@channels) {
        my ($socket_key, $buffer_key) = @{$channel};

        my $socket = $child->{$socket_key};
        my $pending = "";
        while (defined(my $line = <$socket>)) {
            $pending .= $line;
        }

        $child->{$buffer_key} .= $pending;
    }
}
# Unregisters the finished child $pid and updates the failure bookkeeping of
# its task.
#
# Dies when waitpid() does not report -1 for $pid. The child's sockets are
# closed, the start time reported by the child over the control socket is
# extracted, the task's 'first_failure_time' is maintained (set on output,
# cleared on silence), child_finished() is called and the child is removed
# from both runtime tables.
sub remove_child {
    my ($pid) = @_;

    Yandex::Tools::die("Programmer error: remove_child called on child which hasn't finished yet")
        if waitpid($pid,WNOHANG) ne -1;

    my $child = $snaked::Daemon::runtime->{'children'}->{'by_pid'}->{$pid};
    my $task = $snaked::Daemon::runtime->{'tasks'}->{$child->{'name'}};

    close($child->{'child_socket'});
    close($child->{'child_control_socket'});

    # exact start time of the child
    # as it was noticed in the child code
    # (read from child_control_socket)
    #
    my $real_child_start_time;

    # process child control messages
    #
    if ($child->{'control_output'}) {
        if ($child->{'control_output'} =~ /started at mono \[(.*)\] real \[(.*)\]/so) {
            $real_child_start_time = $1;
        }
    }

    # if child has output -- then it had some situation
    # which requires user intervention; save failure time
    #
    # (unset when child returns nothing --
    # meaning intervention is no longer needed)
    #
    if ($child->{'output'}) {
        # just in case, when child was brutally killed
        if (!$real_child_start_time) {
            $real_child_start_time = snaked::clock_mono();
        }

        # set first_failure_time (duplicates child code
        # to be more understandable)
        #
        $task->{'runtime'}->{'first_failure_time'} = $real_child_start_time
            unless $task->{'runtime'}->{'first_failure_time'};

        Yandex::Tools::debug("child output: " . $child->{'output'});

        if (! defined($task->{'disable_notifications'})) {
            # tasks without notification_interval notify on every failure;
            # count the interval as zero instead of adding undef (which
            # warned on each failed run)
            my $notification_interval = $task->{'notification_interval'} || 0;

            # reset failure interval counter,
            # so we do not send notifications
            # more often than notification_interval
            #
            # notifications should have already been sent from child
            #
            if ($task->{'runtime'}->{'first_failure_time'} + $notification_interval < $real_child_start_time) {
                $task->{'runtime'}->{'first_failure_time'} = $real_child_start_time;
            }
        }
    }
    else {
        # delete first error time so next failure time will be saved
        delete($task->{'runtime'}->{'first_failure_time'});
    }

    child_finished($child->{'name'}, $child->{'output'});

    delete $snaked::Daemon::runtime->{'children'}->{'by_name'}->{$child->{'name'}}->{$pid};
    delete $snaked::Daemon::runtime->{'children'}->{'by_pid'}->{$pid};
}
# Returns 1 when at least one child is registered in the by-pid table,
# 0 otherwise.
sub have_children {
    return scalar(keys %{$snaked::Daemon::runtime->{'children'}->{'by_pid'}}) ? 1 : 0;
}
# Walks over all registered children and applies the requested action.
#
# $opts -- optional hashref:
#   stop_now -- send signal 15 (TERM) to every registered child
sub for_each_child {
    my ($opts) = @_;
    $opts ||= {};

    my @child_pids = keys %{$snaked::Daemon::runtime->{'children'}->{'by_pid'}};

    foreach my $child_pid (@child_pids) {
        next unless $opts->{'stop_now'};

        # Yandex::Tools::do_log("killing $child_pid");
        kill(15, $child_pid); # TERM (default for run_forked)
    }
}
# Recomputes 'next_run' of every enabled cron-scheduled task from the task's
# cron object. Disabled tasks and tasks without a cron schedule are left
# untouched.
sub reschedule_cron_tasks {
    # all configured tasks (by name)
    my $tasks = $snaked::Daemon::runtime->{'tasks'};

    foreach my $task (values %{$tasks}) {
        # totally skip disabled tasks and those which are not cron tasks
        next if defined($task->{'disabled'}) || !$task->{'cron'};

        $task->{'next_run'} = Time::Local::timelocal($task->{'cron'}->nextEvent);
    }
}
# spawn tasks which should and could be spawned
#
# One scheduling pass: decides which tasks are due, filters out those which
# cannot be started right now and spawns at most one child.
#
# A task is due when
#   - it is not disabled and its random start-up sleep (if any) is over, and
#   - its cron 'next_run' time has come, or it is an execution_interval
#     task which has never finished or whose interval since the last start
#     has elapsed.
#
# A due task can be started when it has no running child and none of the
# tasks listed in its 'conflicts_hash' is running. Tasks marked
# TO_BE_REMOVED are dropped from the task table instead of being started.
#
# Among the startable tasks, one is picked at random from the group with
# the oldest (or missing) last start time and handed to add_child().
sub run_scheduling {
    # all configured tasks (by name)
    my $tasks = $snaked::Daemon::runtime->{'tasks'};

    # configured tasks which should be run (by name)
    my $should_be_run_tasks = {};

    # configured tasks which should and could be run now (by last_start time)
    my $could_be_run_tasks = {};

    # choose tasks which should be run
    foreach my $task_name (keys %{$tasks}) {
        my $task = $tasks->{$task_name};

        # totally skip disabled tasks
        next if defined($task->{'disabled'});

        my $now = snaked::my_clock();

        # postpone tasks which have start_random_sleep configured
        #
        if ($task->{'start_random_sleep'} && !$task->{'startup_sleep_finished'}) {
            # the drawn sleep may legitimately be 0, so test for "not drawn
            # yet" with defined(): a truth test re-drew the value on every
            # cycle and never let the task start when only 0 can be drawn
            # (start_random_sleep <= 1)
            if (!defined($task->{'startup_sleep'})) {
                $task->{'startup_sleep'} = int(rand($task->{'start_random_sleep'}));
                $task->{'startup_sleep_started'} = $now;
                Yandex::Tools::debug("task [$task_name] random sleep [$task->{'startup_sleep'}]");
            }

            if ($now->{'mono'} - $task->{'startup_sleep_started'}->{'mono'} > $task->{'startup_sleep'}) {
                $task->{'startup_sleep_finished'} = $now;
                Yandex::Tools::debug("task [$task_name] random sleep finished");
            }
        }
        else {
            # random startup sleep not configured for the task
            # (or already over)
            $task->{'startup_sleep_finished'} = $now;
        }

        # skip tasks which are sleeping their random start-up time
        #
        next unless $task->{'startup_sleep_finished'};

        # choose tasks which should be run
        # (for which the time has come)
        #
        # decision about cron tasks can be made
        # just using cron schedule, but for the
        # execution_interval tasks we have to check
        # last_finish/last_start times
        #
        if ($task->{'cron'}) {
            if ($task->{'next_run'} <= $now->{'real'}) {
                $should_be_run_tasks->{$task_name} = $task;
            }
        }
        elsif ($task->{'execution_interval'}) {
            if (!$task->{'last_finish'}) {
                $should_be_run_tasks->{$task_name} = $task;
            }
            elsif ($task->{'last_start'}->{'mono'} + $task->{'execution_interval'} <= $now->{'mono'}) {
                $should_be_run_tasks->{$task_name} = $task;
            }
        }
        else {
            # configuration validation is done in refreshOptions
        }
    }

    # for each task which should be run check
    # 1) that it's not already running
    # 2) that it's possible to run it (dependencies)
    #
    foreach my $task_name (keys %{$should_be_run_tasks}) {
        my $task = $should_be_run_tasks->{$task_name};

        # skips tasks which are already running
        next if find_child($task_name);

        # remove tasks which were marked to be removed
        # when the time comes to execute them knowing
        # that it does not run currently (see above)
        if ($task->{'TO_BE_REMOVED'}) {
            delete $snaked::Daemon::runtime->{'tasks'}->{$task_name};
            next;
        }

        # check that no conflicting tasks are running
        #
        my $conflicting_tasks_are_running;
        CHECK_CONFLICTS: foreach my $ct_name (keys %{$task->{'conflicts_hash'}}) {
            if (find_child($ct_name)) {
                $conflicting_tasks_are_running = 1;
                last CHECK_CONFLICTS;
            }
        }
        next if $conflicting_tasks_are_running;

        # group startable tasks by their last start time
        # (-1 stands for "never started")
        my $last_start_label = $task->{'last_start'}->{'mono'} || -1;
        $could_be_run_tasks->{$last_start_label} = []
            unless $could_be_run_tasks->{$last_start_label};
        push (@{$could_be_run_tasks->{$last_start_label}}, $task_name);
    }

    # choose random task from the tasks
    # with the oldest last_start time
    # (or even no last_start time at all)
    #
    # start it and leave scheduling
    # (so the algorithm is run on the next cycle,
    # accounting for the currently spawned task)
    #
    START_TASK: foreach my $last_start_label (sort {$a <=> $b} keys %{$could_be_run_tasks}) {
        my $tasks_to_be_started = $could_be_run_tasks->{$last_start_label};
        my $total_possible_tasks = scalar(@{$tasks_to_be_started});
        my $random_index = int(rand($total_possible_tasks));
        my $task_name_to_start = ${$tasks_to_be_started}[$random_index];

        Yandex::Tools::debug("starting [$task_name_to_start]");
        add_child($task_name_to_start);
        last START_TASK;
    }
}
# Normalizes a snaked command line so that it starts with "$path/snaked".
#
# $cmdline -- command line as seen in the process table
# $path    -- directory of the executable
#
# Everything up to and including the first word ending in "snaked" (an
# optional interpreter, the path and an optional "ps-" prefix) is replaced
# with "$path/snaked"; trailing whitespace is removed. Returns an empty
# string when either argument is empty.
sub canonical_command_line {
    my ($cmdline, $path) = @_;

    return "" unless $cmdline && $path;

    # suppress space in the end of command on freebsd
    $cmdline =~ s/\ +$//go;

    # replace path to the executable with full path
    #
    # notes:
    #  - regexp is not global so it replaces only 1st occurrence
    #
    #  - .*? is not greedy so it will find the 1st occurrence of
    #    "(ps-)snaked" string which should be the name of executable
    #
    #  - .*? (rather than .+?) also covers a command line which starts
    #    with the bare executable name, i.e. has nothing in front of it
    #
    $cmdline =~ s/.*?(?:ps-)?snaked(?:\s+|$)/${path}\/snaked /;

    $cmdline =~ s/\s+$//goi;

    return $cmdline;
}
# Replaces the current process with a freshly started snaked.
#
# $my_command_line -- command line to execute; looked up in the process
#                     table when empty
# $my_path         -- directory of the executable; looked up in the process
#                     table when empty
#
# The command line is canonicalized, "--cfg <configuration path>" is appended
# when it is not there yet, and the 'snaked_cleanup_already_running'
# environment variable is set before the command is handed to
# Yandex::Tools::exec.
sub exec_ps_snaked {
    my ($my_command_line, $my_path) = @_;

    # on ws1-569 in snaked.log got:
    #
    # Mon Oct 19 17:59:17 2009 [/place/home/monitor/ps-snake/usr/local/ps-snake/bin/snaked] unable to exec --cfg /place/home/monitor/ps-snake/etc/ps-farm/options/ps-snaked
    #
    # which effectively means that $my_command_line was empty
    # after calling canonical_command_line() below
    # (" --cfg ..." was appended to it in the next step)
    #
    # so trying to determine my command line if it's empty
    # (also added check on startup that we've got it)
    #
    # as a workaround for empty command line or path (why?)
    # trying to determine them during exit process
    if (!$my_command_line || !$my_path) {
        ($my_path, $my_command_line) = Yandex::Tools::ProcessList::get_my_path_commandline({'processes' =>
            Yandex::Tools::ProcessList::get_process_table()});
    }

    $my_command_line = canonical_command_line($my_command_line, $my_path);

    # append --cfg parameter if it's not specified
    # (codepath is used only during first run
    # when path to configuration was specified
    # by environment variable)
    #
    # the path is matched literally (\Q..\E): regexp metacharacters in a
    # directory name must neither hide an existing --cfg parameter nor break
    # the pattern
    if ($my_command_line !~ /--cfg \Q$ENV{'PS_SNAKED_CFG'}\E/) {
        $my_command_line .= " --cfg $ENV{'PS_SNAKED_CFG'}";
    }

    # set environment variable to specify that we want to cleanup
    # already running snaked processes (this might be workaround
    # for some FreeBSD or Proc::ProcessTable on FreeBSD bug,
    # which caused the following:
    #
    # Thu Jun 24 10:29:31 2010 [/opt/home/monitor/ps-snake/usr/local/ps-snake/bin/snaked] clock moved back from Thu Jun 24 10:29:25 2010 to Thu Jun 24 10:29:24 2010, restarting
    # Thu Jun 24 10:29:38 2010 [/opt/home/monitor/ps-snake/usr/local/ps-snake/bin/snaked] [24836] requested to restart
    # Thu Jun 24 10:29:38 2010 [/opt/home/monitor/ps-snake/usr/local/ps-snake/bin/snaked] [24836] stopped
    # Thu Jun 24 10:29:54 2010 [/opt/home/monitor/ps-snake/usr/local/ps-snake/bin/snaked] [WARN] [29246] snaked is already running: /usr/bin/perl /opt/home/monitor/ps-snake/usr/local/ps-snake/bin/snaked --daemon --cfg /opt/home/monitor/ps-snake/etc/ps-farm/options/ps-snaked [24836]
    #
    # [monitor@orange64 ~]$ uname -a
    # FreeBSD orange64.yandex.ru 7.2-STABLE FreeBSD 7.2-STABLE #0 r199991M: Mon Feb 8 12:50:25 MSK 2010 root@distillatory.yandex.ru:/place/tmp/mk_pkg.wG1LSf1f/obj/place/GIT-repos/FreeBSD-7-r199991/sys/PRODUCTION amd64
    #
    # Proc::ProcessTable 0.54
    #
    $ENV{'snaked_cleanup_already_running'} = 1;

    Yandex::Tools::exec($my_command_line);
}
# spawn additional watchdogs slowly,
# returns total number of running processes
#
# Keeps the configured number of watchdogs alive: counts the watchdog
# processes found in the process table and starts one more (at most one per
# call) when there are fewer than $watchdogs2maintain.
#
# Returns the number of entries in the process table, or 100 when watchdogs
# are disabled.
sub manage_watchdogs {
    if (!$watchdogs2maintain || $watchdogs2maintain eq 0) {
        return 100; # some average number of processes (does not matter really)
    }

    my $ptable = Yandex::Tools::ProcessList::get_process_table();

    # processes whose command line looks like one of our watchdogs
    my $number_of_watchdogs = grep {
        $_->cmndline &&
        Yandex::Tools::matches_with_one_of_regexps($_->cmndline, [$watchdog_match, $watchdog_match1])
    } @{$ptable};

    if ($number_of_watchdogs < $watchdogs2maintain) {
        # a watchdog is started with the daemon's own command line,
        # having --daemon replaced with --watchdog
        my $t_cmdline = canonical_command_line($my_command_line, $my_path);
        $t_cmdline =~ s/\-\-daemon/\-\-watchdog/;
        Yandex::Tools::run_forked($t_cmdline);
    }

    return scalar(@{$ptable});
}
# Sends signal 15 (TERM) to every process in the process table whose command
# line matches one of the watchdog patterns.
sub stop_watchdogs {
    my $ptable = Yandex::Tools::ProcessList::get_process_table();

    foreach my $process (@{$ptable}) {
        my $is_watchdog =
            $process->cmndline &&
            Yandex::Tools::matches_with_one_of_regexps($process->cmndline, [$watchdog_match, $watchdog_match1]);
        next unless $is_watchdog;

        kill (15, $process->pid);
    }
}
# watchdog mode, starts ps-snaked daemon
# if finds that it's not running
# Main loop of a watchdog process; does not return.
#
# Every pass sleeps for half a second and decrements the
# 'usec_2check_watchdog' countdown. When the countdown has run out the
# process table is scanned for the main daemon; after the scan the countdown
# is re-armed with watchdog_check_timeout().
#
# When the daemon is not found, the first miss only causes a randomized
# pause; from the second consecutive miss on the daemon is respawned with
# the watchdog's own command line (--watchdog replaced with --daemon).
#
# The watchdog exits voluntarily after a randomized life time, but only while
# no miss is pending.
sub run_watchdog {
# set daemon type to change signal handling slightly
$snaked::Daemon::runtime->{'type'} = 'watchdog';
# number of consecutive scans which did not find the main daemon
my $unsuccessful_tries = 0;
# randomized life time: 3600 * (1 .. 1 + $watchdogs2maintain)
# (presumably seconds, as it is compared with clock_mono() -- TODO confirm)
my $life_time = 3600 * (rand($watchdogs2maintain) + 1);
while(1) {
debug_main_cycle("001");
# stop watchdogs from time to time to toss
# their pid numbers (which might affect oom killers),
# but not in case they detect that main process
# is not running (and waiting a bit to start it)
#
# watchdogs are restarted by main daemon.
#
if ((snaked::clock_mono() - $snaked::Daemon::runtime->{'start_time'}->{'mono'}) > $life_time && !$unsuccessful_tries) {
exit(0);
}
debug_main_cycle("002");
# scan the process table only when the countdown has run out
if ($snaked::Daemon::runtime->{'usec_2check_watchdog'} < 1) {
debug_main_cycle("003");
my $ptable = Yandex::Tools::ProcessList::get_process_table();
my $currently_running_watchdogs = 0;
# get the ps-snaked daemon process for which the watchdog is running
my $my_process = undef;
foreach my $p (@$ptable) {
# process attributes are read through code_may_fail()
my $p_cmndline;
my $r = Yandex::Tools::ProcessList::code_may_fail(sub {$p_cmndline = $p->cmndline});
next unless $p_cmndline;
if (Yandex::Tools::matches_with_one_of_regexps($p_cmndline, [$watchdog_match, $watchdog_match1])) {
$currently_running_watchdogs = $currently_running_watchdogs + 1;
}
elsif (Yandex::Tools::matches_with_one_of_regexps($p_cmndline, [$daemon_match_cfg, $daemon_match_cfg1])) {
# at this point any snaked is selected
# (even that which is starting
# or running external command)
my $p_pid;
my $p_ppid;
my $p_pgrp;
$r = Yandex::Tools::ProcessList::code_may_fail(sub {$p_pid = $p->pid});
$r = Yandex::Tools::ProcessList::code_may_fail(sub {$p_ppid = $p->ppid});
$r = Yandex::Tools::ProcessList::code_may_fail(sub {$p_pgrp = $p->pgrp});
next unless $p_pid && $p_ppid && $p_pgrp;
# real daemon is parented by init and is the process group leader,
# if its not found -- start it, and it will clean up any
# stuck child from previous daemon (shouldn't happen because
# children are strongly attached to the main daemon
# with use of terminate_on_sudden_parent_death flag of run_forked)
if ($p_ppid eq 1 && $p_pid eq $p_pgrp) {
$my_process = $p;
}
}
}
debug_main_cycle("004");
# a found daemon resets the miss counter, a missing one increments it
if ($my_process) {
$unsuccessful_tries = 0;
}
else {
$unsuccessful_tries = $unsuccessful_tries + 1;
}
if ($unsuccessful_tries > 0) {
if ($unsuccessful_tries < 2) {
# 4 seconds should be enough to start daemon
# (if it's not found and began to start -- is restarting),
# randomize each watchdog so they do not try to start
# all at the same time
#
my_usleep((4 + 4 * int(rand($currently_running_watchdogs))) * 1_000_000);
}
else {
Yandex::Tools::do_log("watchdog [$$]: snaked not found (killed?), respawning");
# replace --watchdog with --daemon
my $t_cmdline = $my_command_line;
$t_cmdline =~ s/\-\-watchdog/\-\-daemon/;
# try to execute daemon instead of watchdog
# if fork fails (wouldn't succeed probably,
# but could we try at least?)
#
if (defined(my $pid = fork)) {
if ($pid) {
my $waitpid;
# exec_ps_snaked forks before actually execing snaked
# and parent exits immediately (which makes it
# totally detached from watchdog)
#
# NOTE(review): $waitpid is still undefined when the loop condition
# is evaluated for the first time.
while ($waitpid ne -1) {
$waitpid = waitpid($pid, WNOHANG);
my_usleep (1_000_000);
}
# watchdog to continue
$unsuccessful_tries = 0;
}
else {
# watchdog to become snaked
# (detached from parent totally)
exec_ps_snaked($t_cmdline, $my_path);
}
}
else {
exec_ps_snaked($t_cmdline, $my_path);
}
}
}
debug_main_cycle("005");
# re-arm the countdown until the next scan
$snaked::Daemon::runtime->{'usec_2check_watchdog'} = watchdog_check_timeout({
'watchdogs2maintain' => $watchdogs2maintain,
'number_of_processes' => scalar(@{$ptable}),
});
}
debug_main_cycle("006");
# half a second per pass; the countdown is decreased by the value
# returned from my_usleep
my $slept = my_usleep(500000);
$snaked::Daemon::runtime->{'usec_2check_watchdog'} = $snaked::Daemon::runtime->{'usec_2check_watchdog'} - $slept;
debug_main_cycle("007");
}
# follows an endless loop which has no "last"
exit (255);
}
# Determines the configuration directory and publishes it in
# $ENV{'PS_SNAKED_CFG'}.
#
# Unless the variable is already set, the directory is taken from the --cfg
# command-line parameter (dies when that directory does not exist) or, without
# the parameter, from the first existing of: $ENV{'MY_ETC'},
# $ENV{'MY_ROOT'}/etc/ps-farm/options/ps-snaked and
# /etc/ps-farm/options/ps-snaked.
#
# On the first successful call the regexps used to recognize daemon and
# watchdog processes in the process table are built as well.
#
# Returns the configuration directory; when no usable directory was found
# the environment variable is assigned undef and that value is returned.
sub get_cfg_path {
    if (!$ENV{'PS_SNAKED_CFG'}) {
        if (Yandex::Tools::defined_cmdline_param('cfg')) {
            $ENV{'PS_SNAKED_CFG'} = Yandex::Tools::get_cmdline_param('cfg');
            if (! -d "$ENV{'PS_SNAKED_CFG'}") {
                die "Configuration does not exist: $ENV{'PS_SNAKED_CFG'}\n";
            }
        }
        else {
            $ENV{'PS_SNAKED_CFG'} = $ENV{'MY_ETC'};
            if (! -d $ENV{'PS_SNAKED_CFG'}) {
                $ENV{'PS_SNAKED_CFG'} = ($ENV{'MY_ROOT'} eq "/" ? "" : $ENV{'MY_ROOT'}) .
                    "/etc/ps-farm/options/ps-snaked";
            }
            if (! -d $ENV{'PS_SNAKED_CFG'} && -d "/etc/ps-farm/options/ps-snaked") {
                $ENV{'PS_SNAKED_CFG'} = "/etc/ps-farm/options/ps-snaked";
            }
        }
    }

    if (! -d "$ENV{'PS_SNAKED_CFG'}") {
        $ENV{'PS_SNAKED_CFG'} = undef;
    }
    elsif (!$daemon_regexp_configured) {
        # the directory name is matched literally: regexp metacharacters
        # in the path must neither widen nor break the patterns
        my $cfg = quotemeta($ENV{'PS_SNAKED_CFG'});

        # daemon: "--daemon/--debug ... --cfg <dir>" in either order
        $daemon_match_cfg = qr/^([^\s]+perl[^\s]*[\s]+|)[^\s]+(ps-)?snaked.+(daemon|debug).+cfg.+$cfg/;
        $daemon_match_cfg1 = qr/^([^\s]+perl[^\s]*[\s]+|)[^\s]+(ps-)?snaked.+cfg.+$cfg.+(daemon|debug)/;

        # daemon regardless of the configuration it uses
        $daemon_match_nocfg = qr/^([^\s]+perl[^\s]*[\s]+|)[^\s]+(ps-)?snaked.+(daemon|debug)/;

        # watchdog: "--watchdog ... --cfg <dir>" in either order
        $watchdog_match = qr/^([^\s]+perl[^\s]*[\s]+|)[^\s]+(ps-)?snaked.+(watchdog).+cfg.+$cfg/;
        $watchdog_match1 = qr/^([^\s]+perl[^\s]*[\s]+|)[^\s]+(ps-)?snaked.+cfg.+$cfg.+(watchdog)/;

        $daemon_regexp_configured = 1;
    }

    return $ENV{'PS_SNAKED_CFG'};
}
# Start-up sequence: the command line is read first, then get_cfg_path()
# resolves the configuration directory (it consults the 'cfg' command-line
# parameter) and, when a directory was found, builds the daemon matching
# regexps which are handed over to the process-list helper below.
Yandex::Tools::read_cmdline();
get_cfg_path();
Yandex::Tools::ProcessList::set_options({
'daemon_match' => [$daemon_match_cfg, $daemon_match_cfg1],
'daemon_match_startup' => [$daemon_match_nocfg],
});
# Commands which might work without actually finding configuration on
# the disk -- what they need is the PID, which they get from the
# process table. Every branch below handles one command and exits.
#
if (Yandex::Tools::defined_cmdline_param('sample-config')) {
    # --sample-config [DIR]: write an example configuration tree into DIR
    # (default /etc/snaked); an existing path is never overwritten.
    my $target_dir = Yandex::Tools::get_cmdline_param('sample-config') || "/etc/snaked";
    if (-e $target_dir) {
        Yandex::Tools::die("Directory [$target_dir] exists, not going to overwrite.", {'no_log' => 1});
    }
    File::Path::mkpath($target_dir);

    # effective uid 0 gets system-wide defaults, everybody else gets
    # a log in /tmp and mail sent to the invoking user
    if ($> == 0) {
        Yandex::Tools::write_file_scalar($target_dir . "/log", "/var/log/snaked.log\n");
        Yandex::Tools::write_file_scalar($target_dir . "/admin_email", "root\n");
    }
    else {
        Yandex::Tools::write_file_scalar($target_dir . "/log", "/tmp/snaked.log\n");
        Yandex::Tools::write_file_scalar($target_dir . "/admin_email", getpwuid($>) . "\n");
    }

    # sample job driven by a cron-style schedule
    File::Path::mkpath($target_dir . "/jobs/every_hour");
    Yandex::Tools::write_file_scalar($target_dir . "/jobs/every_hour/execution_schedule", "0 * * * *\n");
    Yandex::Tools::write_file_scalar($target_dir . "/jobs/every_hour/cmd", "uptime >> /tmp/snaked_every_hour\n");
    chmod(0755, $target_dir . "/jobs/every_hour/cmd") || Yandex::Tools::die("Unable to set permissions on [" . $target_dir . "/jobs/every_hour/cmd" . "]", {'no_log' => 1});

    # sample job driven by an execution interval
    File::Path::mkpath($target_dir . "/jobs/every_ten_seconds");
    Yandex::Tools::write_file_scalar($target_dir . "/jobs/every_ten_seconds/execution_interval", "10\n");
    Yandex::Tools::write_file_scalar($target_dir . "/jobs/every_ten_seconds/cmd", "uptime >> /tmp/snaked_every_ten_seconds\nsleep 2\n");
    chmod(0755, $target_dir . "/jobs/every_ten_seconds/cmd") || Yandex::Tools::die("Unable to set permissions on [" . $target_dir . "/jobs/every_ten_seconds/cmd" . "]", {'no_log' => 1});

    # sample job declaring a conflict with another job
    File::Path::mkpath($target_dir . "/jobs/fast_job");
    Yandex::Tools::write_file_scalar($target_dir . "/jobs/fast_job/execution_interval", "1\n");
    Yandex::Tools::write_file_scalar($target_dir . "/jobs/fast_job/cmd", "uptime >> /tmp/snaked_fast_job\n");
    Yandex::Tools::write_file_scalar($target_dir . "/jobs/fast_job/conflicts", "every_ten_seconds\n");
    chmod(0755, $target_dir . "/jobs/fast_job/cmd") || Yandex::Tools::die("Unable to set permissions on [" . $target_dir . "/jobs/fast_job/cmd" . "]", {'no_log' => 1});

    print "written sample configuration to: $target_dir\n";
    exit(0);
}
elsif (Yandex::Tools::defined_cmdline_param('stop')) {
    # --stop [--wait]: send TERM to the running daemon and optionally
    # poll the process table until it is gone.
    my $d = Yandex::Tools::ProcessList::get_other_daemon_process();
    if ($d) {
        print "requesting " . $d->pid() . " [" . $d->cmndline . "] to stop\n";
        kill (15, $d->pid);
        if (Yandex::Tools::defined_cmdline_param('wait')) {
            while (Yandex::Tools::ProcessList::get_other_daemon_process({'refresh_startup_processes' => 1})) {
                print ".";
                my_usleep(1_000_000);
            }
            print "\n";
        }
    }
    else {
        print "no snaked daemon found for $ENV{'PS_SNAKED_CFG'}\n";
    }
    exit 0;
}
elsif (Yandex::Tools::defined_cmdline_param('configure')) {
    # --configure: send HUP to the running daemon.
    my $d = Yandex::Tools::ProcessList::get_other_daemon_process();
    if ($d) {
        print "requesting " . $d->pid() . " [" . $d->cmndline . "] to refresh configuration\n";
        kill ("HUP", $d->pid);
    }
    else {
        print "no snaked daemon found for $ENV{'PS_SNAKED_CFG'}\n";
    }
    exit 0;
}
elsif (Yandex::Tools::defined_cmdline_param('restart')) {
    # --restart [--wait] [--only-errors]: send USR2 to the running daemon;
    # with --wait, block until a daemon with a different pid has been seen
    # on several consecutive polls.
    my $d = Yandex::Tools::ProcessList::get_other_daemon_process();
    if ($d) {
        if (!Yandex::Tools::defined_cmdline_param('only-errors')) {
            print "requesting " . $d->pid() . " [" . $d->cmndline . "] to restart\n";
        }
        kill ("USR2", $d->pid);
        if (Yandex::Tools::defined_cmdline_param('wait')) {
            my $running_daemon = Yandex::Tools::ProcessList::get_other_daemon_process({'refresh_startup_processes' => 1});
            my $no_daemon_retries = 0;
            my $new_daemon_retries = 3;
            my $new_daemon_pid;

            # Loop while the previous daemon shuts down and the new one
            # starts up, then keep polling until the new pid has been
            # confirmed $new_daemon_retries more times.
            while (!$new_daemon_pid || $new_daemon_retries > 0) {
                $running_daemon = Yandex::Tools::ProcessList::get_other_daemon_process({'refresh_startup_processes' => 1});
                if (!$running_daemon) {
                    $no_daemon_retries = $no_daemon_retries + 1;
                }
                else {
                    $no_daemon_retries = 0;
                }
                if ($no_daemon_retries > 5) {
                    print "snaked won't start, still trying...";
                    $no_daemon_retries = 0;
                }
                if ($new_daemon_pid) {
                    if (!$running_daemon) {
                        # the freshly started daemon is gone again: forget
                        # it and go back to waiting for a new one instead
                        # of calling a method on undef
                        $new_daemon_pid = undef;
                        $new_daemon_retries = 3;
                    }
                    elsif ($new_daemon_pid eq $running_daemon->pid) {
                        $new_daemon_retries = $new_daemon_retries - 1;
                    }
                    else {
                        # yet another daemon took over, restart confirmation
                        $new_daemon_pid = $running_daemon->pid;
                        $new_daemon_retries = 3;
                    }
                }
                if (!$new_daemon_pid && $running_daemon && $d->pid ne $running_daemon->pid) {
                    $new_daemon_pid = $running_daemon->pid;
                }
                print ".";
                my_usleep(1_000_000);
            }
            print "\n";
            # the loop only ends right after an iteration which saw the
            # new daemon, so $running_daemon is defined here
            print "snaked is running as pid " . $running_daemon->pid . ". command line [" . $running_daemon->cmndline . "]\n";
        }
    }
    else {
        print "no snaked daemon found for $ENV{'PS_SNAKED_CFG'}\n";
    }
    exit 0;
}
elsif (Yandex::Tools::defined_cmdline_param('status')) {
    # --status: report whether a daemon is found in the process table.
    my $d = Yandex::Tools::ProcessList::get_other_daemon_process();
    if ($d) {
        print "snaked is running as pid " . $d->pid . ". command line [" . $d->cmndline . "]\n";
    }
    else {
        print "no daemon running\n";
    }
    exit 0;
}
elsif (Yandex::Tools::defined_cmdline_param('detailed-status')) {
    # --detailed-status: send USR1 to the running daemon.
    my $d = Yandex::Tools::ProcessList::get_other_daemon_process();
    if ($d) {
        print "requesting " . $d->pid() . " [" . $d->cmndline . "] to save detailed status\n";
        kill ("USR1", $d->pid);
    }
    else {
        print "no snaked daemon found for $ENV{'PS_SNAKED_CFG'}\n";
    }
    exit 0;
}
elsif (Yandex::Tools::defined_cmdline_param('version')) {
    print "$version\n";
    exit 0;
}
# Remember whether this process was started as a watchdog; debug mode
# runs without any watchdogs.
my $i_am_watchdog = Yandex::Tools::defined_cmdline_param('watchdog');
$watchdogs2maintain = 0 if Yandex::Tools::defined_cmdline_param('debug');

# If we have not got a start-up command (daemon, debug, watchdog) or
# a configuration manipulation command, print the help text and bail out.
# Process manipulation commands have already been handled (they exit).
#
unless (
    Yandex::Tools::defined_cmdline_param('daemon')
    || Yandex::Tools::defined_cmdline_param('debug')
    || $i_am_watchdog
    || grep { Yandex::Tools::defined_cmdline_param($_) } qw(
        show-jobs
        show-config
        show-job
        enable-jobs
        disable-jobs
        add-job
        delete-jobs
        modify-job
    )
) {
    help();
    exit 0;
}
# Everything from here on acts upon a snaked configuration,
# so give up when none has been found.
#
die "no configuration found (try creating /etc/snaked)"
    unless get_cfg_path();

# a watchdog is started with the 'no-jobs' option set
snaked::refreshOptions($ENV{'PS_SNAKED_CFG'}, {'no-jobs' => $i_am_watchdog});

# --show-config / --show-jobs: dump global options followed by every
# configured job with all of its options, then exit.
if (Yandex::Tools::defined_cmdline_param('show-config') || Yandex::Tools::defined_cmdline_param('show-jobs')) {
    print " global options\n";
    my $global_options = $snaked::Daemon::runtime->{'config'};
    for my $option_name (sort keys %{$global_options}) {
        print " $option_name: " . $global_options->{$option_name}->{'value'} . "\n";
    }

    print " configured jobs:\n";
    my $all_tasks = $snaked::Daemon::runtime->{'tasks'};
    for my $task_name (sort keys %{$all_tasks}) {
        print " " . $task_name . "\n";
        my $task = $all_tasks->{$task_name};
        for my $field (sort keys %{$task}) {
            my $value = $task->{$field};
            print " $field: ";
            # list-valued options are shown comma separated
            print ref($value) eq 'ARRAY' ? join(",", @{$value}) : $value;
            print "\n";
        }
    }
    exit 0;
}
# Job inspection and manipulation commands. Each branch handles one
# command against the loaded configuration and exits.
if (Yandex::Tools::defined_cmdline_param('show-job')) {
    # --show-job NAME [--OPTION ...]: print options of a single job.
    my $job_name = Yandex::Tools::get_cmdline_param('show-job');
    # presumably all parsed command line parameters -- verify against
    # Yandex::Tools::get_cmdline_param
    my $params = Yandex::Tools::get_cmdline_param();
    if (!$snaked::Daemon::runtime->{'tasks'}->{$job_name}) {
        print "undefined job: $job_name\n";
        exit 0;
    }
    my $job = $snaked::Daemon::runtime->{'tasks'}->{$job_name};
    foreach my $o (sort keys %{$job}) {
        # when more than one parameter was given, show only
        # the job options named on the command line
        if (scalar(keys %{$params}) > 1 && !$params->{$o}) {
            next;
        }
        print "$o: ";
        if (ref($job->{$o}) eq 'ARRAY') {
            print join(",", @{$job->{$o}});
        }
        else {
            print $job->{$o};
        }
        print "\n";
    }
    exit 0;
}
elsif (Yandex::Tools::defined_cmdline_param('disable-jobs') || Yandex::Tools::defined_cmdline_param('enable-jobs')) {
    # --disable-jobs [LIST] / --enable-jobs [LIST]: toggle the "disabled"
    # flag file of the listed jobs (all jobs when the list is empty) and
    # ask the running daemon to reread its configuration.
    my $action = Yandex::Tools::defined_cmdline_param('disable-jobs') ? "disable" : "enable";
    my $job_list = Yandex::Tools::get_cmdline_param($action . "-jobs");
    $job_list = "" unless $job_list;

    # The list is whitespace separated (same as for --delete-jobs) and
    # names are compared literally, not as regular expressions.
    my %requested = map { $_ => 1 } split(' ', $job_list);

    my $modified_some;
    foreach my $job_name (sort keys %{$snaked::Daemon::runtime->{'tasks'}}) {
        next if %requested && !$requested{$job_name};

        my $task_def = $snaked::Daemon::runtime->{'tasks'}->{$job_name};
        if ($action eq 'disable') {
            next if $task_def->{'disabled'};
            Yandex::Tools::write_file_option($task_def->{'dirinfo'}->{'absolute_name'} . "/disabled");
            Yandex::Tools::do_log("disabled $job_name");
            print "disabled $job_name\n";
        }
        else {
            next if !$task_def->{'disabled'};
            unlink ($task_def->{'dirinfo'}->{'absolute_name'} . "/disabled");
            Yandex::Tools::do_log("enabled $job_name");
            print "enabled $job_name\n";
        }
        $modified_some = 1;
    }

    if ($modified_some) {
        my $d = Yandex::Tools::ProcessList::get_other_daemon_process();
        if ($d) {
            print "requesting " . $d->pid() . " [" . $d->cmndline . "] to refresh configuration\n";
            kill ("HUP", $d->pid);
        }
    }
    else {
        print "no jobs modified\n";
    }
    exit 0;
}
elsif (Yandex::Tools::defined_cmdline_param('add-job')) {
    # --add-job NAME --cmd CMD (--execution_interval N | --execution_schedule S)
    # [other options]: create a new job directory.
    my $job_name = Yandex::Tools::get_cmdline_param('add-job');
    unless (defined($job_name) && $job_name) {
        print "job name is missing\n";
        exit 1;
    }
    if (defined($snaked::Daemon::runtime->{'tasks'}->{$job_name})) {
        print "job [$job_name] already defined\n";
        exit 1;
    }

    my $param_values = {};
    my $mandatory_params = {
        'cmd' => 1,
    };
    # execution_schedule has to be listed here: it is accepted by the
    # validation below as an alternative to execution_interval and
    # therefore must be written into the job definition as well
    my $optional_params = {
        'execution_interval' => 1,
        'execution_schedule' => 1,
        'notification_interval' => 1,
        'execution_timeout' => 1,
        'admin_email' => 1,
        'conflicts' => 1,
    };

    my @err = ();
    if (!Yandex::Tools::defined_cmdline_param('execution_interval') &&
        !Yandex::Tools::defined_cmdline_param('execution_schedule')) {
        push(@err, "either [execution_interval] or [execution_schedule] must be specified");
    }
    foreach my $k (keys %{$mandatory_params}) {
        unless (Yandex::Tools::defined_cmdline_param($k)) {
            push(@err, "mandatory parameter [$k] is missing");
            next;
        }
        my $v = Yandex::Tools::get_cmdline_param($k);
        unless ($v) {
            push(@err, "missing value for mandatory parameter [$k]");
            next;
        }
        $param_values->{$k} = $v;
    }
    foreach my $k (keys %{$optional_params}) {
        my $v = Yandex::Tools::get_cmdline_param($k);
        if ($v) {
            $param_values->{$k} = $v;
        }
    }
    if (@err) {
        foreach my $msg (@err) {
            print "$msg\n";
        }
        exit 1;
    }

    my $jobs_dir = $ENV{'PS_SNAKED_CFG'} . '/jobs';
    unless (-d $jobs_dir) {
        print "[$jobs_dir] is not a directory or does not exist\n";
        exit 1;
    }
    File::Path::mkpath("$jobs_dir/$job_name");

    # Temporarily disable the job to be sure that
    # snaked would not read an incomplete job definition
    Yandex::Tools::write_file_option("$jobs_dir/$job_name/disabled");
    foreach my $k (keys %{$param_values}) {
        Yandex::Tools::write_file_option("$jobs_dir/$job_name/$k", $param_values->{$k});
    }
    chmod(0755, "$jobs_dir/$job_name/cmd");

    # Remove the temporary flag unless the job was requested to stay disabled
    unless (Yandex::Tools::defined_cmdline_param('disabled')) {
        unlink("$jobs_dir/$job_name/disabled");
    }

    print "added job [$job_name]\n";
    Yandex::Tools::do_log("added job [$job_name]");
    # TODO if --apply then restart snaked
    exit 0;
}
elsif (Yandex::Tools::defined_cmdline_param('delete-jobs')) {
    # --delete-jobs LIST: remove the directories of the listed jobs.
    my $delete_list = Yandex::Tools::get_cmdline_param('delete-jobs');
    # split(' ') skips leading whitespace, so no empty job name is ever
    # produced (an empty name would resolve to the jobs directory itself)
    my @jobs = defined($delete_list) ? split(' ', $delete_list) : ();
    unless (@jobs) {
        print "job name is missing\n";
        exit 1;
    }

    my $jobs_dir = $ENV{'PS_SNAKED_CFG'} . '/jobs';
    unless (-d $jobs_dir) {
        print "[$jobs_dir] is not a directory or does not exist\n";
        exit 1;
    }

    foreach my $j (@jobs) {
        # never follow "." or ".." path components: rmtree on them would
        # remove the jobs directory or something outside of it
        if ($j =~ m{(?:^|/)\.\.?(?:/|$)}) {
            print "skipping invalid job name [$j]\n";
            next;
        }
        unless (-d "$jobs_dir/$j") {
            print "skipping non-existent job [$j]\n";
            next;
        }
        File::Path::rmtree("$jobs_dir/$j");
        print "deleted job [$j]\n";
        Yandex::Tools::do_log("deleted job [$j]");
    }
    # TODO if --apply then restart snaked
    exit 0;
}
elsif (Yandex::Tools::defined_cmdline_param('modify-job')) {
    # --modify-job NAME [--OPTION VALUE ...]: overwrite the given options
    # of an existing job; options which are not given are left untouched.
    my $job_name = Yandex::Tools::get_cmdline_param('modify-job');
    unless (defined($job_name) && $job_name) {
        print "job name is missing\n";
        exit 1;
    }
    unless (defined($snaked::Daemon::runtime->{'tasks'}->{$job_name})) {
        print "job [$job_name] is not defined\n";
        exit 1;
    }

    my $jobs_dir = $ENV{'PS_SNAKED_CFG'} . '/jobs';
    unless (-d $jobs_dir) {
        print "[$jobs_dir] is not a directory or does not exist\n";
        exit 1;
    }

    # options which may be changed by this command
    my %params = (
        'admin_email' => 1,
        'cmd' => 1,
        'execution_interval' => 1,
        'execution_timeout' => 1,
        'notification_interval' => 1,
        'disabled' => 1,
    );
    my %update = ();
    my @err = ();
    foreach my $k (keys %params) {
        next unless Yandex::Tools::defined_cmdline_param($k);
        my $v = Yandex::Tools::get_cmdline_param($k);
        unless ($v) {
            push(@err, "missing value for parameter [$k]");
            next;
        }
        $update{$k} = $v;
    }
    if (@err) {
        foreach my $msg (@err) {
            print "$msg\n";
        }
        exit 1;
    }

    my $disabled = (-f "$jobs_dir/$job_name/disabled") ? 1 : 0;
    if (defined($update{'disabled'})) {
        $disabled = $update{'disabled'};
    }

    # Temporarily disable the job to be sure that snaked would not read
    # an incomplete job definition. Truthiness is tested (not "== 0") so
    # that a non-numeric value such as "yes" keeps the job disabled.
    if (!$disabled) {
        Yandex::Tools::write_file_option("$jobs_dir/$job_name/disabled");
    }
    foreach my $k (keys %update) {
        Yandex::Tools::write_file_option("$jobs_dir/$job_name/$k", $update{$k});
    }

    # Remove the temporary flag
    if (!$disabled) {
        unlink("$jobs_dir/$job_name/disabled");
    }

    print "modified job [$job_name]\n";
    Yandex::Tools::do_log("modified job [$job_name]");
    # TODO if --apply then restart snaked
    exit 0;
}
# Forget the log_errors option (with a warning) when the file
# it points to can not be written.
{
    my $error_log = config_value('log_errors');
    if ($error_log && !Yandex::Tools::can_write($error_log)) {
        Yandex::Tools::warn("Can not write to log_errors file [$error_log], check permissions.");
        delete($snaked::Daemon::runtime->{'config'}->{'log_errors'});
    }
}

# A daemon (but not a watchdog) refuses to start next to another running
# daemon, unless the environment variable snaked_cleanup_already_running
# asks to get rid of the running one first.
unless ($i_am_watchdog) {
    my $other = Yandex::Tools::ProcessList::get_other_daemon_process();

    if ($other && !$ENV{'snaked_cleanup_already_running'}) {
        Yandex::Tools::warn("[$$] snaked is already running: " . $other->cmndline . " [" . $other->pid . "]");
        exit 1;
    }
    elsif ($other) {
        my $previous_snaked = $other;
        $ENV{'snaked_cleanup_already_running'} = undef;

        # a negative signal number makes perl's kill address
        # a process group rather than a single process
        kill(-9, $other->pid);
        my_usleep(3_000_000);

        my $survivor = Yandex::Tools::ProcessList::get_other_daemon_process({'refresh_startup_processes' => 1});
        if ($survivor) {
            Yandex::Tools::warn("[$$] snaked is already running: " . $survivor->cmndline . " [" . $survivor->pid . "] and doesn't stop on KILL signal");
            exit 1;
        }
        Yandex::Tools::warn("[$$] killed previously running snaked: " . $previous_snaked->cmndline . " [" . $previous_snaked->pid . "], continuing to start");
    }
}
# Find out how this process was started, announce the start-up and,
# for --daemon and --watchdog, detach from the terminal.
($my_path, $my_command_line) = Yandex::Tools::ProcessList::get_my_path_commandline();
Yandex::Tools::debug("my_path: $my_path");
Yandex::Tools::debug("my_command_line: $my_command_line");

print "starting snaked daemon for $ENV{'PS_SNAKED_CFG'}\n"
    unless $i_am_watchdog;

if (Yandex::Tools::defined_cmdline_param('daemon') || $i_am_watchdog) {
    # restart daemon using its full pathname and config path
    # if it was not started like this (so we could distinguish
    # between daemons by their locations)
    #
    # Both checks are literal substring tests: a path is not a regular
    # expression, and treating it as one mis-detects (or dies on) paths
    # containing characters such as "+" or "(".
    my $cfg_argument = "--cfg $ENV{'PS_SNAKED_CFG'}";
    if (index($my_command_line, $my_path) < 0 || index($my_command_line, $cfg_argument) < 0) {
        sigUSR2_handler();
    }

    Yandex::Tools::daemonize();

    # run watchdog (except for when snaked
    # would be restarted right after start)
    if ($i_am_watchdog && !$snaked::Daemon::runtime->{'flags'}->{'restart'}) {
        run_watchdog();
        exit;
    }
}
elsif ($Yandex::Tools::debug) {
    # stay in foreground
}
Yandex::Tools::do_log("[$$] started");

# Record our pid in the configured pidfile. This is skipped for watchdogs
# and when a restart is already pending.
{
    my $pidfile_option = $snaked::Daemon::runtime->{'config'}->{'pidfile'};
    if ($pidfile_option
        && !$snaked::Daemon::runtime->{'flags'}->{'restart'}
        && !$i_am_watchdog) {
        my $pidfile = $pidfile_option->{'value'};
        Yandex::Tools::write_file_option($pidfile, $$)
            if Yandex::Tools::can_write($pidfile);
    }
}

# longest time a job may run; two hours unless configured otherwise
my $max_job_time = config_value('max_job_time') || 3600 * 2;

# clock readings of the previous and the current main loop iteration
my $previous_clock;
my $now_clock;
# Main daemon loop. Every iteration:
#   - detects clock jumps and reschedules cron tasks after one;
#   - maintains watchdogs when their check timeout has expired;
#   - kills children running longer than $max_job_time and
#     removes children which are gone;
#   - rereads configuration (on request or once the timer expires);
#   - handles restart / detailed status requests;
#   - schedules new jobs, or, when stopping, waits for children
#     and then exits (re-executing itself on restart);
#   - sleeps and decrements the microsecond timers by the time slept.
while (1) {
    debug_main_cycle("001");

    $previous_clock = $now_clock;
    $now_clock = snaked::my_clock();
    if ($previous_clock && $now_clock) {
        # clock_adjusted() reads its limits from the 'threshold_left' and
        # 'threshold_right' options; any other key is silently ignored
        # and the +-2 defaults would be used instead of one minute
        my $adjustment = clock_adjusted($now_clock, {
            'start' => $previous_clock,
            'threshold_left' => -60,
            'threshold_right' => 60,
            'return_adjustment' => 1
            });

        # clock adjusted for more than a minute,
        # need to reschedule cron tasks
        if ($adjustment) {
            Yandex::Tools::do_log("clock adjusted [$adjustment], rescheduling cron tasks");
            reschedule_cron_tasks();
        }
    }

    debug_main_cycle("002");

    if (!$snaked::Daemon::runtime->{'flags'}->{'restart'}) {
        if ($snaked::Daemon::runtime->{'usec_2check_watchdog'} < 1) {
            my $total_number_of_processes = 0;
            if ($watchdogs2maintain) {
                $total_number_of_processes = manage_watchdogs();
            }
            $snaked::Daemon::runtime->{'usec_2check_watchdog'} =
                watchdog_check_timeout({
                    'watchdogs2maintain' => $watchdogs2maintain,
                    'number_of_processes' => $total_number_of_processes,
                    });
        }
    }

    debug_main_cycle("003");

    # number of children currently tracked
    my $have_active_children = scalar(values %{$snaked::Daemon::runtime->{'children'}->{'by_pid'}});
    Yandex::Tools::debug("active children:") if $have_active_children;

    # check status of all children removing those which finished
    foreach my $v (values %{$snaked::Daemon::runtime->{'children'}->{'by_pid'}}) {
        # minimize gettime calls a bit
        my $now_mono = snaked::clock_mono();
        my $running_for = $now_mono - $v->{'borntime'};

        # check for really long running processes
        # and kill them brutally (not very fast
        # if killing doesn't work; blocking io?);
        # a kill is repeated at most once per 5 seconds
        #
        if ($running_for > $max_job_time && ($now_mono - $v->{'killtime'}) > 5) {
            # kill first then log, because logging might fail
            # which leads to "die"

            # killing exactly child pid, which is only a "manager"
            # for the task; open3_run which is executed inside the child
            # checks whether manager is alive and terminates if not,
            # so killing manager notifies child that it should stop.
            kill(9, $v->{'pid'});
            $v->{'killtime'} = snaked::clock_mono();

            do_err_log("killed long running (". $running_for .
                " seconds) process [$v->{'pid'}] [$v->{'name'}]", {"stderr" => 1});
            Yandex::Tools::do_log("killed long running (". $running_for .
                " seconds) process [$v->{'pid'}] [$v->{'name'}]", {"stderr" => 1});
        }

        my $waitpid = waitpid($v->{'pid'}, WNOHANG);
        Yandex::Tools::debug("\tchild [$v->{'pid'}] [$v->{'name'}] [" . ($v->{'id'} ? $v->{'id'} : "") . "]: $waitpid;".
            " running " . $running_for . " seconds");

        manage_child($v->{'pid'});

        # -1: there is no such child process (any more)
        if ($waitpid eq -1) {
            remove_child($v->{'pid'});
        }
    }

    debug_main_cycle("004");

    if ($snaked::Daemon::runtime->{'flags'}->{'refresh_configuration'} ||
        $snaked::Daemon::runtime->{'usec_2refresh_configuration'} < 1) {
        if ($snaked::Daemon::runtime->{'flags'}->{'refresh_configuration'}) {
            Yandex::Tools::do_log("requested to reread configuration, rereading");
        }
        snaked::refreshOptions($ENV{'PS_SNAKED_CFG'});
        $snaked::Daemon::runtime->{'flags'}->{'refresh_configuration'} = 0;
        # next periodic refresh in 60 seconds
        $snaked::Daemon::runtime->{'usec_2refresh_configuration'} = 1000000 * 60;
    }

    if ($snaked::Daemon::runtime->{'flags'}->{'restart'}) {
        if ($Yandex::Tools::debug) {
            Yandex::Tools::warn("unable to restart attached daemon");
            $snaked::Daemon::runtime->{'flags'}->{'restart'} = 0;
        }
        else {
            # a restart is a stop followed by re-executing ourselves
            if (!$snaked::Daemon::runtime->{'flags'}->{'stop'}) {
                Yandex::Tools::do_log("[$$] requested to restart");
                $snaked::Daemon::runtime->{'flags'}->{'stop'} = 1;
            }
        }
    }

    if ($snaked::Daemon::runtime->{'flags'}->{'detailed_status'}) {
        my $res = write_spool($$ . "_" . snaked::clock_mono() . "_status", Data::Dumper::Dumper($snaked::Daemon::runtime));
        if ($res && $res->{'ok'}) {
            Yandex::Tools::do_log("[$$] saved detailed status to [$res->{'full_pathname'}]");
        }
        elsif ($res && $res->{'errtext'}) {
            Yandex::Tools::do_log("[$$] error saving detailed status: " . $res->{'errtext'});
        }
        $snaked::Daemon::runtime->{'flags'}->{'detailed_status'} = 0;
    }

    debug_main_cycle("005");

    # do processing if we were not requested to stop
    unless ($snaked::Daemon::runtime->{'flags'}->{'stop'}) {
        # do not run scheduling (spawn new childs) before
        # the timeout expires. timeout is set in case
        # of failure during fork.
        #
        if ($snaked::Daemon::runtime->{'usec_2wait_before_fork'} < 1) {
            run_scheduling();
        }
    }
    else {
        # wait for children to exit and exit then
        if (have_children()) {
            for_each_child ({'stop_now' => 1});
            Yandex::Tools::debug("waiting for children to exit");
            my_usleep(1_000_000);
        }
        else {
            unlink($snaked::Daemon::runtime->{'config'}->{'pidfile'}->{'value'})
                if $snaked::Daemon::runtime->{'config'}->{'pidfile'};

            Yandex::Tools::do_log("[$$] stopped");

            # do not restart watchdogs on restart as they will try
            # to start snaked if restart fails (which should not happen
            # but happens in 0,02-0,03 % of cases)
            #
            # we may want to send some signal to watchdogs here
            # to notify them about restart so they could extend
            # their waiting cycle a bit
            #
            if ($snaked::Daemon::runtime->{'flags'}->{'restart'}) {
                exec_ps_snaked($my_command_line, $my_path);
            }
            else {
                stop_watchdogs() if !$i_am_watchdog;
            }
            exit 0;
        }
    }

    debug_main_cycle("006");

    # half a second between iterations; a full second (and a marker line
    # in the debug output) when running in debug mode
    my $usec_to_sleep = 500_000;
    if ($Yandex::Tools::debug) {
        Yandex::Tools::debug("-");
        $usec_to_sleep = 1_000_000;
    }

    # all timers are kept in microseconds and count down by the time slept
    my $slept = my_usleep($usec_to_sleep);
    $snaked::Daemon::runtime->{'usec_2check_watchdog'} = $snaked::Daemon::runtime->{'usec_2check_watchdog'} - $slept;
    $snaked::Daemon::runtime->{'usec_2refresh_configuration'} = $snaked::Daemon::runtime->{'usec_2refresh_configuration'} - $slept;
    $snaked::Daemon::runtime->{'usec_2wait_before_fork'} = $snaked::Daemon::runtime->{'usec_2wait_before_fork'} - $slept;

    debug_main_cycle("007");
}

# yes i know this is the way
# to the world of endless may
exit(255);