package Event::ExecFlow::Job::Command;
use base qw( Event::ExecFlow::Job );
use Locale::TextDomain $Event::ExecFlow::locale_textdomain;
use strict;
use AnyEvent;
# prevent warnings from AnyEvent
{ package AnyEvent::Impl::Event::CondVar;
package AnyEvent::Impl::Event::Glib; }
# Job type identifier of this class.
sub get_type {
    return "command";
}

# Execution mode of this job type.
sub get_exec_type {
    return "async";
}
#------------------------------------------------------------------------
# Command to run: a string, or a code reference that open_pipe() calls
# to compute the string.
sub get_command {
    my ($self) = @_;
    return $self->{command};
}

# True if the complete command output is kept; otherwise command_progress()
# keeps only the trailing 16384 characters.
sub get_fetch_output {
    my ($self) = @_;
    return $self->{fetch_output};
}

# Optional node object; open_pipe() calls its prepare_command() method.
sub get_node {
    my ($self) = @_;
    return $self->{node};
}

# Output collected from the command so far.
sub get_output {
    my ($self) = @_;
    return $self->{output};
}

# Code reference or compiled regex used by command_progress().
sub get_progress_parser {
    my ($self) = @_;
    return $self->{progress_parser};
}

# Set to 1 by command_progress() once the EXECFLOW_OK marker was read.
sub get_got_exec_ok {
    my ($self) = @_;
    return $self->{got_exec_ok};
}

# Optional code reference; open_pipe() passes the command through it.
sub get_configure_callback {
    my ($self) = @_;
    return $self->{configure_callback};
}
# Setters for the public attributes. Each stores the given value in the
# object hash and returns the value just stored.
sub set_command {
    my ($self, $value) = @_;
    return $self->{command} = $value;
}

sub set_fetch_output {
    my ($self, $value) = @_;
    return $self->{fetch_output} = $value;
}

sub set_node {
    my ($self, $value) = @_;
    return $self->{node} = $value;
}

sub set_output {
    my ($self, $value) = @_;
    return $self->{output} = $value;
}

sub set_progress_parser {
    my ($self, $value) = @_;
    return $self->{progress_parser} = $value;
}

sub set_got_exec_ok {
    my ($self, $value) = @_;
    return $self->{got_exec_ok} = $value;
}

sub set_configure_callback {
    my ($self, $value) = @_;
    return $self->{configure_callback} = $value;
}
#------------------------------------------------------------------------
# Array reference of process ids known for the running command.
sub get_pids {
    my ($self) = @_;
    return $self->{pids};
}

# Read handle of the pipe to the running command.
sub get_fh {
    my ($self) = @_;
    return $self->{fh};
}

# AnyEvent io watcher registered on the pipe handle.
sub get_watcher {
    my ($self) = @_;
    return $self->{watcher};
}

# Command line as it was actually executed by open_pipe().
sub get_executed_command {
    my ($self) = @_;
    return $self->{executed_command};
}
# Setters for the runtime state. Each stores the given value in the object
# hash and returns the value just stored.
sub set_pids {
    my ($self, $value) = @_;
    return $self->{pids} = $value;
}

sub set_fh {
    my ($self, $value) = @_;
    return $self->{fh} = $value;
}

sub set_watcher {
    my ($self, $value) = @_;
    return $self->{watcher} = $value;
}

sub set_executed_command {
    my ($self, $value) = @_;
    return $self->{executed_command} = $value;
}
#------------------------------------------------------------------------
# Constructor. Takes named parameters; all of them are forwarded to the
# parent constructor, and the command-specific ones (command, fetch_output,
# node, progress_parser, configure_callback) are stored via their setters.
# A parameter that was not passed is stored as undef.
sub new {
    my ($class, @args) = @_;
    my %par = @args;

    my $self = $class->SUPER::new(@args);

    #-- same order as the attributes are listed above
    for my $attr (qw(
        command fetch_output node progress_parser configure_callback
    )) {
        my $setter = "set_$attr";
        $self->$setter( $par{$attr} );
    }

    return $self;
}
# Resets the per-execution state after running the parent class' init():
# empty PID list, no pipe handle, no watcher, empty output.
# Always returns 1.
#
# NOTE(review): got_exec_ok is not reset here - confirm whether a job
# object can be executed more than once.
sub init {
    my ($self) = @_;

    $self->SUPER::init();

    $self->set_pids( [] );
    $self->set_fh(undef);
    $self->set_watcher(undef);
    $self->set_output("");

    return 1;
}
# Starts the command by opening the pipe to it. Always returns 1; the
# filehandle returned by open_pipe() is not passed on.
sub execute {
    my ($self) = @_;

    $self->open_pipe;

    return 1;
}
# Builds the final shell command line, starts it with stdout and stderr
# merged into one pipe, and registers an AnyEvent io watcher that calls
# command_progress() whenever the pipe becomes readable.
#
# Steps applied to the command, in order:
#   1. a code reference is called with the job to compute the command
#   2. the configure callback (if any) may rewrite it
#   3. the node (if any) may rewrite it via prepare_command()
#   4. trailing whitespace is removed, "execflow " is prepended unless the
#      command already mentions execflow, and " && echo EXECFLOW_OK" is
#      appended unless the command already mentions EXECFLOW_OK
#
# Returns the read filehandle of the pipe. Dies, including the system
# error, if the pipe cannot be opened.
sub open_pipe {
    my $self = shift;

    my $command = $self->get_command;

    if ( ref $command eq 'CODE' ) {
        #-- make the job available to the code ref; local restores the
        #-- previous value even if the code ref dies
        local $Event::ExecFlow::JOB = $self;
        $command = $command->($self);
    }

    if ( $self->get_configure_callback ) {
        my $cb = $self->get_configure_callback;
        $command = $cb->($command);
    }

    if ( $self->get_node ) {
        $command = $self->get_node->prepare_command($command, $self);
    }

    $command =~ s/\s+$//;

    my $execflow = $command =~ /execflow/ ? "" : "execflow ";

    $command = $execflow.$command;
    $command .= " && echo EXECFLOW_OK" if $command !~ /EXECFLOW_OK/;

    $self->log (__x("Executing command: {command}", command => $command));

    $Event::ExecFlow::DEBUG && print "Command(".$self->get_info."): command=$command\n";

    $self->set_executed_command($command);

    #-- run the child with the C locale
    local $ENV{LC_ALL} = "C";
    local $ENV{LANG}   = "C";

    #-- subshell so that 2>&1 covers the whole command line
    my $pid = open (my $fh, "-|", "( $command ) 2>&1")
        or die "can't fork '$command': $!";

    my $watcher = AnyEvent->io ( fh => $fh, poll => 'r', cb => sub {
        $self->command_progress;
    });

    push @{$self->get_pids}, $pid;

    $self->set_fh($fh);
    $self->set_watcher($watcher);

    return $fh;
}
# Tears down the pipe to the command: drops the watcher, closes and
# forgets the filehandle and empties the PID list. If no error message
# has been set and the EXECFLOW_OK marker was never seen, an error
# message containing the executed command and the collected output is
# set. Always returns 1.
sub close_pipe {
    my ($self) = @_;

    $self->set_watcher(undef);
    close( $self->get_fh );
    $self->set_fh(undef);
    $self->set_pids( [] );

    #-- nothing to report if an error is already recorded ...
    return 1 if $self->get_error_message;

    #-- ... or if the command confirmed success
    return 1 if $self->get_got_exec_ok;

    my $message =
        "Command exits with failure code:\n".
        "Command: ".$self->get_executed_command."\n\n".
        "Output: ".$self->get_output;

    $self->set_error_message($message);

    return 1;
}
# Called whenever the command's pipe is readable. Reads up to 4096 bytes
# and processes that chunk:
#
#   - on EOF or read error the pipe is closed, execution_finished() is
#     called and nothing is returned
#   - the first "EXEC_FLOW_JOB_PID=<n>" marker in the chunk adds <n> to
#     the PID list; the marker line is removed from the chunk
#   - an "EXECFLOW_OK\n" marker sets got_exec_ok; the marker is removed
#     from the chunk when it is completely contained in it
#   - the chunk is appended to the output (only the trailing 16384
#     characters are kept unless fetch_output is set)
#   - the progress parser is applied: a code ref is called with the job
#     and the chunk, a regex sets progress_cnt to its first capture
#   - the frontend is notified if the progress has changed
#
# Returns 1 after processing a chunk.
sub command_progress {
    my $self = shift;

    my $fh = $self->get_fh;

    #-- read and check for eof
    my $buffer;
    if ( !sysread($fh, $buffer, 4096) ) {
        #-- forget the marker detection tail of this run
        delete $self->{exec_ok_tail};
        $self->close_pipe;
        $self->execution_finished;
        return;
    }

    #-- get job's PID
    my ($pid) = ( $buffer =~ /EXEC_FLOW_JOB_PID=(\d+)/ );
    if ( defined $pid ) {
        push @{$self->get_pids}, $pid;
        $buffer =~ s/EXEC_FLOW_JOB_PID=(\d+)\n//;
    }

    #-- successfully executed?
    #-- The marker may be split across two reads, so it is searched in
    #-- the previous chunk's last 11 characters (marker length minus one)
    #-- plus the current chunk. This tail is used for detection only;
    #-- output and progress parser still get the plain chunk.
    my $tail  = defined $self->{exec_ok_tail} ? $self->{exec_ok_tail} : "";
    my $probe = $tail.$buffer;
    $self->{exec_ok_tail} = length($probe) > 11 ? substr($probe, -11) : $probe;

    my $marker_seen = $probe =~ /EXECFLOW_OK\n/;
    $buffer =~ s/EXECFLOW_OK\n//;
    $self->set_got_exec_ok(1) if $marker_seen;

    #-- store output
    if ( $self->get_fetch_output ) {
        $self->{output} .= $buffer;
    }
    else {
        $self->{output} = substr($self->{output}.$buffer,-16384);
    }

    #-- parse output & report progress
    my $progress_parser = $self->get_progress_parser;
    if ( ref $progress_parser eq 'CODE' ) {
        $progress_parser->($self, $buffer);
    }
    elsif ( ref $progress_parser eq 'Regexp' ) {
        if ( $buffer =~ $progress_parser ) {
            #-- the regex is expected to capture the counter in $1
            $self->set_progress_cnt($1);
        }
    }

    $self->get_frontend->report_job_progress($self)
        if $self->progress_has_changed;

    1;
}
# Marks the job as cancelled and sends signal 9 to all known PIDs,
# logging the PIDs that were signalled. Returns 1 if a signal was sent,
# nothing if there were no PIDs.
sub cancel {
    my ($self) = @_;

    $self->set_cancelled(1);

    my @pids = @{ $self->get_pids };
    return if !@pids;

    kill 9, @pids;

    $self->log( __x("Sending signal 9 to PID(s)")." ".join(", ", @pids) );

    return 1;
}
# Applies the job's current paused state to the running processes:
# STOP is sent to all known PIDs if the job is paused, CONT otherwise.
# Nothing is sent when no PIDs are known. Always returns 1.
sub pause_job {
    my ($self) = @_;

    my $signal = $self->get_paused ? "STOP" : "CONT";

    my $pids = $self->get_pids;
    if ( @{$pids} ) {
        kill $signal, @{$pids};
    }

    return 1;
}
# Returns the parent class' state hash reference with the entries
# configure_callback, progress_parser, node, watcher and fh removed.
# The command entry is removed as well if it is a code reference.
sub backup_state {
    my ($self) = @_;

    my $state = $self->SUPER::backup_state();

    delete @{$state}{qw(
        configure_callback progress_parser node watcher fh
    )};

    if ( ref $state->{command} eq 'CODE' ) {
        delete $state->{command};
    }

    return $state;
}
1;
__END__
=head1 NAME
Event::ExecFlow::Job::Command - External command for async execution
=head1 SYNOPSIS
Event::ExecFlow::Job::Command->new (
command => Shell command to be executed,
fetch_output => Boolean if output should be fetched,
progress_parser => A closure or regex for progress parsing,
configure_callback => A closure to configure the command
before execution,
...
Event::ExecFlow::Job attributes
);
=head1 DESCRIPTION
Use this module for asynchronous execution of an external command
with Event::ExecFlow.
=head1 OBJECT HIERARCHY
Event::ExecFlow
Event::ExecFlow::Job
+--- Event::ExecFlow::Job::Command
Event::ExecFlow::Frontend
Event::ExecFlow::Callbacks
=head1 ATTRIBUTES
Attributes can be accessed at runtime using the common get_ATTR(),
set_ATTR() style accessors.
[ FIXME: describe all attributes in detail ]
=head1 METHODS
[ FIXME: describe all methods in detail ]
=head1 AUTHORS
Jörn Reder <joern at zyn dot de>
=head1 COPYRIGHT AND LICENSE
Copyright 2005-2006 by Jörn Reder.
This library is free software; you can redistribute it and/or modify
it under the terms of the GNU Library General Public License as
published by the Free Software Foundation; either version 2.1 of the
License, or (at your option) any later version.
This library is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Library General Public License for more details.
You should have received a copy of the GNU Library General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307
USA.
=cut