# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package RunnerRole;
# vim: ts=2 sw=2 noexpandtab

use Reflex::Role;

# Role parameters.  Each line declares one parameter through a helper
# imported from Reflex::Role; the role body below reads them back as
# methods on its parameter object ($p).
#
# NOTE(review): the second argument of attribute_parameter is presumably
# the parameter's default value, and the word lists given to
# callback_parameter / method_parameter are presumably a prefix, the name
# of an attribute parameter, and a suffix from which a default method name
# is built -- confirm against the Reflex::Role documentation.

# Attribute parameters: one for the child's PID and one per standard handle.
attribute_parameter att_pid          => "pid";
attribute_parameter att_stderr       => "stderr";
attribute_parameter att_stdin        => "stdin";
attribute_parameter att_stdout       => "stdout";

# Callback parameters: process exit, then closed/data/error for each of
# the two output handles.
callback_parameter  cb_exit          => qw( on att_pid exit );
callback_parameter  cb_stderr_closed => qw( on att_stderr closed );
callback_parameter  cb_stderr_data   => qw( on att_stderr data );
callback_parameter  cb_stderr_error  => qw( on att_stderr error );
callback_parameter  cb_stdout_closed => qw( on att_stdout closed );
callback_parameter  cb_stdout_data   => qw( on att_stdout data );
callback_parameter  cb_stdout_error  => qw( on att_stdout error );

# Method parameter naming the "put" method handed to OutStreaming below.
method_parameter    method_put       => qw( put att_stdin _ );

# Parameterized role body.
#
# $p is the parameter object; each role parameter declared above is read
# from it as a method.  The body composes four other roles:
#
#   * Reflex::Role::OutStreaming on the stdin attribute,
#   * Reflex::Role::InStreaming on the stdout attribute,
#   * Reflex::Role::InStreaming on the stderr attribute,
#   * Reflex::Role::PidCatcher on the pid attribute,
#
# and arranges for "stop_<handle>" to be called after the corresponding
# "closed" callback of stdout and stderr.
role {
	my $p = shift;

	with 'Reflex::Role::OutStreaming' => {
		att_handle  => $p->att_stdin(),
		method_put  => $p->method_put(),
	};

	my $m_stdout_stop = "stop_" . $p->att_stdout();
	my $cb_stdout_closed = $p->cb_stdout_closed();

	# Every attribute and callback this role hands to the composed roles
	# must exist in the consumer.  The list covers closed, data AND error
	# for both output handles; it previously named the data callbacks
	# twice and left the error callbacks unchecked.
	requires(
		map { $p->$_() } qw(
			att_pid att_stderr att_stdin att_stdout
			cb_exit
			cb_stderr_closed cb_stderr_data cb_stderr_error
			cb_stdout_closed cb_stdout_data cb_stdout_error
		)
	);

	# Once the stdout "closed" callback has run, stop the stdout handle.
	after $cb_stdout_closed => sub {
		my ($self, $args) = @_;
		$self->$m_stdout_stop();
	};

	with 'Reflex::Role::InStreaming' => {
		att_handle => $p->att_stdout(),
		cb_data    => $p->cb_stdout_data(),
		cb_error   => $p->cb_stdout_error(),
		cb_closed  => $cb_stdout_closed,
	};

	my $m_stderr_stop = "stop_" . $p->att_stderr();
	my $cb_stderr_closed = $p->cb_stderr_closed();

	# Same arrangement for stderr: stop the handle after "closed" fires.
	after $cb_stderr_closed => sub {
		my ($self, $args) = @_;
		$self->$m_stderr_stop();
	};

	with 'Reflex::Role::InStreaming' => {
		att_handle => $p->att_stderr(),
		cb_data    => $p->cb_stderr_data(),
		cb_error   => $p->cb_stderr_error(),
		cb_closed  => $cb_stderr_closed,
	};

	with 'Reflex::Role::PidCatcher' => {
		att_pid => $p->att_pid(),
		cb_exit => $p->cb_exit(),
	};
};

1;

__END__

extends 'Reflex::Base';

use Reflex::Trait::Watched qw(watches);
use Reflex::PID;

use Carp qw(croak);
use IPC::Run qw(start);
use Symbol qw(gensym);


__END__

# Watched attribute holding the Reflex::PID object for the child, or undef.
# NOTE(review): "watches" presumably routes the watched object's events to
# on_process_* methods such as on_process_exit below -- confirm against
# Reflex::Trait::Watched.
watches process => ( isa => 'Maybe[Reflex::PID]', is => 'rw' );

# Read/write attributes for the child's three standard handles; each may
# be undef.
has [qw(stdin stdout stderr)] => (
	isa => 'Maybe[FileHandle]',
	is  => 'rw',
);

# The IPC::Run object; BUILD stores the result of start() here.
has ipc_run => ( isa => 'IPC::Run', is => 'rw' );

# Required, read-only array reference; BUILD passes it to start() as the
# command to run.
has cmd => (
	isa       => 'ArrayRef',
	is        => 'ro',
	required  => 1,
);

### Reap the child process.

# Re-emit a notification from the "process" attribute as this object's
# own 'exit' event.
#
# Arguments: the invocant and the args value, which is forwarded
# unchanged under the 'args' key.  Returns whatever emit() returns.
sub on_process_exit {
	my $self        = shift;
	my $exit_params = shift;

	return $self->emit(
		event => 'exit',
		args  => $exit_params,
	);
}

# Send a signal to the child process.
#
# Arguments:
#   $signal - signal name or number.  Defaults to 'TERM' when omitted,
#             undef or the empty string.  Signal 0 is passed through
#             untouched so callers can use it as a liveness probe.
#
# Returns the value of the built-in kill().
# Croaks with "no process to kill" when the process attribute is unset.
sub kill {
	my ($self, $signal) = @_;

	my $process = $self->process();
	croak "no process to kill" unless $process;

	# Not "||=": that would silently turn the valid signal 0 into TERM.
	$signal = 'TERM' unless (defined($signal) && length($signal));

	# This sub shares its name with the built-in, so name the built-in
	# explicitly rather than rely on Perl's ambiguity resolution.
	return CORE::kill($signal, $process->pid());
}

### Write to standard input.

# Report a standard-input error by emitting a 'stdin_error' event.
#
# Arguments: the invocant and the args value, which is forwarded
# unchanged under the 'args' key.  Returns whatever emit() returns.
sub on_stdin_error {
	my $self         = shift;
	my $error_params = shift;

	return $self->emit(
		event => 'stdin_error',
		args  => $error_params,
	);
}

# Compose Reflex::Role::Writing, parameterized on the "stdin" attribute.
# NOTE(review): presumably this is what supplies flush_stdin(), called by
# on_stdin_writable below -- confirm against the role's documentation.
with 'Reflex::Role::Writing' => { att_handle  => 'stdin' };

# Handler for "stdin is writable".
#
# Calls flush_stdin(); if it returns a true value the handler returns
# nothing.  Otherwise flush_stdin() is called a second time and its result
# is the return value.
#
# NOTE(review): the second flush_stdin() call looks redundant; a pause of
# the writable watcher may have been intended here -- confirm before
# changing, behaviour is kept as-is.
sub on_stdin_writable {
	my $self = shift;

	return if $self->flush_stdin();

	return $self->flush_stdin();
}

# Compose Reflex::Role::Writable, parameterized on the "stdin" attribute.
# NOTE(review): presumably this role invokes on_stdin_writable (defined
# above) when the handle becomes writable -- confirm against the role's
# documentation.
with 'Reflex::Role::Writable' => { att_handle => 'stdin' };

### Read from standard output.

# Handler for "stdout is readable".
#
# Calls read_stdout($arg) and dispatches on its result:
#   true value      - nothing more to do, return;
#   defined, false  - pause the stdout readable watcher;
#   undef           - stop the stdout readable watcher.
#
# NOTE(review): presumably a defined-but-false result means end of input
# and undef means a read error -- confirm against read_stdout().
#
# The stray debugging warn() calls were removed: they wrote to STDERR on
# every read and raised an extra warning when the result was undef.
sub on_stdout_readable {
	my ($self, $arg) = @_;

	my $octets_read = $self->read_stdout($arg);
	return if $octets_read;

	if (defined $octets_read) {
		$self->pause_stdout_readable();
		return;
	}

	$self->stop_stdout_readable();
}

# Report a standard-output error: emit a 'stdout_error' event carrying the
# received args, then stop the stdout readable watcher.
#
# Returns whatever stop_stdout_readable() returns.
sub on_stdout_error {
	my $self         = shift;
	my $error_params = shift;

	$self->emit(
		event => 'stdout_error',
		args  => $error_params,
	);

	return $self->stop_stdout_readable();
}

# Compose Reflex::Role::Reading on the "stdout" attribute.  The data
# callback is whatever make_emitter(on_stdout => "stdout") returns.
# NOTE(review): make_emitter is not defined or imported in the visible
# source; presumably it builds a method that emits a "stdout" event --
# confirm where it comes from.
with 'Reflex::Role::Reading' => {
	att_handle  => 'stdout',
	cb_data     => make_emitter(on_stdout => "stdout"),
};

# Compose Reflex::Role::Readable on the "stdout" attribute, naming
# on_stdout_readable as the "ready" callback.
with 'Reflex::Role::Readable' => {
	att_handle  => 'stdout',
	cb_ready    => 'on_stdout_readable',
};

### Read from standard error.

# Report a standard-error read failure: emit a 'stderr_error' event
# carrying the received args, then stop the stderr readable watcher.
#
# Returns whatever stop_stderr_readable() returns.
sub on_stderr_error {
	my $self         = shift;
	my $error_params = shift;

	$self->emit(
		event => 'stderr_error',
		args  => $error_params,
	);

	return $self->stop_stderr_readable();
}

# Handler for "stderr is readable".
#
# Calls read_stderr($arg) and dispatches on its result:
#   true value      - nothing more to do, return;
#   defined, false  - pause the stderr readable watcher;
#   undef           - stop the stderr readable watcher.
#
# NOTE(review): presumably a defined-but-false result means end of input
# and undef means a read error -- confirm against read_stderr().
#
# The stray debugging warn() calls were removed: they wrote to STDERR on
# every read and raised an extra warning when the result was undef.
sub on_stderr_readable {
	my ($self, $arg) = @_;

	my $octets_read = $self->read_stderr($arg);
	return if $octets_read;

	if (defined $octets_read) {
		$self->pause_stderr_readable();
		return;
	}

	$self->stop_stderr_readable();
}

# Compose Reflex::Role::Reading on the "stderr" attribute.  The data
# callback is whatever make_emitter(on_stderr => "stderr") returns.
# NOTE(review): make_emitter is not defined or imported in the visible
# source; presumably it builds a method that emits a "stderr" event --
# confirm where it comes from.
with 'Reflex::Role::Reading' => {
	att_handle  => 'stderr',
	cb_data     => make_emitter(on_stderr => "stderr"),
};

# Compose Reflex::Role::Readable on the "stderr" attribute, naming
# on_stderr_readable as the "ready" callback.
with 'Reflex::Role::Readable' => {
	att_handle  => 'stderr',
	cb_ready    => 'on_stderr_readable',
};

# Constructor hook: launch the command and wire up its handles.
#
# Steps:
#   1. create three anonymous globs for the child's stdin/stdout/stderr;
#   2. start cmd through IPC::Run::start() with a pipe on each handle and
#      store the result in ipc_run (dies if that yields a false value);
#   3. wrap the child's PID in a Reflex::PID and store it in process;
#   4. store the three handles in the stdin, stdout and stderr attributes.
#
# The handles are stored through the stdin/stdout/stderr accessors declared
# by this class; the att_* names used previously are not methods of this
# class, so the calls would have failed at construction time.
sub BUILD {
	my $self = shift;

	my ($fh_in, $fh_out, $fh_err) = (gensym(), gensym(), gensym());

	$self->ipc_run(
		start(
			$self->cmd(),
			'<pipe', $fh_in,
			'>pipe', $fh_out,
			'2>pipe', $fh_err,
		)
	) or die "IPC::Run start() failed: $? ($!)";

	# NOTE(review): this reads the PID out of the IPC::Run object's hash
	# (KIDS/PID keys) rather than through a documented accessor -- confirm
	# it is stable across IPC::Run versions.
	$self->process(
		Reflex::PID->new(
			pid => $self->ipc_run->{KIDS}[0]{PID}
		)
	);

	$self->stdin($fh_in);
	$self->stdout($fh_out);
	$self->stderr($fh_err);
}

1;