# (web page banner, not part of the module) The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
# Forks::Super::Job::Ipc -- manage temporary files, sockets, pipes
#   that facilitate communication between
#   parent and child processes
# implementation of
#     fork { child_fh => ... }
#     fork { stdin =>   $input | \$input | \@input }
#     fork { stdout => \$output }
#     fork { stderr => \$error }

package Forks::Super::Job::Ipc;
use Forks::Super::Config;
use Forks::Super::Debug qw(:all);
use Forks::Super::Util qw(IS_WIN32 is_socket);
use Forks::Super::Tie::IPCFileHandle;
use Forks::Super::Tie::IPCSocketHandle;
use Forks::Super::Tie::IPCPipeHandle;

use Forks::Super::Tie::IPCDupSTDIN;

use Signals::XSIG;
use IO::Handle;
use File::Path;
use Cwd;
use Carp;
use Exporter;
use strict;
use warnings;

# Unbuffer the currently selected output handle.
$| = 1;

our @ISA = qw(Exporter);
our @EXPORT = qw(close_fh);
our $VERSION = '0.68';

# Process id at the time this module was loaded; END_cleanup compares $$
# against it.
our $MAIN_PID = $$;
our $__OPEN_FH = 0; # for debugging, monitoring filehandle usage. Not ready.
# NOTE(review): the body and closing brace of this do-block are not visible
# in this view, so the value assigned to $__MAX_OPEN_FH cannot be told here.
our $__MAX_OPEN_FH = do {
    no warnings 'once';
# Keyed by file descriptor number (see _safeopen).
our %__OPEN_FH;

# Every handle successfully opened by _safeopen is pushed here.
our @SAFEOPENED = ();
# Tied file/socket/pipe handles are enabled on perl 5.8 and later ...
our $USE_TIE_FH = $] >= 5.008;
our $USE_TIE_SH = $] >= 5.008;
our $USE_TIE_PH = $] >= 5.008;

# ... unless the NO_TIES environment variable switches them all off.
if ($ENV{NO_TIES}) {
    $USE_TIE_SH = $USE_TIE_FH = $USE_TIE_PH = 0;

# Class that _safeopen ties new file handles to.
our $TIE_FH_CLASS = 'Forks::Super::Tie::IPCFileHandle';

# Directory holding the IPC files; assigned by set_ipc_dir.
our $_IPC_DIR;
# Flag toggled by enable_cleanse_mode and read by is_cleanse_mode.
my $cleanse_mode = 0;

# package Forks::Super::Job::Ipc::Tie;

    # special behavior for $Forks::Super::IPC_DIR ==>
    # when this value is set, we should call set_ipc_dir.

    # Tie interface for a scalar that mirrors $_IPC_DIR: reads return the
    # current IPC directory, writes are routed through set_ipc_dir().

    # Constructor for the tie; the object itself carries no state.
    sub Forks::Super::Job::Ipc::Tie::TIESCALAR {
        return bless {}, 'Forks::Super::Job::Ipc::Tie';
    }

    # Reading the tied scalar yields the current value of $_IPC_DIR.
    sub Forks::Super::Job::Ipc::Tie::FETCH {
        my $self = shift;
        return $_IPC_DIR;
    }

    # Assigning to the tied scalar calls set_ipc_dir($value, 1), i.e. with
    # warnings enabled. Returns the value $_IPC_DIR had before the call.
    sub Forks::Super::Job::Ipc::Tie::STORE {
        my ($self, $value) = @_;
        my $old = $_IPC_DIR;
        Forks::Super::Job::Ipc::set_ipc_dir($value, 1);
        return $old;
    }

    # True when an IPC directory is currently set.
    sub Forks::Super::Job::Ipc::Tie::DEFINED {
        my $self = shift;
        return defined $_IPC_DIR;
    }

    # independent implementation of Symbol::gensym --

    # IO handles from this package will follow certain conventions
    #   1. created with _safeopen, _create_socket_pair, or _create_pipe
    #   2. attributes are set in the handle's namespace (glob, is_socket,
    #      opened, etc.)
    #   3. fileno() stored in %Forks::Super::Job::Ipc::FILENO
    # Another one of these conventions will be that all such handles will
    # be registered in the same namespace, so we can tell whether
    # an arbitrary handle was created by this module.

    my $pkg = 'Forks::Super::IOHandles::';
    my $seq = 1000;     # an arbitrary starting point

    # Returns a reference to a new, uniquely named glob in the
    # Forks::Super::IOHandles namespace. The name is built from the
    # process id and a running sequence number. The symbol table entry is
    # deleted again so that only the returned reference keeps the glob alive.
    sub _gensym () {
        no strict 'refs';                    # symbolic glob/stash access below
        my $name = join '_', @_, "IO$$", $seq++;
        my $ref = \*{$pkg . $name};
        delete $$pkg{$name};
        return $ref;
    }

# open a filehandle with (a little) protection
# against "Too many open filehandles" error.
# If supported, new filehandles are tied to
# F::S::Tie::IPCFileHandle, which (potentially)
# logs and debugs activity on that handle.
#
# Arguments: ($job, $fh, $mode, $expr, %options)
#   $job     - job hash ref; {debug} is read here
#   $fh      - handle to open. When undefined a new glob comes from
#              _gensym(). The handle is written back to the caller's
#              second argument through $_[1].
#   $mode    - open mode; a mode containing '&' makes $expr a handle to dup
#   $expr    - file name, or the handle to duplicate
#   %options - tie_fh, no_delegator and robust are consulted here
# Returns 1 on success; returns 0 once the tenth loop iteration is reached.
#
# NOTE(review): in this view several closing braces are absent, the call
# that owns the argument list right after "if ($result) {" is not shown,
# and the body of "if ($mode =~ />/)" is empty. Confirm against the
# complete file before changing any logic here.
sub _safeopen ($*$$;%) {
    my ($job, $fh, $mode, $expr, %options) = @_;
    my ($open2, $open3) = _parse_safeopen_mode($mode, $expr);

    my $result;
    if (!defined $fh || $options{tie_fh}) {
	if (!defined $fh) {
	    $fh = _gensym();
	# a tie_fh value other than '1' is itself used as the handle
	if ($options{tie_fh} && $options{tie_fh} ne '1') {
	    $fh = $options{tie_fh};
	if ($USE_TIE_FH) {
	    my $gh = $fh;
	    tie *{$fh}, $TIE_FH_CLASS, parent => $fh; #, job => $job;
	    if (!$options{no_delegator}) {
		eval { bless $fh, 'Forks::Super::Tie::Delegator'; 1 }
		or carp "_safeopen: failed to bless *$fh as delegator ...\n";
	    $$fh->{parent} ||= $gh;

    my @io_layers = _get_iolayers_for_safeopen($job, $open2, %options);

    # On perl >= 5.7 and for non-dup modes the layers are appended to the
    # open mode itself, leaving nothing to apply after the open.
    if ($open2 !~ /&/ && $] >= 5.007) {
	$open2 .= join'', @io_layers;
	@io_layers = ();
	if ($job->{debug}) {
	    debug('open mode for ',$open3||'file descriptor'," is $open2");

    for my $try (1 .. 10) {
	if ($try == 10) {
	    carp 'Forks::Super: ',
	        "Failed to open $mode $expr after 10 tries. Giving up.\n";
	    return 0;
	$result = _safeopen_open($fh, $open2, $open3);
	# hand the (possibly newly created) handle back to the caller
	$_[1] = $fh;

	if ($result) {
		$job, [$fh,$expr,$mode], "$open2 ".($open3||''), @io_layers);

	    push @SAFEOPENED, $fh;

	    # dereferenced file handles are just symbol tables, and we
	    # can store arbitrary data in them
	    # -- there are a lot of ways we can make good use of this data.

	    my ($pkg,$file,$line) = caller;
	    $$fh->{opened} = Time::HiRes::time();
	    $$fh->{caller} = "$pkg;$file:$line";
	    $$fh->{is_regular} = 1;
	    $$fh->{is_socket} = 0;
	    $$fh->{is_pipe} = 0;
	    $$fh->{mode} = $mode;
	    $$fh->{expr} = $expr;
	    $$fh->{glob} = '' . *$fh;
	    $$fh->{job} = $job;
	    my $fileno = $$fh->{fileno} = CORE::fileno($_[1]);
	    $FILENO{$_[1]} = $fileno;
	    # a shallow copy of the job is recorded per file descriptor
	    $__OPEN_FH{$fileno} = {%$job};
	    if ($mode =~ />/) {

	    # for a dup, link both handles and inherit the source's type flags
	    if ($mode =~ /&/) {
		$$fh->{dup_glob} = '' . *$expr;
		$$fh->{dup} = $$expr;
		$$expr->{duped_by} .= ' ' . *$fh;
		$$fh->{is_regular} = $$expr->{is_regular};
		$$fh->{is_socket} = $$expr->{is_socket};
		$$fh->{is_pipe} = $$expr->{is_pipe};
	    return 1;
	} else {
	    # NOTE(review): '.' binds tighter than '||', so the last argument
	    # is ("$open2 " . $expr) || '' -- the '' fallback can never apply.
	    _handle_safeopen_failure($try, $options{robust},
				     $DEBUG || $job->{debug},
				     "$open2 " . $expr||'');
    return $result;

# Translates a ($mode, $expr) pair into the arguments for open().
#
# For a dup mode (one containing '&') whose $expr has a non-negative file
# descriptor, returns ($mode . $fileno, undef) so the caller performs a
# two-argument open on the descriptor number. In every other case returns
# ($mode, $expr) unchanged. A warning is issued when a dup is requested
# for something that has no file descriptor.
sub _parse_safeopen_mode {
    my ($mode, $expr) = @_;
    if ($mode =~ /&/) {
        my $fileno = CORE::fileno($expr);
        if (!defined $fileno) {
            carp "_safeopen: no fileno available for $expr!\n";
        } elsif ($fileno >= 0) {
            return ($mode . $fileno, undef);
        }
    }
    return ($mode, $expr);
}

# Returns the list of I/O layers configured for a job's IPC handles.
#
#   $job     - job hash ref; layers are taken from $job->{fh_config}{layers}
#   $open2   - open mode; for input modes (containing '<') the layers are
#              returned in reverse order
#   %options - no_layers => true suppresses all layers
# Returns an empty list when no layers apply.
sub _get_iolayers_for_safeopen {
    my ($job, $open2, %options) = @_;
    return if $options{no_layers};

    my @layers = @{ $job->{fh_config}{layers} || [] };
    return $open2 =~ /</ ? reverse @layers : @layers;
}

# Applies I/O layers to an already opened handle with binmode.
#
#   $job    - job hash ref; {debug} enables debug messages
#   $handle - array ref [$fh, $expr, $mode]
#   $open   - description of what was opened, used only in messages
#   @layers - layers to apply, in order
# Each layer is attempted up to ten times with a growing pause in between;
# a warning is issued when the tenth attempt fails.
#
# NOTE(review): no statement that leaves the retry loop after a successful
# binmode is visible in this view, and closing braces are absent -- confirm
# against the complete file.
sub _apply_safeopen_layers_for_older_perls {
    my ($job, $handle, $open, @layers) = @_;
    my ($fh, $expr, $mode) = @$handle;
    foreach my $layer (@layers) {
	local $! = 0;
	# before perl 5.8, a dup'd handle also gets the layer on its source
	if ($] < 5.008 && $mode =~ /&/) {
	    my $binmode_result2 = eval { binmode $expr,$layer } or 0;
	    if ($job->{debug}) {
		debug("applied $layer to $expr, result=$binmode_result2");

	for my $redo (1..10) {
	    if (eval { binmode $fh, $layer }) {
		if ($job->{debug}) {
		    debug("applying I/O layer $layer to $open");
	    } elsif ($redo == 10) {
		carp 'Forks::Super::_safeopen: ',
	        	"Failed to apply I/O layer $layer ",
	        	"to IPC file $open: $!";
	    # back off a little longer on every retry
	    Forks::Super::Util::pause(0.01 * $redo);

# Thin wrapper around CORE::open used by _safeopen.
#
#   $fh    - handle (or undefined scalar) to open; the opened handle is
#            written back into the caller's first argument via $_[0]
#   $open2 - open mode, or a complete two-argument open expression
#   $open3 - file name; when undefined a two-argument open is performed
# Returns the result of open (false on failure, with $! set).
sub _safeopen_open {
    my ($fh, $open2, $open3) = @_;
    my $result = defined $open3
        ? CORE::open($fh, $open2, $open3)     ## no critic (BriefOpen)
        : CORE::open($fh, $open2);            ## no critic (BriefOpen)

    # $fh is a copy of the caller's argument; pass the handle back through
    # the @_ alias, as both branches of the original did.
    $_[0] = $fh;
    return $result;
}

# Reacts to a failed open attempt in _safeopen: reports the error in $!
# and pauses before the caller retries.
#
#   $try         - number of the attempt that just failed
#   $robust      - when true, "no such file" errors are only debug-logged
#   $debug       - enables the debug message for the robust case
#   $description - text naming what was being opened, for messages
# The pause grows with the cube of $try.
# NOTE(review): closing braces are absent in this view.
sub _handle_safeopen_failure {
    my ($try, $robust, $debug, $description) = @_;
    # called when _safeopen (_safeopen_open) returns false.

    # $! is checked both by message text and by numeric errno
    if ($! =~ /too many open filehandles/i ||
	$! == $Forks::Super::SysInfo::TOO_MANY_FH_ERRNO) {

	carp "$! while opening $description. ",
		"[openfh=$__OPEN_FH/$__MAX_OPEN_FH] Retrying ...\n";
    } elsif ($robust && ($! =~ /no such file or directory/i ||
			 $! == $Forks::Super::SysInfo::FILE_NOT_FOUND_ERRNO)) {
	if ($debug) {
	    debug("$! while opening $description in $$. Retrying ...");
    } else {
	# any other error is reported only from the sixth attempt on
	if ($try > 5) {
	    carp_once [$!], "$! while opening $description in $$ ",
		    "[openfh=$__OPEN_FH/$__MAX_OPEN_FH]. Retrying ...\n";
    Forks::Super::pause(0.002 * $try * $try * $try, $try < 3);


# Fills a filehandle configuration hash from a child_fh specification.
#
#   $job     - job hash ref; {debug} enables a debug message
#   $config  - hash ref that receives the settings
#   $fh_spec - specification string, matched case-insensitively for the
#              keywords all, in, out, err, join and block, plus any number
#              of I/O layers introduced by ':'
# Keys set in %$config: in, out, err, all, join, block (each to 1) and
# layers (array ref of layer strings, only when at least one was given).
#
# NOTE(review): keywords are matched anywhere in the string, so a layer
# such as ":encoding(...)" also matches /in/ and switches {in} on. That
# behavior is unchanged here -- confirm whether it is intended.
sub __set_fh_config_from_spec {
    my ($job, $config, $fh_spec) = @_;
    if ($fh_spec =~ /all/i) {
        $config->{in} = 1;
        $config->{out} = 1;
        $config->{err} = 1;
        $config->{all} = 1;
    } else {
        # the lookbehind keeps the "in" of "join" from enabling input
        if ($fh_spec =~ /(?<!jo)in/i) {
            $config->{in} = 1;
        }
        if ($fh_spec =~ /out/i) {
            $config->{out} = 1;
        }
        if ($fh_spec =~ /err/i) {
            $config->{err} = 1;
        }
        # join implies both output channels
        if ($fh_spec =~ /join/i) {
            $config->{join} = 1;
            $config->{out} = 1;
            $config->{err} = 1;
        }
    }

    if ($fh_spec =~ /block/i) {
        $config->{block} = 1;
    }

    # A layer is ":name" optionally followed by one parenthesized argument.
    # The argument may contain any character except parentheses, so that
    # ":encoding(UTF-8)" and ":via(Some::Module)" are kept whole; the
    # previous pattern /(:[()\w]+)/ cut them off at '-' and ':'.
    my @layers = $fh_spec =~ /(:\w+(?:\([^()]*\))?)/g;
    if (@layers > 0) {
        $config->{layers} = \@layers;
        if ($job->{debug}) {
            debug("io layers for job: @layers");
        }
    }
}

# Builds the filehandle configuration for a job from its child_fh (or,
# failing that, child_suppress) setting and returns it as a hash ref.
# NOTE(review): closing braces are absent in this view.
sub _preconfig_fh_parse_child_fh {
    my $job = shift;
    my $config = {};

    if (defined $job->{child_fh}) {
	my $fh_spec = $job->{child_fh} || '';
	# a list of specifications is flattened into one ';'-separated string
	if (ref $fh_spec eq 'ARRAY') {
	    $fh_spec = join q/;/, @$fh_spec;
	__set_fh_config_from_spec($job, $config, $fh_spec);

	# taken for every job except a cmd/exec-style job on MSWin32
	if (($job->{style} ne 'cmd' && $job->{style} ne 'exec') || !&IS_WIN32) {

	    _adjust_fh_config_for_Win32_cmd($job, $fh_spec, $config);

	} elsif (!Forks::Super::Config::CONFIG('filehandles')) {

	    # cmd/exec on MSWin32 without file based IPC: warn, use sockets
	    carp 'Forks::Super::Job::_preconfig_fh: ',
	        "Requested cmd/exec-style fork on MSWin32 with\n",
	        "socket based IPC. This is not going to end well.\n";

	    $config->{sockets} = 1;

    } elsif ($job->{child_suppress}) {
	# child_suppress => [in],[out],[err] is an undocumented feature
	# -- I might change the name, or I might not even like the
	# functionality it provides.
	my %suppress = map { $_ => 1 } split /,/, $job->{child_suppress};
	# stored on the job directly, not in the returned $config
	$job->{fh_config}{suppress} = \%suppress;

    # on MSWin32 pipes are replaced by sockets unless WIN32_PIPE_OK is set
    if (&IS_WIN32 && !$ENV{WIN32_PIPE_OK} && $config->{pipes}) {
	$config->{sockets} = 1;
	$config->{pipes} = 0;
    return $config;

# Chooses socket or pipe based IPC from the child_fh specification.
#
#   $job     - job hash ref; {daemon} is read
#   $fh_spec - child_fh specification string, searched for "sock" / "pipe"
#   $config  - hash ref; {sockets} or {pipes} is set to 1
# For a daemon job that asks for sockets or pipes this warns when file
# based IPC is configured and croaks otherwise.
# NOTE(review): closing braces are absent in this view.
sub _adjust_fh_config_for_Win32_cmd {
    my ($job, $fh_spec, $config) = @_;

    # sockets,pipes not supported for cmd/exec style forks on MSWin32
    # we could support cmd-style with IPC::Open3-like framework?

    # sockets,pipes not supported for daemon processes, because that
    # would require leaving a common file descriptor open in both
    # processes.
    if ($job->{daemon}) {
	if ($fh_spec =~ /sock/i || $fh_spec =~ /pipe/i) {

	    # XXX - support socket IPC and daemon
	    if (Forks::Super::Config::CONFIG('filehandles')) {
		carp 'Forks::Super::Job::_preconfig_fh: ',
		'Socket/pipe based IPC not allowed ',
		'for daemon process.';
	    } else {
		croak 'Forks::Super::Job::_preconfig_fh: ',
		'Socket/pipe based IPC not allowed ',
		'for daemon process.';
    } elsif ($fh_spec =~ /sock/i) {
	$config->{sockets} = 1;
    } elsif ($fh_spec =~ /pipe/i) {
	$config->{pipes} = 1;

# Transfers a job's stdin / stdout / stderr options into the filehandle
# configuration.
#
#   $job    - job hash ref; {stdin}, {stdout}, {stderr} are read
#   $config - hash ref updated in place
# stdin may be an array ref (elements are concatenated), a scalar ref or a
# plain scalar. stdout and stderr must be scalar refs; anything else only
# produces a warning. Accepting stdout or stderr also installs
# collect_output as the job's _callback_collect.
# NOTE(review): closing braces are absent in this view.
sub _preconfig_fh_parse_stdxxx {
    my ($job, $config) = @_;
    if (defined $job->{stdin}) {
	$config->{in} = 1;
	if (ref $job->{stdin} eq 'ARRAY') {
	    $config->{stdin} = join'', @{$job->{stdin}};
	} elsif (ref $job->{stdin} eq 'SCALAR') {
	    $config->{stdin} = ${$job->{stdin}};
	} else {
	    $config->{stdin} = $job->{stdin};

    if (defined $job->{stdout}) {
	if (ref $job->{stdout} ne 'SCALAR') {
	    carp 'Forks::Super::_preconfig_fh: ',
		    "'stdout' option must be a SCALAR ref\n";
	} else {
	    $config->{stdout} = $job->{stdout};
	    $config->{out} = 1;
	    $job->{'_callback_collect'} = \&Forks::Super::Job::Ipc::collect_output;

    if (defined $job->{stderr}) {
	if (ref $job->{stderr} ne 'SCALAR') {
	    carp 'Forks::Super::_preconfig_fh: ',
		    "'stderr' option must be a SCALAR ref\n";
	} else {
	    $config->{stderr} = $job->{stderr};
	    $config->{err} = 1;
	    $job->{'_callback_collect'} = \&Forks::Super::Job::Ipc::collect_output;

# Prepares a job that uses the share option: picks an IPC file name with
# the suffix '.share', stores it in $job->{share_ipc} and installs
# retrieve_share as the job's _callback_share. A warning is issued when
# share is combined with a cmd or exec style job.
# NOTE(review): closing braces are absent in this view, so it cannot be
# told whether the last two assignments sit inside the cmd/exec branch.
sub Forks::Super::Job::_preconfig_share {
    my $job = shift;
    if (defined $job->{share}) {
	if ($job->{style} eq 'cmd' || $job->{style} eq 'exec') {
	    carp 'Forks::Super::_preconfig_share: ',
	        'share  option incompatible with cmd or exec option';
	$job->{share_ipc} = Forks::Super::Job::Ipc::_choose_fh_filename(
		'.share', purpose => 'share ipc');
	$job->{_callback_share} = \&Forks::Super::Job::Ipc::retrieve_share;

# Determines how a job will exchange data with its child (pipes, files or
# sockets) and stores the resulting configuration in $job->{fh_config}
# when it is not empty.
# NOTE(review): the body of the pipes branch and the closing braces are
# not visible in this view.
sub Forks::Super::Job::_preconfig_fh {
    my $job = shift;

    # set  %$config{in,out,err,join,block,sockets,pipes}
    my $config = _preconfig_fh_parse_child_fh($job);

    # set  %$config{stdin,stdout,stderr}
    _preconfig_fh_parse_stdxxx($job, $config);

    if ($config->{pipes}) {
    } elsif (Forks::Super::Config::CONFIG('filehandles')
	     && !$config->{sockets}) {

	# file based IPC is used when available and sockets were not requested
	_preconfig_fh_files($job, $config);

    } else {
	$config->{sockets} ||= 7;
	_preconfig_fh_sockets($job, $config);

    if (0 < scalar keys %$config) {
	$job->{fh_config} = $config;

# read output from children into scalar reference variables in the parent
#
#   $job - job hash ref; {fh_config}{stdout} and {fh_config}{stderr} hold
#          the scalar references to fill
#   $pid - process id of the child, passed on to _collect_output
# NOTE(review): the body of the "!defined $fh_config" guard and the
# closing braces are not visible in this view.
sub collect_output {
    my ($job,$pid) = @_;

    my $fh_config = $job->{fh_config};
    if (!defined $fh_config) {
    my $stdout = $fh_config->{stdout};
    if (defined $stdout) {

	_collect_output($job, $pid, 'f_out', 
			'Forks::Super::read_stdout', $stdout);

	if ($job->{debug}) {
	    debug("Job $pid loaded ", length($$stdout),
		  " bytes from stdout into $stdout");


    my $stderr = $fh_config->{stderr};
    if (defined $stderr) {

	_collect_output($job, $pid, 'f_err',
			'Forks::Super::read_stderr', $stderr);

	if ($job->{debug}) {
	    debug("Job $pid loaded ", length($$stderr),
		  " bytes from stderr into $stderr");


# Loads one output channel of a child into a scalar.
#
#   $job       - job hash ref
#   $pid       - child process id
#   $fileattr  - key in $job->{fh_config} naming the channel ('f_out', ...)
#   $altmethod - name of a function called as $altmethod->($pid) when the
#                channel is not a regular file
#   $vessel    - scalar ref that receives the output
# When the channel's attribute is a file name (not '__socket__' or
# '__pipe__') the file is opened and slurped; otherwise $altmethod is used.
# NOTE(review): closing braces are absent in this view.
sub _collect_output {
    my ($job, $pid, $fileattr, $altmethod, $vessel) = @_;

    my $fh_config = $job->{fh_config};
    my $attr = $fh_config->{$fileattr};
    if ($attr && $attr ne '__socket__' && $attr ne '__pipe__') {
	# slurp mode: read the whole file in one go
	local $/ = undef;
	if (_safeopen($job, my $fh, '<', $attr)) {
	    ($$vessel) = <$fh>;
	} else {
	    carp 'Forks::Super::Job::Ipc::collect_output(): ',
		    "Failed to retrieve $fileattr from child $pid: $!\n";
    } else {
	# $altmethod is a function name, hence the symbolic call
	no strict 'refs';
	$$vessel = join'', $altmethod->($pid);

# Reads the file named by $job->{share_ipc}, evaluates its content as Perl
# code and copies the resulting values into the references listed in
# $job->{share}: scalars are assigned, arrays are appended to, hash
# entries are set key by key.
#
#   $job - job hash ref; {share_ipc}, {share} and {untaint} are read
#   $pid - child process id (not used in the visible code)
# SECURITY NOTE(review): the file content is run through a string eval, so
# whoever can write that file can execute code in this process.
# NOTE(review): the body of the first guard and the closing braces are not
# visible in this view; what happens when all ten open attempts fail
# cannot be told from here.
sub retrieve_share {
    my ($job, $pid) = @_;
    if (!defined $job->{share_ipc}) {
	# carp ...
    my $VAR1 = '';
    my $fh;
    for my $try (1..10) {
	last if open $fh, '<', $job->{share_ipc};      ## no critic (BriefOpen)
	carp 'open ',$job->{share_ipc}, " failed try $try ... $!\n";
    # slurp the whole file
    my $expr = do { local $/=undef; <$fh> };
    close $fh;
    if ($job->{untaint}) {
	# capturing through a match-all pattern untaints the content
	($expr) = $expr =~ /(.*)/s;
    # the evaluated code is expected to assign to $VAR1
    eval $expr;                   ## no critic (StringyEval,CheckingReturnValue)
    my @VAR1;

    if (ref $VAR1 eq 'ARRAY') {
	@VAR1 = @$VAR1;
    } else {
	carp "\$VAR1 is:  $expr, not an ARRAY ref!\n";

    if (@VAR1) {
	# values are consumed in the same order as the shared references
	foreach my $ref (@{$job->{share}}) {
	    require Scalar::Util;
	    my $reftype = Scalar::Util::reftype($ref);
	    my $val = shift @VAR1;
	    if ($reftype eq 'SCALAR') {
		$$ref = $$val;
	    } elsif ($reftype eq 'ARRAY') {
		push @$ref, @$val;
	    } elsif ($reftype eq 'HASH') {
		foreach my $key (keys %$val) {
		    $ref->{$key} = $val->{$key};
	    } else {
		carp 'share element is not a reference!';
    } else {
	carp 'Forks::Super: no share values returned';

# Sets up file based IPC: for every enabled channel (in, out, err) an IPC
# file name is chosen and stored as f_in / f_out / f_err in $config. When
# standard input text was supplied, it is written to the f_in file.
#
#   $job    - job hash ref; {debug} enables debug messages
#   $config - filehandle configuration hash ref, updated in place
# NOTE(review): closing braces are absent in this view, and no close of
# the handle written to is visible.
sub _preconfig_fh_files {
    my ($job, $config) = @_;
    if ($config->{in}) {
	$config->{f_in} = _choose_fh_filename('', purpose => 'STDIN', 
					      job => $job);
	debug("Using $config->{f_in} as shared file for child STDIN")
	    if $job->{debug} && $config->{f_in};

	if ($config->{stdin}) {
	    if (_safeopen($job, my $fh, '>', $config->{f_in})) {
		print $fh $config->{stdin};
	    } else {
		carp 'Forks::Super::Job::_preconfig_fh: ',
		    "scalar standard input not available in child: $!\n";
    if ($config->{out}) {
	$config->{f_out} = _choose_fh_filename('', purpose => 'STDOUT', 
					       job => $job);
	debug("Using $config->{f_out} as shared file for child STDOUT")
	    if $job->{debug} && $config->{f_out};
    if ($config->{err}) {
	$config->{f_err} = _choose_fh_filename('', purpose => 'STDERR', 
					       job => $job);
	debug("Using $config->{f_err} as shared file for child STDERR")
	    if $job->{debug} && $config->{f_err};

# Sets up socket based IPC: every configured channel is marked with
# '__socket__' and receives a child/parent socket pair, stored as
# csock_<channel> / psock_<channel>. With join set, the err channel reuses
# the sockets of the out channel.
#
#   $job    - job hash ref; {debug} enables debug messages
#   $config - filehandle configuration hash ref, updated in place
# NOTE(review): the debug call at the end is cut off in this view, closing
# braces are absent, and what follows the "Socket unavailable" warning is
# not visible.
sub _preconfig_fh_sockets {
    my ($job,$config) = @_;
    if (!Forks::Super::Config::CONFIG('Socket')) {
	carp 'Forks::Super::Job::_preconfig_fh_sockets(): ',
	    'Socket unavailable. ',
	    "Will try to use regular filehandles for child ipc.\n";
	delete $config->{sockets};
    foreach my $channel (qw(in out err)) {
	next if not defined $config->{$channel};
	$config->{"f_$channel"} = '__socket__';
	if ($channel eq 'err' && defined($config->{out}) && $config->{join}) {
	    $config->{csock_err} = $config->{csock_out};
	    $config->{psock_err} = $config->{psock_out};
	} else {
	    # direction: +1 (parent to child) for 'in', -1 for 'out' / 'err'
	    ($config->{"csock_$channel"}, $config->{"psock_$channel"})
		= _create_socket_pair($job, $channel eq 'in' ? +1 : -1);

	    if ($job->{debug}) {
		debug('created socket pair/', $config->{"csock_$channel"}, ':',
		      CORE::fileno($config->{"csock_$channel"}), '/',
		      $config->{"psock_$channel"}, ':',

# Sets up pipe based IPC: every enabled channel is marked with '__pipe__'
# and receives a pipe pair, stored as p_<channel> (first handle returned
# by _create_pipe_pair) and p_to_<channel> (second handle). The err
# channel gets no pipe of its own when join is set.
#
#   $job    - job hash ref; {debug} and {pid} are read for the debug message
#   $config - filehandle configuration hash ref, updated in place
# NOTE(review): closing braces are absent in this view, and what follows
# the "Pipes unavailable" warning is not visible.
sub _preconfig_fh_pipes {
    my ($job,$config) = @_;
    if (!$Forks::Super::SysInfo::CONFIG{'pipe'}) {
	carp 'Forks::Super::Job::_preconfig_fh_pipes(): ',
	    'Pipes unavailable. ',
	    "Will try to use regular filehandles for child ipc.\n";
	delete $config->{pipes};

    if ($config->{in}) {
	$config->{f_in} = '__pipe__';
	($config->{p_in}, $config->{p_to_in}) = _create_pipe_pair($job);
    if ($config->{out}) {
	$config->{f_out} = '__pipe__';
	($config->{p_out},$config->{p_to_out}) = _create_pipe_pair($job);
    if ($config->{err} && !$config->{join}) {
	$config->{f_err} = '__pipe__';
	($config->{p_err},$config->{p_to_err}) = _create_pipe_pair($job);

    if ($job->{debug}) {
	debug("created pipe pairs for ", $job->{pid});

# Creates a connected pair of sockets for parent/child communication and
# annotates both handles (fileno, glob name, job, type flags, role flags,
# creation time, caller, read/write direction).
#
#   $job - job hash ref stored on both handles
#   $dir - direction: -1 child->parent, +1 parent->child, 0 bidirectional
# Returns ($s_child, $s_parent). Croaks when Socket is not available.
# AF_UNIX is tried first, then AF_INET.
# NOTE(review): closing braces are absent in this view; whether the sub
# returns early after the "socketpair failed" warning cannot be told.
sub _create_socket_pair {
    my ($job, $dir) = @_;  # dir:  -1 child->parent, +1 parent->child, 0 bidir.

    if (!Forks::Super::Config::CONFIG('Socket')) {
	croak "Forks::Super::Job::_create_socket_pair(): no Socket\n";
    my ($s_child, $s_parent);
    local $! = undef;
    # the trailing "&& 0" disables this IO::Socket branch
    if (Forks::Super::Config::CONFIG('IO::Socket') && 0) {
	($s_child, $s_parent) = IO::Socket->socketpair(
	    Socket::AF_UNIX(), Socket::SOCK_STREAM(), Socket::PF_UNSPEC());
	if (!(defined($s_child) && defined($s_parent))) {
	    warn 'Forks::Super::_create_socket_pair: ',
	    	"IO::Socket->socketpair(AF_UNIX) failed. Trying AF_INET\n";
	    ($s_child, $s_parent) = IO::Socket->socketpair(
		Socket::AF_INET(), Socket::SOCK_STREAM(), Socket::PF_UNSPEC());
    } else {

	# socketpair not supported on MSWin32 5.6
	$s_child = _gensym();
	$s_parent = _gensym();

	my $z = socketpair($s_child, $s_parent, Socket::AF_UNIX(),
			   Socket::SOCK_STREAM(), Socket::PF_UNSPEC());
	if ($z == 0) {
	    warn 'Forks::Super::_create_socket_pair: ',
	    	"socketpair(AF_UNIX) failed. Trying AF_INET\n";
	    $z = socketpair($s_child, $s_parent, Socket::AF_INET(),
			    Socket::SOCK_STREAM(), Socket::PF_UNSPEC());
	    if ($z == 0) {
		undef $s_child;
		undef $s_parent;
    if (!(defined($s_child) && defined($s_parent))) {
	carp 'Forks::Super::Job::_create_socket_pair(): ',
		"socketpair failed $! $^E!\n";

    # metadata is kept in the hash slot of each handle's glob
    $$s_child->{fileno} = $FILENO{$s_child} = CORE::fileno($s_child);
    $$s_parent->{fileno} = $FILENO{$s_parent} = CORE::fileno($s_parent);

    $$s_child->{glob}       = '' . *$s_child;
    $$s_parent->{glob}      = '' . *$s_parent;
    $$s_child->{job}        = $$s_parent->{job}        = $job;

    $$s_child->{is_socket}  = $$s_parent->{is_socket}  = 1;
    $$s_child->{is_pipe}    = $$s_parent->{is_pipe}    = 0;
    $$s_child->{is_regular} = $$s_parent->{is_regular} = 0;
    $$s_child->{is_child}   = $$s_parent->{is_parent}  = 1;
    $$s_child->{is_parent}  = $$s_parent->{is_child}   = 0;
    $$s_child->{opened}     = $$s_parent->{opened}     = Time::HiRes::time();
    my ($pkg,$file,$line)   = caller(2);
    $$s_child->{caller}     = $$s_parent->{caller}     = "$pkg;$file:$line";

    # $dir >= 0: the child end is flagged for writing, the parent end for reading
    if ($dir >= 0) {
	$$s_child->{is_write} = $$s_parent->{is_read} = 1;
    # $dir <= 0: the child end is flagged for reading, the parent end for writing
    if ($dir <= 0) {
	$$s_child->{is_read} = $$s_parent->{is_write} = 1;

    # XXX - $__OPEN_FH += 2 ?

    return ($s_child,$s_parent);

# Returns the file descriptor number recorded in %FILENO for a handle, or
# undef when none was recorded for it.
sub ___fileno {
    my $fh = shift;
    return $FILENO{$fh};
}

# Creates a pipe and annotates both ends (fileno, type flags, direction
# flags, creation time, job, caller).
#
#   $job - job hash ref stored on both handles; required
# Returns ($p_read, $p_write): the reading end and the writing end.
# Dies (confess) when no job is given; croaks when pipes are not available
# on this system or when pipe() fails.
sub _create_pipe_pair {
    my $job = shift;
    if (!defined($job)) {
        Carp::confess 'no job supplied to _create_pipe_pair';
    }

    if (!$Forks::Super::SysInfo::CONFIG{'pipe'}) {
        croak "Forks::Super::Job::_create_pipe_pair(): no pipe\n";
    }

    my ($p_read, $p_write) = (_gensym(), _gensym());
    local $! = 0;

    pipe $p_read, $p_write
        or croak "Forks::Super::Job: create pipe failed $!\n";

    # metadata is kept in the hash slot of each handle's glob
    $$p_read->{fileno} = $FILENO{$p_read} = CORE::fileno($p_read);
    $$p_write->{fileno} = $FILENO{$p_write} = CORE::fileno($p_write);

    $$p_read->{is_pipe} = $$p_write->{is_pipe} = 1;
    $$p_read->{is_socket} = $$p_write->{is_socket} = 0;
    $$p_read->{is_regular} = $$p_write->{is_regular} = 0;
    # A pipe is one-directional: the read end is readable only and the
    # write end is writable only. The second assignment used to store 1,
    # marking each end as usable in the wrong direction as well.
    $$p_read->{is_read} = $$p_write->{is_write} = 1;
    $$p_read->{is_write} = $$p_write->{is_read} = 0;
    $$p_read->{opened} = $$p_write->{opened} = Time::HiRes::time();
    $$p_read->{job} = $$p_write->{job} = $job;

    my ($pkg,$file,$line) = caller(2);
    $$p_read->{caller} = $$p_write->{caller} = "$pkg;$file:$line";

    # XXX - $__OPEN_FH += 2 ?

    return ($p_read, $p_write);
}

# Chooses, registers and returns the name of a new IPC file.
#
#   $suffix     - appended to the generated name when defined
#   @debug_info - free-form key/value data recorded with the file in
#                 _register_ipc_file
# The name is "<$_IPC_DIR>/<basename><counter>" where basename comes from
# the FORKS_SUPER_IPC_BASENAME environment variable (default '.fh_') and
# the counter is $IPC_COUNT formatted to at least three digits. On MSWin32
# forward slashes are turned into backslashes.
# NOTE(review): the bodies of the first two guards and the closing braces
# are not visible in this view, and neither is where $IPC_COUNT advances.
sub _choose_fh_filename {
    my ($suffix, @debug_info) = @_;
    my $basename = $ENV{FORKS_SUPER_IPC_BASENAME} || '.fh_';
    if (!Forks::Super::Config::CONFIG('filehandles')) {
    if (not defined $_IPC_DIR) {

    my $file = sprintf ('%s/%s%03d', $_IPC_DIR, $basename, $IPC_COUNT);
    if (defined $suffix) {
	$file .= $suffix;

    if (&IS_WIN32) {
	$file =~ s!/!\\!g;

    _register_ipc_file($file, [ @debug_info ]);

    # warn about a pre-existing file unless the directory is dedicated
    if (!$IPC_DIR_DEDICATED && -f $file) {
	carp 'Forks::Super::Job::_choose_fh_filename: ',
		"IPC file $file already exists!\n";
	debug("$file already exists ...") if $DEBUG;
    return $file;

# Records an IPC file so that it can be removed during cleanup.
#
#   $file - file name; always appended to @IPC_FILES
#   $info - optional data about the file; when defined it is stored in
#           %IPC_FILES under the file name
sub _register_ipc_file {
    my ($file, $info) = @_;
    push @IPC_FILES, $file;
    if (defined $info) {
        $IPC_FILES{$file} = $info;
    }
}

# choose a writeable but discrete location for files to
# handle interprocess communication.
#
# Does nothing when $_IPC_DIR is already set or when the 'filehandles'
# configuration value equals '0'. Otherwise the candidate directories are
# tried in order with set_ipc_dir; the first one accepted is returned (as
# $_IPC_DIR).
# NOTE(review): closing braces are absent in this view.
sub _identify_shared_fh_dir {
    return if defined $_IPC_DIR;
    return if Forks::Super::Config::CONFIG('filehandles') eq '0';

    # what are the good candidates ???
    # Any:       .
    # Windows:   C:/Temp C:/Windows/Temp %HOME%
    # Other:     /tmp $HOME /var/tmp
    my @search_dirs = ($ENV{'HOME'}, $ENV{'PWD'});
    if (&IS_WIN32) {
	push @search_dirs, 'C:/Temp', $ENV{'TEMP'}, 'C:/Windows/Temp',
	    'C:/Winnt/Temp', 'D:/Windows/Temp', 'D:/Winnt/Temp',
	    'E:/Windows/Temp', 'E:/Winnt/Temp', '.';
    } else {
	# XXX - safe to untaint cwd?
	# the current directory goes first, /tmp and /var/tmp last
	my ($cwd) = Forks::Super::Util::abs_path('.') =~ /(.*)/;
	unshift @search_dirs, $cwd;
	push @search_dirs, '/tmp', '/var/tmp';

    foreach my $dir (@search_dirs) {
	# skip unset or blank candidates (e.g. missing environment variables)
	next if !(defined($dir) && $dir =~ /\S/);
	debug("Considering $dir as shared filehandle dir ...") if $DEBUG;
	if (Forks::Super::Config::configif('filehandles')) {
	    if (set_ipc_dir($dir,0)) {
		debug("Selected $_IPC_DIR as shared filehandle dir ...")
		    if $DEBUG;
		return $_IPC_DIR;

# attempt to set $_IPC_DIR / $Forks::Super::IPC_DIR. Will fail if
# input is not a good directory name.
# Switches cleanse mode on; returns the new flag value (1).
sub enable_cleanse_mode {
    $cleanse_mode = 1;
    return $cleanse_mode;
}

# Reports the current value of the cleanse mode flag.
sub is_cleanse_mode {
    return $cleanse_mode;
}

# Attempts to set $_IPC_DIR (and the deprecated $Forks::Super::FH_DIR) to
# a new dedicated directory created under $dir, and writes a README.txt
# into it that records the creating process.
#
#   $dir  - base directory; the string 'undef' disables file based IPC
#   $carp - when true, failures produce warnings
# Returns 1 on success.
# README creation is skipped in cleanse mode or when
# $Forks::Super::Job::Ipc::NO_README is set; in cleanse mode the dedicated
# directory itself is not created either.
# NOTE(review): in this view the heredoc terminator, the closing braces
# and the statements that follow the failure warnings are absent.
# $localtime2 is assigned but not used in the visible README text, while
# _ipc_dir_used_by_live_process skips TWO lines before reading the
# "time pid" line -- confirm the README layout against the complete file.
sub set_ipc_dir {
    my ($dir, $carp) = @_;

    if (defined($dir) && $dir eq 'undef') {
	# disable file IPC
	$Forks::Super::Config::CONFIG{'filehandles'} = 0;
	$_IPC_DIR = undef;
    return if !Forks::Super::Config::CONFIG('filehandles');

    $dir = Forks::Super::Util::abs_path($dir);
    return if !_check_for_good_ipc_basedir($dir);

    my $dedicated_dirname = _choose_dedicated_dirname($dir);

    if (!defined $dedicated_dirname) {
	carp 'Forks::Super::set_ipc_dir: ',
            "Failed to created new dedicated IPC directory under \"$dir\"\n"
		if $carp;

    if ($cleanse_mode==0 && ! _mkdir0777("$dir/$dedicated_dirname")) {
	carp 'Forks::Super::set_ipc_dir: ',
		'Could not created dedicated IPC directory ',
		"under \"$dir\": $!\n" if $carp;

    # success.
    $Forks::Super::FH_DIR = "$dir/$dedicated_dirname";   # deprecated

    $_IPC_DIR = "$dir/$dedicated_dirname";
    # $Forks::Super::IPC_DIR is tied to this variable

    debug("dedicated IPC directory: $_IPC_DIR") if $DEBUG;

    # create README
    if (!$Forks::Super::Job::Ipc::NO_README && !$cleanse_mode) {
	my $readme = "$_IPC_DIR/README.txt";
	my $localtime1 = time;
	my $localtime2 = scalar localtime;
	if (open my $readme_fh, '>', $readme) {  ## no critic (ThisIsA,BriefOpen)
	    print $readme_fh <<"____";
This directory was created by $^O process $$ at
$localtime1 $$
$0 @ARGV
for interprocess communication.

It should be/have been cleaned up when the process completes/completed.
If that didn't happen for some reason, it is safe to delete
this directory. You may also consider running the command
"perl -MForks::Super=cleanse,$_IPC_DIR" to clean up this
and any other IPC litter.

            close $readme_fh;
	    _register_ipc_file( $readme,
				[ purpose => 'README' ] );
	} else {
	    carp 'Forks::Super::set_ipc_dir: ',
	        "Cannot create annotation file $readme: $!\n"; 
    return 1;

# Creates a directory with requested mode 0777 (mkdir applies the process
# umask to it) and verifies that the result is readable, writable and
# searchable by this process.
# Returns true only when the directory was created and passes all three
# checks; false when mkdir fails (for example because the path exists).
sub _mkdir0777 {
    my $dir = shift;
    return mkdir($dir, 0777)
        && -r $dir
        && -w $dir
        && -x $dir;
}

# Checks whether $dir can serve as the base directory for IPC files.
#
#   $dir  - directory name; undefined or blank names are rejected
#   $carp - when true, problems are reported with carp
# A directory that does not exist is created (except in cleanse mode). An
# existing directory must be readable, writable and searchable.
# NOTE(review): set_ipc_dir calls this with a single argument, so $carp is
# undefined on that path. Closing braces and the statements after the
# 'undef' branch and after the permission warning are not visible in this
# view.
sub _check_for_good_ipc_basedir {
    my ($dir,$carp) = @_;
    return if !defined($dir) || $dir !~ /\S/;
    # in cleanse mode a missing directory is acceptable
    my $ok = $cleanse_mode;
    if ($dir eq 'undef') {
	# don't use IPC.
	$Forks::Super::Config::CONFIG{'filehandles'} = 0;
	$_IPC_DIR = undef;
    if (! -d $dir) {
	my $if_carp_msg;
	if (-e $dir) {
	    $if_carp_msg = 'Forks::Super::set_ipc_dir: ' .
		"\"$dir\" is not a directory\n";
	} elsif (!$cleanse_mode) {
	    if (_mkdir0777($dir)) {
		$if_carp_msg = 
		    'Forks::Super::set_ipc_dir: ' .
		    "Created IPC directory \"$dir\"\n";
		$ok = 1;
	    } else {
		$if_carp_msg =
		    'Forks::Super::set_ipc_dir: ' .
		    "IPC directory \"$dir\" does not exist " .
		    "and could not be created: $!\n";
	if ($carp && $if_carp_msg) {
	    carp $if_carp_msg;
	return $ok;
    if ((! -r $dir) || (! -w $dir) || (! -x $dir)) {
	if ($carp) {
	    carp 'Forks::Super::set_ipc_dir: ',
	    	"Insufficient permission on IPC directory \"$dir\"";
    return 1;

# Picks a name for a new dedicated IPC directory under $dir (default '.')
# that does not exist yet. The first candidate is ".fhfork<pid>"; further
# candidates take the form ".fhfork<pid>-<n>".
# Returns the directory name without the leading $dir.
# NOTE(review): in this view no statement advances $n, and the body of the
# "$n > 10000" guard and the closing braces are absent -- confirm against
# the complete file.
sub _choose_dedicated_dirname {
    my $dir = shift || '.';
    my $dedicated_dirname = ".fhfork$$";
    my $n = 0;
    while (-e "$dir/$dedicated_dirname") {
	$dedicated_dirname = ".fhfork$$-$n";
	if ($n > 10000) {
    return $dedicated_dirname;

# Entry point for removing IPC files. Returns immediately when
# $Forks::Super::DONT_CLEANUP is greater than zero or when no IPC
# directory has been set.
# NOTE(review): the bodies of both platform branches are not visible in
# this view, so which cleanup routine runs on which platform cannot be
# told from here.
sub _cleanup {

    no warnings 'once';
    return if ($Forks::Super::DONT_CLEANUP || 0) > 0;
    return if !defined $_IPC_DIR;

    if (&IS_WIN32) {
    } else {

# Handler invoked with a signal name: removes signal traps (when
# _untrap_signals is defined), optionally reports the signal on STDERR and
# ends the process with exit status 1.
# NOTE(review): the else branch, the MSWin32 branch body and the closing
# braces are not visible in this view.
sub __cleanup__ {
    my $SIG = shift;
    if ($DEBUG) {
	debug("trapping: SIG$SIG");
    _untrap_signals() if defined &_untrap_signals;
    if ($DEBUG) {
	print STDERR "$$ received $SIG -- cleaning up\n";
    } else {
    if (&IS_WIN32) {
    exit 1;

# maintenance routine to erase all directories that look like
# temporary IPC directories.
# can invoke with
#    $ perl -MForks::Super=cleanse
#    $ perl -MForks::Super=cleanse,<directory>
#
#   $dir - directory to scan; defaults to $_IPC_DIR. A trailing ".fhfork..."
#          path component is stripped so that the parent is scanned.
# Changes the current directory to $dir and visits every subdirectory
# whose name starts with ".fhfork".
# NOTE(review): the loop body, the statement after the "No ... files
# found" message and the closing braces are not visible in this view. The
# croak message names $_IPC_DIR although the chdir target is $dir, and the
# result of opendir is not checked.
sub cleanse {

    # marks cleanup as already done so that the END-time cleanup is skipped
    $_CLEANUP = 1;
    my $dir = shift;
    if (!defined $dir) {
	$dir = $_IPC_DIR;
    $dir =~ s![\\/]\.fhfork[^\\/]*$!!;
    if (! -e $dir) {
	print "No Forks::Super ipc files found under directory \"$dir\"\n";
    print "Cleansing ipc directories under $dir\n";
    chdir $dir
	or croak "Forks::Super::Job::Ipc::cleanse: Can't move to $_IPC_DIR\n";
    opendir(D, '.');

    foreach my $ipc_dir (grep { -d $_ && /^\.fhfork/ } readdir (D)) {
    closedir D;

# Recursively deletes the contents of a directory and then the directory
# itself.
#
#   $dir - directory to remove
# Returns the number of entries that could not be removed. The directory
# is removed (and "Removed <dir>" printed) only when that number is zero.
# A directory that cannot be opened counts as one error.
sub _cleanse_dir {
    my $dir = shift;

    # Without this check a failed opendir would leave an unusable handle
    # for readdir and the failure would go unreported.
    opendir(my $dh, $dir) or return 1;

    my $errors = 0;
    while (my $f = readdir($dh)) {
        next if $f eq '.' || $f eq '..';
        my $path = "$dir/$f";

        # Recurse into real subdirectories only. A symbolic link to a
        # directory is unlinked like a file, so that nothing outside the
        # tree being cleansed is deleted.
        if (-d $path && !-l $path) {
            $errors += _cleanse_dir($path);
        } else {
            unlink $path or $errors++;
        }
    }
    closedir $dh;

    if (!$errors) {
        rmdir $dir and print "Removed $dir\n";
    }
    return $errors;
}

# Removes one leftover IPC directory unless it appears to belong to a
# process that is still running.
#
#   $ipc_dir - directory to remove
# Warns when some entries could not be deleted.
sub _cleanse_ipc_dir {
    my $ipc_dir = shift;

    if ($DEBUG) {
        print STDERR "cleanse $ipc_dir ?\n";
    }

    # try not to remove a directory for a running process ...
    return if _ipc_dir_used_by_live_process($ipc_dir);

    my $errors = _cleanse_dir($ipc_dir);
    if ($errors > 0) {
        # NOTE(review): kept from the original; presumably has no effect
        # since plain warn is used below -- confirm before removing.
        no Carp;

        # on MSWin32, errors often mean that an existing process
        # is hanging on to these files?
        if ($^O eq 'MSWin32') {
            warn "Encountered $errors errors cleaning up $ipc_dir:\n$^E\n";
        } else {
            # message spelling corrected (was "Encounted")
            warn "Encountered $errors errors cleaning up $ipc_dir\n";
        }
    }
}

# Decides whether an IPC directory still belongs to a running process,
# based on the "<time> <pid>" line stored in its README.txt.
#
#   $ipc_dir - IPC directory to examine
# Returns 1 (after a warning) when the recorded process appears to be
# alive, and 0 in every other case: no readable README, a missing or
# malformed "<time> <pid>" line, a directory older than 24 hours, a
# process that cannot be signalled, or a /proc entry that is younger than
# the directory (a different process reusing the pid).
sub _ipc_dir_used_by_live_process {
    my $ipc_dir = shift;
    my $readme = "$ipc_dir/README.txt";
    my $fh;
    if (!(-f $readme && open $fh, '<', $readme)) {
        return 0;
    }
    scalar <$fh>; # header
    scalar <$fh>; # localtime 2
    my $stamp = <$fh>;
    close $fh;

    my ($t, $pid) = defined $stamp ? split(/\s+/, $stamp) : ();

    # A truncated or malformed README must not be treated as evidence of a
    # live process. In particular an undefined pid would turn the check
    # below into kill(0, 0), which signals this process's own process
    # group and therefore always "succeeds".
    if (!defined $t || !defined $pid
        || $t !~ /^\d+$/ || $pid !~ /^\d+$/) {
        return 0;
    }

    if ($DEBUG) {
        print STDERR "pid=$pid, t=$t, age=",time-$t,"\n";
    }

    if ($t < time - 86400) {
        # process started 24hrs ago
        return 0;
    }
    if (! CORE::kill(0, $pid)) {
        # process can't be signalled
        return 0;
    }
    if ($^O ne 'MSWin32' && -e "/proc/$pid" &&
        (-C "/proc/$pid") < (-C $ipc_dir)) {
        # /proc dir is younger than ipc_dir.
        # may be a new process with the same process id
        return 0;
    }

    # how else can we find the age of a running process?
    # especially on Windows?

    warn "Process $pid appears to still be running. ",
                "Will not erase ipc dir $ipc_dir\n";
    return 1;
}

# First, synchronous stage of END-time cleanup: closes the handles kept
# for the children's standard streams, forgets IPC files that no longer
# exist and, when nothing is left to delete, removes the IPC directory.
# Returns 1 when cleanup had already been started ($_CLEANUP was set) or
# when nothing remains to be done.
# NOTE(review): the key 'Devel/' looks truncated -- the guarded statement
# touches $Devel::Trace::TRACE, so 'Devel/Trace.pm' may have been
# intended; confirm. The body of the job loop and the closing braces are
# not visible in this view.
sub _END_foreground_cleanup {
    return 1 if $_CLEANUP++;
    if ($INC{'Devel/'}) {
	no warnings 'once';
	$Devel::Trace::TRACE = 0;

    foreach my $job (@Forks::Super::ALL_JOBS) {
	next unless ref $job;
    foreach my $fh (values %Forks::Super::CHILD_STDIN,
		    values %Forks::Super::CHILD_STDOUT,
		    values %Forks::Super::CHILD_STDERR) {
	# _close($fh);
	# a handle without a file descriptor is filed under the key -1
	delete $__OPEN_FH{fileno($fh) || -1};
	$__OPEN_FH -= close $fh;

    # daemonize if there is anything to clean up
    my @unused_files = grep { ! -e $_ } keys %IPC_FILES;
    foreach my $unused_file (@unused_files) {
	delete $IPC_FILES{$unused_file};

    if (0 == scalar keys %IPC_FILES) {
	if (!defined($IPC_DIR_DEDICATED)
	    || ! -d $_IPC_DIR || rmdir $_IPC_DIR) {
	    return 1;

    umask 0;

# Second stage of END-time cleanup: renames the process, waits three
# seconds and then tries to delete every registered IPC file.
# Returns a hash of the files that are gone, mapping each file name to the
# entry it had in %IPC_FILES. Files that could not be deleted stay in
# %IPC_FILES and produce a warning.
# NOTE(review): closing braces are absent in this view.
sub _END_background_cleanup1 {
    # rename process, if supported by the OS, to note that we are cleaning up
    # not everyone will like this "feature"
    $0 = "Forks::Super:cleanup:$0";
    sleep 3;

    # removing all the files we created during IPC
    # doesn't always go smoothly. We'll give a
    # 3/4-assed effort to remove the files but
    # nothing more heroic than that.

    my %deleted = ();
    foreach my $ipc_file (keys %IPC_FILES) {
	if (! -e $ipc_file) {
	    # already gone
	    $deleted{$ipc_file} = delete $IPC_FILES{$ipc_file};
	} else {
	    local $! = undef;
	    if ($DEBUG) {
		print STDERR "Deleting $ipc_file ... ";
	    my $z = unlink $ipc_file;
	    # trust the unlink only when the file really has disappeared
	    if ($z && ! -e $ipc_file) {
		if ($DEBUG) {
		    print STDERR "Delete $ipc_file ok\n";
		$deleted{$ipc_file} = delete $IPC_FILES{$ipc_file};
	    } else {
		if ($DEBUG) {
		    print STDERR "Delete $ipc_file not ok: $!\n";
		warn 'Forks::Super::END_cleanup: ',
		    "error disposing of ipc file $ipc_file: $z/$!\n";
    return %deleted;

# Final stage of END-time cleanup: tries repeatedly to remove the IPC
# directory, deleting leftover files and waiting for ".nfs*" files to
# disappear in between. Warns when the directory still cannot be removed.
#
#   %G - hash passed by the caller (not used in the visible code)
# NOTE(review): glob is used in scalar context inside numeric comparisons
# ("0 < glob(...)", "glob(...) <= 0"); in scalar context glob yields a
# file name rather than a count -- confirm these tests behave as intended.
# @g is computed but not used in the visible code, and closing braces are
# absent in this view.
sub _END_background_cleanup2 {
    # best efforts to cleanup the IPC files
    my %G = @_;
    my $z = rmdir($_IPC_DIR) || 0;
    if (!$z) {
	unlink glob("$_IPC_DIR/*");
	sleep 5;
	$z = rmdir($_IPC_DIR) || 0;

    if (!$z
	&& -d $_IPC_DIR
	&& 0 < glob("$_IPC_DIR/.nfs*")) {

	# Observed these files on Linux running from NFS mounted filesystem
	# .nfsXXX files are usually temporary (~30s) but hard to kill
	for my $i (1..10) {
	    sleep 5;
	    last if glob("$_IPC_DIR/.nfs*") <= 0;
	$z = rmdir($_IPC_DIR) || 0;

    if (!$z && -d $_IPC_DIR) {

	warn "Forks::Super::END_cleanup: rmdir $_IPC_DIR failed. $!\n";

	# list what is left, ignoring the .nfs* files
	opendir(my $_Z, $_IPC_DIR);
	my @g = grep { !/^\.nfs/ } readdir($_Z);
	closedir $_Z;

# if we have created temporary files for IPC, clean them up.
# clean them up even if the children are still alive -- these files
# are exclusively for IPC, and IPC isn't needed after the parent
# process is done.
#
# The quick cleanup runs in the calling process. When work remains, the
# process forks twice: the caller returns at once, the intermediate child
# exits, and the grandchild deletes the IPC files in the background.
# NOTE(review): the body of the main-process check, the closing braces and
# whatever follows the final sleep are not visible in this view. A failed
# fork (undef) is treated like being the child.
sub END_cleanup {

    if ($$ != ($Forks::Super::MAIN_PID || $MAIN_PID)) {

    return if _END_foreground_cleanup();

    # double fork: parent returns, first child exits, grandchild continues
    return if CORE::fork();
    exit 0 if CORE::fork();

    my %G = _END_background_cleanup1();

    return if !defined $IPC_DIR_DEDICATED;
    return if 0 < scalar keys %IPC_FILES;

    my $zz = rmdir($_IPC_DIR) || 0;
    return if $zz;

    sleep 2;
    exit 0 if CORE::fork();

    # long sleep here for maximum portability.
    sleep 10;

# END-time cleanup for MSWin32: runs only in the main process and only
# once. Tries up to three times to delete the registered IPC files,
# falling back to "CMD /C DEL" when unlink fails, then removes the
# dedicated IPC directory. Warns when files or the directory remain.
# NOTE(review): the body of the descriptor-closing loop and the closing
# braces are not visible in this view. The substitution s!/!\\! has no /g
# modifier, so only the first '/' in a file name is converted before the
# DEL command is built -- confirm whether that is intended. The file name
# is interpolated into a shell command line.
sub END_cleanup_MSWin32 {
    return if $$ != ($Forks::Super::MAIN_PID || $MAIN_PID);
    return if $_CLEANUP++;
    $0 = "Forks::Super:cleanup:$0";

    # Use brute force to close all open handles. Leave STDERR open for warns.
    # XXX - is this ok? what if perl script is communicating with a socket?
    use POSIX ();
    # descriptor 2 (STDERR) is left out of the list
    for (0,1,3..999) {


    my @G = grep { -e $_ } keys %IPC_FILES;
  FILE_TRY: for my $try (1 .. 3) {
        if (@G == 0) {
	    last FILE_TRY;
	foreach my $G (@G) {
	    local $! = undef;
	    if (!unlink $G) {
		undef $!;
		$G =~ s!/!\\!;
		my $c1 = system("CMD /C DEL /Q \"$G\" 2> NUL");
    } continue {
	# re-check which files are still present before the next try
	sleep 1;
	@G = grep { -e $_ } keys %IPC_FILES;

    if (@G != 0) {
	# in Windows, remaining files might be "being used by another process".
	my $dir = $_IPC_DIR;
	$dir =~ s!\\!/!g;
	$dir =~ s!/[^/]+$!!;
	warn 'Forks::Super: failed to clean up ', scalar @G, " temp files.\n",
		"Run  $^X -MForks::Super=cleanse,$dir  ",
		"after this program has ended.\n";

    if (defined $IPC_DIR_DEDICATED && -d $_IPC_DIR) {
	local $! = undef;
	my $z = rmdir $_IPC_DIR;
	if (!$z) {
	    warn 'Forks::Super: failed to remove dedicated ',
	    	"temp file directory $_IPC_DIR: $!\n";

# _config_fh_parent_stdin($job)
#   Parent-side setup of the handle used to write to the child's STDIN,
#   chosen by the transport recorded in $job->{fh_config} (sockets, pipes
#   or a file). Once $job->{child_stdin} exists it is tagged with the job,
#   a purpose string and the is_write flag.
#   NOTE(review): the bodies of the transport branches are not present in
#   this view -- presumably each calls the matching
#   __config_fh_parent_stdin_* helper; confirm against the full file.
sub _config_fh_parent_stdin {
    my $job = shift;
    my $fh_config = $job->{fh_config};

    if (defined $fh_config->{stdin}) {
	debug('Passing STDIN from parent to child in scalar variable')
	    if $job->{debug};

    if ($fh_config->{in}) {
	# initialize $fh_config->{child_stdin}

	if ($fh_config->{sockets}) {
	} elsif ($fh_config->{pipes}) {
	} elsif (defined $fh_config->{f_in}) {
	} else {
	    # hope we don't / can't get here.
	    Carp::cluck 'fh_config->{in} is specified for ', $job->toFullString(),
		    "but we did not configure it in _config_fh_parent_stdin.\n";
    if (defined $job->{child_stdin}) {
	# record bookkeeping data on the handle itself
	my $fh = $job->{child_stdin};
	$$fh->{job} = $job;
	$$fh->{purpose} = 'parent write to child stdin';
	$$fh->{is_write} = 1;

# _apply_layers($handle, @layers)
#   Apply each PerlIO layer in @layers to $handle with binmode. Each layer
#   gets up to two attempts; a failure on the second attempt is reported
#   with carp. Forks::Super::Util::pause is called with 0.01 * the attempt
#   number after an unsuccessful attempt.
sub _apply_layers {
    my ($handle, @layers) = @_;
    foreach my $layer (@layers) {
	for my $redo (1..2) {
	    local $!=0;
	    # success: move on to the next layer
	    last if binmode $handle, $layer;
	    if ($redo == 2) {
		carp "Forks::Super: failed to apply PerlIO layer $layer ",
	            "to handle $handle";
	    Forks::Super::Util::pause(0.01 * $redo);

# __config_fh_parent_stdin_sockets($job)
#   Parent side of a socket connected to the child's STDIN. The parent
#   only writes on this socket, so its read side is shut down. The handle
#   stored in $fh_config->{s_in} is either a fresh glob tied to
#   Forks::Super::Tie::IPCSocketHandle (when $USE_TIE_SH) or the raw
#   socket, and is registered in %Forks::Super::CHILD_STDIN under both
#   $job->{real_pid} and $job->{pid}.
sub __config_fh_parent_stdin_sockets {
    my $job = shift;
    my $fh_config = $job->{fh_config};

    # shutdown(..., 0): no more reads on the parent's end
    shutdown $fh_config->{psock_in}, 0;
    ${$fh_config->{psock_in}}->{_SHUTDOWN} = 1;
    # remember that the write side (1) is the half still open;
    # presumably consumed by later cleanup code -- TODO confirm
    push @SAFEOPENED, [ shutdown => $fh_config->{psock_in}, 1 ];
    if ($USE_TIE_SH) {
	$fh_config->{s_in} = _gensym();
	tie *{$fh_config->{s_in}}, 'Forks::Super::Tie::IPCSocketHandle',
	    $fh_config->{psock_in}, $fh_config->{s_in};
    } else {
	$fh_config->{s_in} = $fh_config->{psock_in};
	#_apply_layers($fh_config->{s_in}, @{$fh_config->{layers}})
	#    if $fh_config->{layers};
    # NOTE(review): the first line of this chained assignment is not
    # present in this view.
        = $Forks::Super::CHILD_STDIN{$job->{real_pid}}
        = $Forks::Super::CHILD_STDIN{$job->{pid}}
        = $fh_config->{s_in};
    # marker value: STDIN is carried by a socket rather than a file
    $fh_config->{f_in} = '__socket__';
    debug("Setting up socket to $job->{pid} stdin $fh_config->{s_in} ",
	  CORE::fileno($fh_config->{s_in})) if $job->{debug};

# __config_fh_parent_stdin_pipes($job)
#   Parent side of a pipe to the child's STDIN. Registers either a glob
#   tied to Forks::Super::Tie::IPCPipeHandle (when $USE_TIE_PH) or the raw
#   pipe handle $fh_config->{p_to_in} in %Forks::Super::CHILD_STDIN under
#   both $job->{real_pid} and $job->{pid}.
#   NOTE(review): the first line of each chained assignment below is not
#   present in this view.
sub __config_fh_parent_stdin_pipes {
    my $job = shift;
    my $fh_config = $job->{fh_config};
    if ($USE_TIE_PH) {
	my $ph = _gensym();
	tie *$ph, 'Forks::Super::Tie::IPCPipeHandle', $fh_config->{p_to_in}, $ph;
	    = $Forks::Super::CHILD_STDIN{$job->{real_pid}}
	    = $Forks::Super::CHILD_STDIN{$job->{pid}}
	    = $ph;
    } else {
	    = $Forks::Super::CHILD_STDIN{$job->{real_pid}}
	    = $Forks::Super::CHILD_STDIN{$job->{pid}}
	    = $fh_config->{p_to_in};
	#_apply_layers($fh_config->{p_to_in}, @{$fh_config->{layers}})
	#	if $fh_config->{layers};
    # marker value: STDIN is carried by a pipe rather than a file
    $fh_config->{f_in} = '__pipe__';
    debug("Setting up pipe to $job->{pid} stdin $fh_config->{p_to_in} ",
	  CORE::fileno($fh_config->{p_to_in})) if $job->{debug};

# __config_fh_parent_stdin_file($job)
#   Parent side of file-based STDIN: opens $fh_config->{f_in} for writing
#   with _safeopen and registers the handle in %Forks::Super::CHILD_STDIN.
#   Emits a warning (and registers nothing) if the file cannot be opened.
#   NOTE(review): the first line of the chained assignment below is not
#   present in this view.
sub __config_fh_parent_stdin_file {
    my $job = shift;
    my $fh_config = $job->{fh_config};

    my $fh;
    local $! = 0;
    if (_safeopen($job, $fh, '>', $fh_config->{f_in})) {

	debug("Opening $fh_config->{f_in} in parent as child STDIN")
	    if $job->{debug};
	  = $Forks::Super::CHILD_STDIN{$job->{real_pid}}
	  = $fh;
	$Forks::Super::CHILD_STDIN{$job->{pid}} = $fh;

#    debug("Setting up link to $job->{pid} stdin in $fh_config->{f_in}")
#	  if $job->{debug};
	debug('Opened child STDIN (',fileno($fh),') in parent') if $job->{debug};

    } else {
	warn 'Forks::Super::Job::config_fh_parent(): ',
		  'could not open filehandle to write child STDIN (to ',
		  $fh_config->{f_in}, "): $!\n";

# _config_fh_parent_stdout($job)
#   Parent-side setup of the handle used to read the child's STDOUT,
#   chosen by the transport in $job->{fh_config}. Afterwards the handle is
#   tagged (is_read, job, purpose), emulate_blocking is set when the
#   'block' option is on, and with the 'join' option the child's STDERR is
#   made to refer to the same handle as its STDOUT.
#   NOTE(review): the bodies of the transport branches are not present in
#   this view -- presumably each calls the matching
#   __config_fh_parent_stdout_* helper; confirm against the full file.
sub _config_fh_parent_stdout {
    my $job = shift;
    my $fh_config = $job->{fh_config};

    if ($fh_config->{out} && $fh_config->{sockets}) {
    } elsif ($fh_config->{out} && $fh_config->{pipes}) {
    } elsif ($fh_config->{out} and defined $fh_config->{f_out}) {
    if (defined $job->{child_stdout}) {
	my $fh = $job->{child_stdout};
	$$fh->{is_read} = 1;
	$$fh->{job} = $job;
	$$fh->{purpose} = 'parent read from child stdout';
    if ($fh_config->{block}) {
	${$job->{child_stdout}}->{emulate_blocking} = 1;
    if ($fh_config->{join}) {
	# joined output: no separate stderr channel is configured
	delete $fh_config->{err};
	# NOTE(review): the first line of this chained assignment is not
	# present in this view.
	      = $Forks::Super::CHILD_STDERR{$job->{real_pid}}
	      = $Forks::Super::CHILD_STDERR{$job->{pid}}
	      = $job->{child_stdout};
	$fh_config->{f_err} = $fh_config->{f_out};
	debug("Joining stderr to stdout for $job->{pid}") if $job->{debug};

# __config_fh_parent_stdout_sockets($job)
#   Parent side of a socket carrying the child's STDOUT. The parent only
#   reads on this socket, so its write side is shut down. The resulting
#   handle ($fh_config->{s_out}, tied when $USE_TIE_SH, otherwise the raw
#   socket with any configured PerlIO layers applied) becomes
#   $job->{child_stdout} and is registered in %Forks::Super::CHILD_STDOUT.
sub __config_fh_parent_stdout_sockets {
    my $job = shift;
    my $fh_config = $job->{fh_config};

    # shutdown(..., 1): no more writes on the parent's end
    shutdown $fh_config->{psock_out}, 1;
    ${$fh_config->{psock_out}}->{_SHUTDOWN} = 2;
    push @SAFEOPENED, [ shutdown => $fh_config->{psock_out}, 0 ];
    if ($USE_TIE_SH) {
	$fh_config->{s_out} = _gensym();
	tie *{$fh_config->{s_out}}, 'Forks::Super::Tie::IPCSocketHandle',
	    $fh_config->{psock_out}, $fh_config->{s_out};
    } else {
	$fh_config->{s_out} = $fh_config->{psock_out};
	if ($fh_config->{layers}) {
	    _apply_layers($fh_config->{s_out}, @{$fh_config->{layers}});
    $job->{child_stdout} = $Forks::Super::CHILD_STDOUT{$job->{real_pid}}
        = $Forks::Super::CHILD_STDOUT{$job->{pid}} = $fh_config->{s_out};
    # marker value: STDOUT is carried by a socket rather than a file
    $fh_config->{f_out} = '__socket__';
    debug("Setting up socket to $job->{pid} stdout $fh_config->{s_out} ",
	  CORE::fileno($fh_config->{s_out})) if $job->{debug};

# __config_fh_parent_stdout_pipes($job)
#   Parent side of a pipe carrying the child's STDOUT. Registers either a
#   glob tied to Forks::Super::Tie::IPCPipeHandle (when $USE_TIE_PH) or the
#   raw pipe handle $fh_config->{p_out} (with any configured PerlIO layers
#   applied) in %Forks::Super::CHILD_STDOUT.
#   NOTE(review): the first line of each chained assignment below is not
#   present in this view.
sub __config_fh_parent_stdout_pipes {
    my $job = shift;
    my $fh_config = $job->{fh_config};

    if ($USE_TIE_PH) {
	my $ph = _gensym();
	tie *$ph, 'Forks::Super::Tie::IPCPipeHandle', $fh_config->{p_out}, $ph;
	    = $Forks::Super::CHILD_STDOUT{$job->{real_pid}}
	    = $Forks::Super::CHILD_STDOUT{$job->{pid}}
	    = $ph;
    } else {

 	    = $Forks::Super::CHILD_STDOUT{$job->{real_pid}}
	    = $Forks::Super::CHILD_STDOUT{$job->{pid}}
	    = $fh_config->{p_out};
	if ($fh_config->{layers}) {
	    _apply_layers($fh_config->{p_out}, @{$fh_config->{layers}});
    # marker value: STDOUT is carried by a pipe rather than a file
    $fh_config->{f_out} = '__pipe__';
    debug("Setting up pipe to $job->{pid} stdout $fh_config->{p_out} ",
	  CORE::fileno($fh_config->{p_out})) if $job->{debug};

# __config_fh_parent_stdout_file($job)
#   Parent side of file-based STDOUT: opens $fh_config->{f_out} for
#   reading with _safeopen (robust => 1) and stores the handle as
#   $job->{child_stdout} and in %Forks::Super::CHILD_STDOUT. Warns if the
#   file cannot be opened.
sub __config_fh_parent_stdout_file {
    my $job = shift;
    my $fh_config = $job->{fh_config};

    # creation of $fh_config->{f_out} may be delayed.
    # don't panic if we can't open it right away.
    my $fh;
    debug('Opening ', $fh_config->{f_out}, ' in parent as child STDOUT')
	if $job->{debug};
    local $! = 0;

    if (_safeopen($job, $fh, '<', $fh_config->{f_out}, robust => 1)) {

	debug('Opened child STDOUT (',fileno($fh),') in parent') if $job->{debug};
	$job->{child_stdout} = $Forks::Super::CHILD_STDOUT{$job->{real_pid}}
		= $Forks::Super::CHILD_STDOUT{$job->{pid}} = $fh;

	debug("Setting up link to $job->{pid} stdout in $fh_config->{f_out}")
	    if $job->{debug};

    } else {
	# NOTE(review): $_msg is built but not used in the visible code
	my $_msg = sprintf "%d: %s Failed to open f_out=%s: %s\n",
		$$, Forks::Super::Util::Ctime(), $fh_config->{f_out}, $!;

	warn 'Forks::Super::Job::config_fh_parent(): ',
	  'could not open filehandle to read child STDOUT (from ',
	  $fh_config->{f_out}, "): $!\n";

# _config_fh_parent_stderr($job)
#   Parent-side setup of the handle used to read the child's STDERR,
#   chosen by the transport in $job->{fh_config}. Afterwards the handle is
#   tagged (is_read, job, purpose) and emulate_blocking is set when the
#   'block' option is on.
#   NOTE(review): the bodies of the transport branches are not present in
#   this view -- presumably each calls the matching
#   __config_fh_parent_stderr_* helper; confirm against the full file.
sub _config_fh_parent_stderr {
    my $job = shift;
    my $fh_config = $job->{fh_config};

    if ($fh_config->{err} && $fh_config->{sockets}) {
    } elsif ($fh_config->{err} && $fh_config->{pipes}) {
    } elsif ($fh_config->{err} and defined $fh_config->{f_err}) {
    if (defined $job->{child_stderr}) {
	my $fh = $job->{child_stderr};
	$$fh->{is_read} = 1;
	$$fh->{job} = $job;
	$$fh->{purpose} = 'parent read from child stderr';
    if ($fh_config->{block}) {
	${$job->{child_stderr}}->{emulate_blocking} = 1;

# __config_fh_parent_stderr_sockets($job)
#   Parent side of a socket carrying the child's STDERR. The parent only
#   reads on this socket, so its write side is shut down. The resulting
#   handle ($fh_config->{s_err}, tied when $USE_TIE_SH, otherwise the raw
#   socket with any configured PerlIO layers applied) is registered in
#   %Forks::Super::CHILD_STDERR.
sub __config_fh_parent_stderr_sockets {
    my $job = shift;
    my $fh_config = $job->{fh_config};
    # shutdown(..., 1): no more writes on the parent's end
    shutdown $fh_config->{psock_err}, 1;
    ${$fh_config->{psock_err}}->{_SHUTDOWN} = 2;
    push @SAFEOPENED, [ shutdown => $fh_config->{psock_err}, 0 ];
    if ($USE_TIE_SH) {
	$fh_config->{s_err} = _gensym();
	tie *{$fh_config->{s_err}}, 'Forks::Super::Tie::IPCSocketHandle',
	    $fh_config->{psock_err}, $fh_config->{s_err};
    } else {
	$fh_config->{s_err} = $fh_config->{psock_err};
	if ($fh_config->{layers}) {
	    _apply_layers($fh_config->{s_err}, @{$fh_config->{layers}});

    # NOTE(review): the first line of this chained assignment is not
    # present in this view.
        = $Forks::Super::CHILD_STDERR{$job->{real_pid}}
        = $Forks::Super::CHILD_STDERR{$job->{pid}}
        = $fh_config->{s_err};
    # marker value: STDERR is carried by a socket rather than a file
    $fh_config->{f_err} = '__socket__';
    debug("Setting up socket to $job->{pid} stderr $fh_config->{s_err} ",
	  CORE::fileno($fh_config->{s_err})) if $job->{debug};

# __config_fh_parent_stderr_pipes($job)
#   Parent side of a pipe carrying the child's STDERR. Registers either a
#   glob tied to Forks::Super::Tie::IPCPipeHandle (when $USE_TIE_PH) or the
#   raw pipe handle $fh_config->{p_err} (with any configured PerlIO layers
#   applied) in %Forks::Super::CHILD_STDERR.
#   NOTE(review): the first line of each chained assignment below is not
#   present in this view.
sub __config_fh_parent_stderr_pipes {
    my $job = shift;
    my $fh_config = $job->{fh_config};

    if ($USE_TIE_PH) {
	my $ph = _gensym();
	tie *$ph, 'Forks::Super::Tie::IPCPipeHandle', $fh_config->{p_err}, $ph;

	    = $Forks::Super::CHILD_STDERR{$job->{real_pid}}
	    = $Forks::Super::CHILD_STDERR{$job->{pid}}
	    = $ph;
    } else {
	    = $Forks::Super::CHILD_STDERR{$job->{real_pid}}
	    = $Forks::Super::CHILD_STDERR{$job->{pid}}
	    = $fh_config->{p_err};
	if ($fh_config->{layers}) {
	    _apply_layers($fh_config->{p_err}, @{$fh_config->{layers}});
    # marker value: STDERR is carried by a pipe rather than a file
    $fh_config->{f_err} = '__pipe__';
    debug("Setting up pipe to $job->{pid} stderr ",
	  CORE::fileno($fh_config->{p_err})) if $job->{debug};

# __config_fh_parent_stderr_file($job)
#   Parent side of file-based STDERR: removes the 'join' option, opens
#   $fh_config->{f_err} for reading with _safeopen (robust => 1) and
#   registers the handle in %Forks::Super::CHILD_STDERR. Warns if the file
#   cannot be opened.
sub __config_fh_parent_stderr_file {
    my $job = shift;
    my $fh_config = $job->{fh_config};

    delete $fh_config->{join};
    my $fh;
    debug('Opening ', $fh_config->{f_err}, ' in parent as child STDERR')
	if $job->{debug};
    local $! = 0;
    if (_safeopen($job, $fh, '<', $fh_config->{f_err}, robust => 1)) {

	debug('Opened child STDERR (',fileno($fh),') in parent') if $job->{debug};
	# NOTE(review): the first line of this chained assignment is not
	# present in this view.
		= $Forks::Super::CHILD_STDERR{$job->{real_pid}}
		= $Forks::Super::CHILD_STDERR{$job->{pid}}
		= $fh;

	debug("Setting up link to $job->{pid} stderr in $fh_config->{f_err}")
	    if $job->{debug};

    } else {
	# NOTE(review): $_msg is built but not used in the visible code
	my $_msg = sprintf "%d: %s Failed to open f_err=%s: %s\n",
		$$, Forks::Super::Util::Ctime(), $fh_config->{f_err}, $!;
	warn 'Forks::Super::Job::config_fh_parent(): ',
	  'could not open filehandle to read child STDERR (from ',
	  $fh_config->{f_err}, "): $!\n";

# open filehandles to the STDIN, STDOUT, STDERR processes of the job
# to be used by the parent. Presumably the child process is opening
# the same files at about the same time.
#
# Does nothing when the job has no fh_config. For pipe-based IPC the
# child's ends of the pipes (p_in, p_to_out, p_to_err) are closed in the
# parent and removed from fh_config.
# NOTE(review): several statements of this sub (including the body of the
# csock_* loop) are not present in this view.
sub Forks::Super::Job::_config_fh_parent {
    my $job = shift;
    return if not defined $job->{fh_config};

    # _trap_signals(); # XXX - is this necessary?
    my $fh_config = $job->{fh_config};

    # set up stdin first.
    if ($job->{fh_config}{sockets}) {

	# is it helpful or necessary for the parent to close the
	# "child" sockets? Yes, apparently, for MSWin32.

	if (!$USE_TIE_SH) {
	    foreach my $channel (qw(in out err)) {
		my $s = $job->{fh_config}{"csock_$channel"};
    if ($job->{fh_config}{pipes}) {
	foreach my $pipeattr (qw(p_in p_to_out p_to_err)) {
	    if (defined $job->{fh_config}{$pipeattr}) {
		_close( $job->{fh_config}{$pipeattr} );
		delete $job->{fh_config}{$pipeattr};


# _config_fh_child_stdin($job)
#   Child-side setup of STDIN. STDIN is closed when input is suppressed or
#   not configured; otherwise a source is selected in this order: a
#   'stdin' value, sockets, pipes, or a file (f_in). Finally the STDIN
#   glob is tagged with is_read, the job, a purpose string and, when the
#   'block' option is on, emulate_blocking.
#   NOTE(review): the bodies of the source-selection branches are not
#   present in this view -- presumably each calls the matching
#   __config_fh_child_stdin_* helper; confirm against the full file.
sub _config_fh_child_stdin {
    my $job = shift;
    local $! = undef;
    my $fh_config = $job->{fh_config};
    if ($fh_config->{suppress} && $fh_config->{suppress}{in}) {
	close STDIN;
    if (!$fh_config->{in}) {
	close STDIN;

    if (defined $fh_config->{stdin}) {
    } elsif ($fh_config->{sockets}) {
    } elsif ($fh_config->{pipes}) {
    } elsif ($fh_config->{f_in}) {
    } else {
	carp 'Forks::Super::Job::Ipc: failed to configure child STDIN: ',
		'fh_config = ', join(' ', %{$job->{fh_config}});
    ${*STDIN}->{is_read} = 1;
    ${*STDIN}->{job} = $job;
    ${*STDIN}->{purpose} = "child $$ STDIN from parent " . $job->{ppid};
    if (defined($fh_config->{block}) && $fh_config->{block}) {
	${*STDIN}->{emulate_blocking} = 1;

# __config_fh_child_stdin_scalar($job)
#   Child-side STDIN when the input was supplied as a value
#   ($fh_config->{stdin}). With socket or pipe IPC, a read handle is
#   opened on a copy of that value and installed as *STDIN. Otherwise the
#   file $fh_config->{f_in} is opened (no_layers => 1) and duplicated onto
#   STDIN. Opened handles are queued in $job->{child_fh_close}; failures
#   are reported with carp.
sub __config_fh_child_stdin_scalar {
    my $job = shift;
    my $fh_config = $job->{fh_config};

    my $fh;
    if ($fh_config->{sockets} || $fh_config->{pipes}) {
	my $stdin = $fh_config->{stdin};
	# '<' on a scalar reference: read from the string held in $stdin
	if (_safeopen($job, $fh, '<', \$stdin)) {

	    push @{$job->{child_fh_close}}, $fh;
	    close STDIN if &IS_WIN32;
	    *STDIN = $fh;
	    ${*STDIN}->{dup} = $fh;

	} else {
	    carp 'Forks::Super::Job::Ipc::_config_fh_child_stdin: ',
		    "Error initializing scalar STDIN in child $$: $!\n";

    } elsif (!(_safeopen($job, $fh, '<', $fh_config->{f_in}, no_layers => 1))) {
	carp 'Forks::Super::Job::Ipc::_config_fh_child_stdin(): ',
		"Error initializing scalar STDIN in child $$: $!\n";
    } elsif (!(_safeopen($job, *STDIN, '<&', $fh))) {
	# the file was opened but could not be duplicated onto STDIN
	push @{$job->{child_fh_close}}, $fh;
	carp 'Forks::Super::Job::Ipc::_config_fh_child_stdin(): ',
		"Error initializing scalar STDIN in child $$: $!\n";
    } else {
	push @{$job->{child_fh_close}}, $fh;

# __config_fh_child_stdin_sockets($job)
#   Child-side STDIN over a socket. The child only reads from csock_in, so
#   its write side is shut down. For jobs that are not 'cmd' or 'exec'
#   style (and when $USE_TIE_SH), STDIN is aliased to a glob tied to
#   Forks::Super::Tie::IPCSocketHandle; otherwise the socket is duplicated
#   onto STDIN with _safeopen.
sub __config_fh_child_stdin_sockets {
    my $job = shift;
    my $fh_config = $job->{fh_config};

    close STDIN if &IS_WIN32;
    # shutdown(..., 1): no more writes on the child's end
    shutdown $fh_config->{csock_in}, 1;
    ${$fh_config->{csock_in}}->{_SHUTDOWN} = 2;
    push @SAFEOPENED, [ shutdown => $fh_config->{csock_in}, 0 ];
    if ($USE_TIE_SH && $job->{style} ne 'cmd' && $job->{style} ne 'exec') {
	my $fh = _gensym();
	tie *$fh, 'Forks::Super::Tie::IPCSocketHandle',
		$fh_config->{csock_in}, $fh;
	*STDIN = *$fh;
	${*STDIN}->{std_delegate} = $fh;
    } else {

	if (!(_safeopen($job, *STDIN, '<&', $fh_config->{csock_in}))) {
	    warn 'Forks::Super::Job::_config_fh_child_stdin(): ',
		    "could not attach child STDIN to input sockethandle: $!\n";
    debug('Opening socket ',*STDIN,'/',CORE::fileno(STDIN), ' in child STDIN')
	if $job->{debug};

# __config_fh_child_stdin_pipes($job)
#   Child-side STDIN over a pipe. The pipe handle p_in is queued in
#   $job->{child_fh_close} and STDIN is closed. For jobs that are not
#   'cmd' or 'exec' style (and when $USE_TIE_PH), STDIN is aliased to a
#   glob tied to Forks::Super::Tie::IPCPipeHandle; otherwise the pipe is
#   duplicated onto STDIN with _safeopen, with a warning on failure.
sub __config_fh_child_stdin_pipes {
    my $job = shift;
    my $fh_config = $job->{fh_config};
    push @{$job->{child_fh_close}}, $fh_config->{p_in};
    close STDIN;
    if ($USE_TIE_PH && $job->{style} ne 'cmd' && $job->{style} ne 'exec') {
	my $ph = _gensym();
	tie *$ph, 'Forks::Super::Tie::IPCPipeHandle', $fh_config->{p_in}, $ph;
	*STDIN = *$ph;
	${*STDIN}->{std_delegate} = $ph;
    } elsif (!(_safeopen($job, *STDIN, '<&', $fh_config->{p_in}))) {
	warn 'Forks::Super::Job::_config_fh_child_stdin(): ',
		"could not attach child STDIN to input pipe: $!\n";
    } else {
	push @{$job->{child_fh_close}}, *STDIN;
    debug('Opening pipe ',*STDIN,'/',CORE::fileno(STDIN), ' in child STDIN')
	if $job->{debug};

# __config_fh_child_stdin_file($job)
#   Child-side STDIN from the file $fh_config->{f_in}. The file is opened
#   for reading and queued in $job->{child_fh_close}. For 'cmd' and 'exec'
#   style jobs the handle is duplicated onto STDIN; for other styles STDIN
#   is tied to Forks::Super::Tie::IPCDupSTDIN, which is given the opened
#   handle. Warns if the file cannot be opened.
sub __config_fh_child_stdin_file {
    my $job = shift;
    my $fh_config = $job->{fh_config};
    close STDIN if &IS_WIN32;

    my $fh;
    if (_safeopen($job, $fh, '<', $fh_config->{f_in})) {
	push @{$job->{child_fh_close}}, $fh;

	if ($fh_config->{block}) {
	    $$fh->{emulate_blocking} = 1;
	$$fh->{purpose} = 'child read stdin from parent';

	if ($job->{style} eq 'cmd' || $job->{style} eq 'exec') {
	    _safeopen($job, *STDIN, '<&', $fh, robust => 1)
		or warn 'Forks::Super::Job::config_fh_child(): ',
	    	        'could not attach child STDIN ',
		        "to input filehandle: $!\n";
	} else {
	    # require Forks::Super::Tie::IPCDupSTDIN;
	    tie *STDIN, 'Forks::Super::Tie::IPCDupSTDIN', 
	        GLOB => $fh, 
	        JOB => &Forks::Super::Job::this,
	        TIED => tied(*$fh);
	if ($job->{debug}) {
	    debug("reopened STDIN in child");
    } else {
	warn 'Forks::Super::Job::config_fh_child(): ',
		"could not open filehandle to provide child STDIN: $!\n";

# _config_fh_child_stdout($job)
#   Child-side setup of STDOUT. STDOUT is closed when output is
#   suppressed; nothing else happens unless $fh_config->{out} is set.
#   Otherwise a transport is selected (sockets, pipes, or the file f_out)
#   and the STDOUT glob is tagged with is_write.
#   NOTE(review): the bodies of the transport branches are not present in
#   this view -- presumably each calls the matching
#   __config_fh_child_stdout_* helper; confirm against the full file.
sub _config_fh_child_stdout {
    my $job = shift;
    local $! = undef;
    my $fh_config = $job->{fh_config};
    if ($fh_config->{suppress} && $fh_config->{suppress}{out}) {
	close STDOUT;
    return if ! $fh_config->{out};

    if ($fh_config->{sockets}) {
    } elsif ($fh_config->{pipes}) {
    } elsif ($fh_config->{f_out}) {
    } else {
	carp 'Forks::Super::Job::Ipc: failed to configure child STDOUT: ',
		'fh_config = ', join(' ', %{$job->{fh_config}});
    ${*STDOUT}->{is_write} = 1;

# __config_fh_child_stdout_sockets($job)
#   Child-side STDOUT over a socket. The child only writes to csock_out,
#   so its read side is shut down. For jobs that are not 'cmd' or 'exec'
#   style (and when $USE_TIE_SH), STDOUT is aliased to a glob tied to
#   Forks::Super::Tie::IPCSocketHandle; otherwise the socket is duplicated
#   onto STDOUT. With the 'join' option STDERR is redirected to the same
#   socket and the separate 'err' setting is removed.
sub __config_fh_child_stdout_sockets {
    my $job = shift;
    my $fh_config = $job->{fh_config};

    close STDOUT;
    # shutdown(..., 0): no more reads on the child's end
    shutdown $fh_config->{csock_out}, 0;
    ${$fh_config->{csock_out}}->{_SHUTDOWN} = 1;
    push @SAFEOPENED, [ shutdown => $fh_config->{csock_out}, 1 ];
    if ($USE_TIE_SH && $job->{style} ne 'cmd' && $job->{style} ne 'exec') {
	my $fh = _gensym();
	tie *$fh, 'Forks::Super::Tie::IPCSocketHandle',
		$fh_config->{csock_out}, $fh;
	*STDOUT = *$fh;
	${*STDOUT}->{std_delegate} = $fh;
    } else {
	_safeopen($job, *STDOUT, '>&', $fh_config->{csock_out})
	    or warn 'Forks::Super::Job::_config_fh_child_stdout(): ',
	        "could not attach child STDOUT to output sockethandle: $!\n";

    debug('Opening ',*STDOUT,'/',CORE::fileno(STDOUT),' in child STDOUT')
	if $job->{debug};

    if ($fh_config->{join}) {
	delete $fh_config->{err};
	close STDERR;
	_safeopen($job, *STDERR, '>&', $fh_config->{csock_out})
	    or warn 'Forks::Super::Job::_config_fh_child_stdout(): ',
		    "could not join child STDERR to STDOUT sockethandle: $!\n";

	debug('Joining ',*STDERR,'/',CORE::fileno(STDERR),
	      ' STDERR to child STDOUT') if $job->{debug};

# __config_fh_child_stdout_pipes($job)
#   Child-side STDOUT over a pipe. When $USE_TIE_PH, STDOUT is aliased to
#   a glob tied to Forks::Super::Tie::IPCPipeHandle; otherwise the pipe
#   p_to_out is duplicated onto STDOUT. The pipe and STDOUT are queued in
#   $job->{child_fh_close}. With the 'join' option STDERR is redirected to
#   the same pipe and queued as well.
#   NOTE(review): unlike the sibling child-side helpers, the tie branch
#   here does not test $job->{style} for 'cmd'/'exec' -- confirm intent.
sub __config_fh_child_stdout_pipes {
    my $job = shift;
    my $fh_config = $job->{fh_config};

    close STDOUT;
    if ($USE_TIE_PH) {
	my $fh = _gensym();
	tie *$fh, 'Forks::Super::Tie::IPCPipeHandle', $fh_config->{p_to_out}, $fh;
	*STDOUT = *$fh;
	${*STDOUT}->{std_delegate} = $fh;
    } else {
	_safeopen($job, *STDOUT, '>&', $fh_config->{p_to_out})
	    or warn 'Forks::Super::Job::_config_fh_child_stdout(): ',
		    "could not attach child STDOUT to output pipe: $!\n";
    if ($job->{debug}) {
	debug('Opening ',*STDOUT,'/',CORE::fileno(STDOUT),' in child STDOUT');
    push @{$job->{child_fh_close}}, $fh_config->{p_to_out}, *STDOUT;

    if ($fh_config->{join}) {
	delete $fh_config->{err};
	close STDERR;
	_safeopen($job, *STDERR, '>&', $fh_config->{p_to_out})
	    or warn 'Forks::Super::Job::_config_fh_child_stdout(): ',
		    "could not join child STDERR to STDOUT sockethandle: $!\n";
	push @{$job->{child_fh_close}}, *STDERR;

# __config_fh_child_stdout_file($job)
#   Child-side STDOUT to the file $fh_config->{f_out}. The file is opened
#   for writing (no_layers => 1), queued in $job->{child_fh_close}, and
#   duplicated onto STDOUT. With the 'join' option STDERR is duplicated
#   onto the same handle and the separate 'err' setting is removed.
#   Failures are reported with warn.
sub __config_fh_child_stdout_file {
    my $job = shift;
    my $fh_config = $job->{fh_config};

    my $fh;
    debug("Opening up $fh_config->{f_out} for output in the child   $$")
	if $job->{debug};
    if (_safeopen($job, $fh,'>',$fh_config->{f_out}, no_layers => 1)) {
	push @{$job->{child_fh_close}}, $fh;
	close STDOUT if &IS_WIN32;

	# can we pass undef fh to _safeopen and reassign to *STDOUT?
	# t/25 says no. Probably because we have already added a ton of
	# stuff to the $fh namespace?
	if (_safeopen($job, *STDOUT, '>&', $fh)) {
	    if ($fh_config->{join}) {
		delete $fh_config->{err};
		close STDERR if &IS_WIN32;
		# NOTE(review): the final argument of this warn is not
		# present in this view.
		_safeopen($job, *STDERR, '>&', $fh)
		    or warn 'Forks::Super::Job::config_fh_child(): ',
		        "could not attach STDERR to child output filehandle: ",
	} else {
	    warn 'Forks::Super::Job::config_fh_child(): ',
	        "could not attach STDOUT to child output filehandle: $!\n";
    } else {
	warn 'Forks::Super::Job::config_fh_child(): ',
	    "could not open filehandle to provide child STDOUT: $!\n";

# _config_fh_child_stderr($job)
#   Child-side setup of STDERR. STDERR is closed when error output is
#   suppressed; nothing else happens unless $fh_config->{err} is set.
#   Otherwise a transport is selected (sockets, pipes, or the file f_err)
#   and the STDERR glob is tagged with is_write.
#   NOTE(review): the bodies of the transport branches are not present in
#   this view -- presumably each calls the matching
#   __config_fh_child_stderr_* helper; confirm against the full file.
sub _config_fh_child_stderr {
    my $job = shift;
    my $fh_config = $job->{fh_config};
    if ($fh_config->{suppress} && $fh_config->{suppress}{err}) {
	close STDERR;
    return if ! $fh_config->{err};

    if ($fh_config->{sockets}) {
    } elsif ($fh_config->{pipes}) {
    } elsif ($fh_config->{f_err}) {
    } else {
	carp 'Forks::Super::Job::Ipc: failed to configure child STDERR: ',
	    'fh_config = ', join(' ', %{$job->{fh_config}});
    ${*STDERR}->{is_write} = 1;

# __config_fh_child_stderr_sockets($job)
#   Child-side STDERR over a socket. The child only writes to csock_err,
#   so its read side is shut down. For jobs that are not 'cmd' or 'exec'
#   style (and when $USE_TIE_SH), STDERR is aliased to a glob tied to
#   Forks::Super::Tie::IPCSocketHandle; otherwise the socket is duplicated
#   onto STDERR with _safeopen.
sub __config_fh_child_stderr_sockets {
    my $job = shift;
    my $fh_config = $job->{fh_config};

    close STDERR;
    # shutdown(..., 0): no more reads on the child's end
    shutdown $fh_config->{csock_err}, 0;
    ${$fh_config->{csock_err}}->{_SHUTDOWN} = 1;
    push @SAFEOPENED, [ shutdown => $fh_config->{csock_err}, 1 ];
    if ($USE_TIE_SH && $job->{style} ne 'cmd' && $job->{style} ne 'exec') {
	my $fh = _gensym();

	tie *$fh, 'Forks::Super::Tie::IPCSocketHandle',
	    $fh_config->{csock_err}, $fh;
	*STDERR = *$fh;
	${*STDERR}->{std_delegate} = $fh;
    } else {
	_safeopen($job, *STDERR, '>&', $fh_config->{csock_err})
	    or warn 'Forks::Super::Job::_config_fh_child_stderr(): ',
	        "could not attach STDERR to child error sockethandle: $!\n";
    if ($job->{debug}) {
	debug('Opening ',*STDERR,'/',CORE::fileno(STDERR),' in child STDERR');

# __config_fh_child_stderr_pipes($job)
#   Child-side STDERR over a pipe. The pipe handle p_to_err is queued in
#   $job->{child_fh_close} and STDERR is closed. For jobs that are not
#   'cmd' or 'exec' style (and when $USE_TIE_PH), STDERR is aliased to a
#   glob tied to Forks::Super::Tie::IPCPipeHandle; otherwise the pipe is
#   duplicated onto STDERR, with a warning on failure.
sub __config_fh_child_stderr_pipes {
    my $job = shift;
    my $fh_config = $job->{fh_config};

    push @{$job->{child_fh_close}}, $fh_config->{p_to_err};
    close STDERR;
    if ($USE_TIE_PH && $job->{style} ne 'cmd' && $job->{style} ne 'exec') {
	my $fh = _gensym();
	tie *$fh, 'Forks::Super::Tie::IPCPipeHandle', 
		$fh_config->{p_to_err}, $fh;
	*STDERR = *$fh;
	${*STDERR}->{std_delegate} = $fh;
    } elsif (_safeopen($job, *STDERR, '>&', $fh_config->{p_to_err})) {
	debug('Opening ',*STDERR,'/',CORE::fileno(STDERR),
	      ' in child STDERR') if $job->{debug};
	push @{$job->{child_fh_close}}, *STDERR;
    } else {
	warn 'Forks::Super::Job::_config_fh_child_stderr(): ',
	    "could not attach STDERR to child error pipe: $!\n";

# __config_fh_child_stderr_file($job)
#   Child-side STDERR to the file $fh_config->{f_err}. The file is opened
#   for writing (no_layers => 1), queued in $job->{child_fh_close}, and
#   duplicated onto STDERR. Failures are reported with warn.
sub __config_fh_child_stderr_file {
    my $job = shift;
    my $fh_config = $job->{fh_config};
    my $fh;
    debug("Opening $fh_config->{f_err} as child STDERR")
	if $job->{debug};
    if (_safeopen($job, $fh, '>', $fh_config->{f_err}, no_layers => 1)) {
	push @{$job->{child_fh_close}}, $fh;
	close STDERR if &IS_WIN32;
	_safeopen($job, *STDERR, '>&', $fh)
	    or warn 'Forks::Super::Job::_config_fh_child_stderr(): ',
		    "could not attach STDERR to child error filehandle: $!\n";
    } else {
	warn 'Forks::Super::Job::_config_fh_child_stderr(): ',
		"could not open filehandle to provide child STDERR: $!\n";


# open handles to the files that the parent process will
# have access to, and assign them to the local STDIN, STDOUT,
# and STDERR filehandles.
#
# Does nothing when the job has no fh_config. Any ties on the standard
# handles are removed first, and $job->{child_fh_close} is reset to an
# empty list. 'cmd' and 'exec' style jobs on MSWin32 with file-based IPC
# are delegated to _config_cmd_fh_child. For pipe-based IPC the parent's
# ends of the pipes (p_to_in, p_out, p_err) are closed in the child.
# NOTE(review): several statements of this sub (including the body of the
# psock_* loop) are not present in this view.
sub Forks::Super::Job::_config_fh_child {
    my $job = shift;
    return if not defined $job->{fh_config};

    # "a tie in the parent should not be allowed to cause problems"
    # according to IPC::Open3
    untie *STDIN;
    untie *STDOUT;
    untie *STDERR;

    # track handles to close when the child exits
    $job->{child_fh_close} = [];

    if ($job->{style} eq 'cmd' || $job->{style} eq 'exec') {
	if (&IS_WIN32 && Forks::Super::Config::CONFIG('filehandles')) {
	    return _config_cmd_fh_child($job);


    if (!$USE_TIE_SH) {
	if ($job->{fh_config} && $job->{fh_config}{sockets}) {
	    foreach my $channel (qw(in out err)) {
		my $s = $job->{fh_config}{"psock_$channel"};
		if (defined $s) {

    if ($job->{fh_config} && $job->{fh_config}{pipes}) {
	foreach my $pipeattr (qw(p_to_in p_out p_err)) {
	    if (defined $job->{fh_config}{$pipeattr}) {
		_close( $job->{fh_config}{$pipeattr} );

# _collapse_command(@cmd)
#   Collapse a multi-element command list into a one-element list holding
#   a single command-line string, quoting each element as needed.
#
#   Arguments: the command and its arguments, one per element.
#   Returns:   @cmd unchanged when it has zero or one element; otherwise a
#              one-element list containing the elements joined by spaces.
#
#   Quoting rules, applied per element in this order:
#     - an empty string becomes "" so that the argument is not lost when
#       the elements are joined
#     - no whitespace or shell metacharacters: used as is
#     - no double quote inside: wrapped in double quotes
#     - no single quote inside (non-Windows only): wrapped in single quotes
#     - otherwise: metacharacters backslash-escaped, then wrapped in
#       double quotes
sub _collapse_command {
    my @cmd = @_;
    if (@cmd <= 1) {
	return @cmd;
    }
    my @new_cmd = ();
    foreach my $cmd (@cmd) {
	if ($cmd eq '') {
	    # an empty argument must stay visible on the command line
	    push @new_cmd, '""';
	} elsif ($cmd !~ /[\s\'\"\[\]\;\(\)\<\>\t\|\?\&]/x) {
	    push @new_cmd, $cmd;
	} elsif ($cmd !~ /\"/) {
	    push @new_cmd, "\"$cmd\"";
	} elsif ($cmd !~ /\'/ && !&IS_WIN32) {
	    push @new_cmd, "'$cmd'";
	} else {
	    my $cmd2 = $cmd;
	    $cmd2 =~ s/([\s\'\"\\\[\]\;\(\)\<\>\t\|\?\&])/\\$1/gx;
	    push @new_cmd, "\"$cmd2\"";
	}
    }
    @cmd = ();
    push @cmd, (join ' ', @new_cmd);
    return @cmd;
}

# MSWin32 has trouble using the open '>&' and open '<&' syntax.
#
# _config_cmd_fh_child($job)
#   Instead of reopening the standard handles, rewrite the job's external
#   command ($job->{exec} if set, otherwise $job->{cmd}) so that the shell
#   performs the redirection: ">" for f_out (plus "2>&1" with the 'join'
#   option), "2>" for f_err, and "<" for f_in. The rewritten command
#   replaces the original one in $job.
sub _config_cmd_fh_child {
    my $job = shift;
    my $fh_config = $job->{fh_config};
    my $cmd_or_exec = $job->{exec} ? 'exec' : 'cmd';
    my @cmd = @{$job->{$cmd_or_exec}};
    if (@cmd > 1) {
	# the redirections are appended to a single command-line string
	@cmd = _collapse_command(@cmd);

    # XXX - not idiot proof. FH dir could have a metacharacter.
    if ($fh_config->{out} && $fh_config->{f_out}) {
	$cmd[0] .= " >\"$fh_config->{f_out}\"";
	if ($fh_config->{join}) {
	    $cmd[0] .= ' 2>&1';
    if ($fh_config->{err} && $fh_config->{f_err} && !$fh_config->{join}) {
	$cmd[0] .= " 2>\"$fh_config->{f_err}\"";
    if (!$fh_config->{in}) {
	close STDIN;
    } elsif ($fh_config->{f_in}) {

	# standard input must be specified before the first pipe char,
	# if any (How do you distinguish pipes that are
	# for shell piping, and pipes that are part of some command
	# or command line argument? The shell can do it, obviously,
	# but there is probably lots and lots of code to do it right.
	# And probably regex != doing it right).
	# e.g., want to be able to handle:
	#    $^X -F/\\\|/ -alne '$_=$F[3]-$F[1]-$F[2]' | ./another_program
	# and have input inserted after the SECOND |
	# To solve this we need to parse the command as well as the
	# shell does ...

	$cmd[0] = _insert_input_redir_to_cmdline_crudely(
			    $cmd[0], $fh_config->{f_in});

	# external command must not launch until the input file has been created
	# NOTE(review): the statement that leaves this loop once the file
	# is readable is not present in this view.
	my $try;
	for ($try = 1; $try <= 10; $try++) {
	    if (-r $fh_config->{f_in}) {
		$try = 0;
	    Forks::Super::pause(0.2 * $try);
	if ($try >= 10) {
	    warn 'Forks::Super::Job::config_cmd_fh_child(): ',
	    	"child was not able to detect STDIN file $fh_config->{f_in}. ",
	    	"Child may not have any input to read.\n";
    debug("config_cmd_fh_config(): child cmd is   $cmd[0]  ")
	if $job->{debug};

    $job->{$cmd_or_exec} = [ @cmd ];

# _insert_input_redir_to_cmdline_crudely($cmd, $input)
#   Insert the input redirection  <"$input"  into the command line $cmd.
#
#   Arguments: $cmd   - a shell command line
#              $input - name of the file to redirect standard input from
#   Returns:   the command line with the redirection inserted before the
#              first top-level pipe character, or appended at the end when
#              there is no such pipe.
sub _insert_input_redir_to_cmdline_crudely {
    my ($cmd, $input) = @_;

    # a crude parser that looks for the first unescaped
    # pipe char that is not inside single or double quotes,
    # or inside a () [] {} expression and inserts "< $input"
    # before that pipe char, or at the end of the line

    # XXX good enough to pass t/42e but I wonder what edge cases it misses.

    my @chars = split //, $cmd;
    my $result = '';
    my $insert = 0;

    # stack of currently open quotes/brackets; the '' sentinel at the
    # bottom means "top level"
    my @group = ('');

    # %opener maps a closing bracket to its opening bracket;
    # %closer maps an opening bracket to its closing bracket
    my %opener = qw! ) (  ] [  } { !;
    my %closer = reverse %opener;

    while (@chars) {
	my $char = shift @chars;
	$result .= $char;
	my $in_quote = $group[-1] eq '"' || $group[-1] eq "'";

	if ($char eq '\\') {
	    # copy the escaped character verbatim; a trailing backslash
	    # has nothing after it to copy
	    if (@chars) {
		$result .= shift @chars;
	    }
	} elsif ($char eq '"') {
	    if ($group[-1] eq '"') {
		pop @group;
	    } elsif ($group[-1] ne "'") {
		push @group, '"';
	    }
	} elsif ($char eq "'") {
	    if ($group[-1] eq "'") {
		pop @group;
	    } elsif ($group[-1] ne '"') {
		push @group, "'";
	    }
	} elsif ($in_quote) {
	    # brackets and pipes inside a quoted string are literal text
	    # and must not open or close a group
	} elsif (exists $closer{$char}) {
	    # opening bracket
	    push @group, $char;
	} elsif (exists $opener{$char} && $group[-1] eq $opener{$char}) {
	    # closing bracket that matches the innermost open bracket
	    pop @group;
	} elsif ($char eq '|' && @group <= 1) {
	    # first top-level pipe: put the redirection in front of it and
	    # copy the rest of the command line unchanged
	    chop $result;
	    $result .= ' <"' . $input . '" | ';
	    $result .= join'', @chars;
	    @chars = ();
	    $insert = 1;
	}
    }
    if (!$insert) {
	$result .= ' <"' . $input . '"';
    }
    return $result;
}

# _close($handle)
#   Close an I/O handle that was opened by Forks::Super. Returns 0 for an
#   undefined or already closed handle. Sockets are handed to
#   _close_socket (both directions). For other handles the file descriptor
#   is removed from %__OPEN_FH and the handle is closed -- through the
#   tied object's CLOSE method when the handle is tied. Returns the result
#   of the close.
sub _close {
    my $handle = shift;
    return 0 if !defined $handle;
    return 0 if $$handle->{closed};
    if (!defined $$handle->{opened}) {
	# this method should only be used with I/O handles
	# opened by Forks::Super
	# (the "1 ||" makes this diagnostic unconditional)
	if (1 || $DEBUG) {
	    Carp::cluck 'Forks::Super::Job::Ipc::_close ',
	    "called on unrecognized filehandle $handle\n";

    if (is_socket($handle)) {
	return _close_socket($handle,2);

    my $z = 0;
    my $is_tied = !!tied *$handle;

    delete $__OPEN_FH{fileno($handle)};
    if ($is_tied) {
	#my $th = tied *$handle;
	#my $z = $th->CLOSE;
	$z = (tied *$handle)->CLOSE;
	untie *$handle;
    } else {
	$z ||= close $handle;
    if ($z) {
	if ($DEBUG) {
	    if (defined($$handle->{glob})) {
		debug("$$ closing[2] IPC handle ",$$handle->{glob});
	    } else {
		debug("$$ closing[1] IPC handle ",$handle);
    return $z;

# _update_handle_for_close($handle)
#   Record on the handle when it was closed and for how long it was open.
#   Values that are already set (true) are left alone.
sub _update_handle_for_close {
    my $handle = shift;
    $$handle->{closed} ||= Time::HiRes::time();
    # elapsed = close time minus the recorded open time
    $$handle->{elapsed} ||= $$handle->{closed} - $$handle->{opened};

# close down one-half of a socket. If the other half is already closed,
# then call close on the socket.
#
# _close_socket($handle, $is_write)
#   $is_write selects what to close: 0 = read half, 1 = write half,
#   2 = both. The halves closed so far are kept as a bit mask in
#   $$handle->{shutdown} (1 = read, 2 = write, 3 = both). Returns 0 for an
#   undefined, closed or fully shut down handle.
#   NOTE(review): some statements of this sub are not present in this
#   view.
sub _close_socket {
    my ($handle, $is_write) = @_;
    return 0 if !defined $handle;
    return 0 if $$handle->{closed};

    $$handle->{shutdown} ||= 0;
    return 0 if $$handle->{shutdown} >= 3;

    $is_write++;    #  0 => 1, 1 => 2, 2 => 3
    if (0 == ($$handle->{shutdown} & $is_write)) {
	my $z = $$handle->{shutdown} |= $is_write;
	if ($$handle->{shutdown} >= 3) {
	    # both halves are now shut down: really close the socket


	    my $sh = $handle;
	    my $th = tied *$handle;
	    if ($th && $th->isa('Forks::Super::Tie::IPCSocketHandle')) {
		# close the real socket held by the tie object first
		$sh = $th->{SOCKET};
		$z = close $sh;

		# XXX - is untie necessary? see comment in _close()
		# this untie doesn't generate warnings, though ...
		untie *$handle;
		delete $__OPEN_FH{fileno($handle)};
		$z += close $handle;
	    } else {
		$z = close $handle;
		if ($th) {
		    # XXX - is untie necessary? see comment in _close()
		    # no warnings from this untie, though ...
		    untie *$handle;
		    delete $__OPEN_FH{fileno($handle)};
		    $z += close $handle;
	    if ($DEBUG) {
		debug("$$ Closing IPC socket $$handle->{glob}");
	return $z;

# _close_fh_stdin($job)
#   Close the parent's handle to the child's STDIN unless that has been
#   done already. For a socket only the write half is closed. On success
#   $job->{child_stdin_closed} is set to 1.
sub _close_fh_stdin {
    my $job = shift;
    if (defined($job->{child_stdin}) && !defined($job->{child_stdin_closed})) {
	if (is_socket($job->{child_stdin})) {
	    if (_close_socket($job->{child_stdin}, 1)) {
		$job->{child_stdin_closed} = 1;
		debug("closed child stdin for $job->{pid}") if $job->{debug};
	} else {
	    if (_close($job->{child_stdin})) {
		$job->{child_stdin_closed} = 1;
		debug("closed child stdin for $job->{pid}") if $job->{debug};
    # delete $Forks::Super::CHILD_STDIN{...} for this job? No.

# _close_fh_stdout($job)
#   Close the parent's handle to the child's STDOUT unless that has been
#   done already. For a socket only the read half is closed. On success
#   $job->{child_stdout_closed} is set to 1. With the 'join' option the
#   STDERR closed flag is copied from the STDOUT one.
sub _close_fh_stdout {
    my $job = shift;
    if (defined($job->{child_stdout}) && 
	!defined($job->{child_stdout_closed})) {
	if (is_socket($job->{child_stdout})) {
	    if (_close_socket($job->{child_stdout}, 0)) {
		$job->{child_stdout_closed} = 1;
		debug("closed child stdout for $job->{pid}") if $job->{debug};
	} elsif (_close($job->{child_stdout})) {
	    $job->{child_stdout_closed} = 1;
	    debug("closed child stdout for $job->{pid}") if $job->{debug};
	if ($job->{fh_config}{join}) {
	    $job->{child_stderr_closed} = $job->{child_stdout_closed};
	    debug("closed joined child stderr for $job->{pid}")
		if $job->{debug};
    # delete $Forks::Super::CHILD_STDOUT{...} ? No.

# _close_fh_stderr($job)
#   Close the parent's handle to the child's STDERR unless that has been
#   done already. For a socket only the read half is closed. On success
#   $job->{child_stderr_closed} is set to 1.
sub _close_fh_stderr {
    my $job = shift;
    if (defined($job->{child_stderr}) && 
	!defined($job->{child_stderr_closed})) {

	if (is_socket($job->{child_stderr})) {
	    if (_close_socket($job->{child_stderr}, 0)) {
		$job->{child_stderr_closed} = 1;
		debug("closed child stderr for $job->{pid}") if $job->{debug};
	} elsif (_close($job->{child_stderr})) {
	    $job->{child_stderr_closed} = 1;
	    debug("closed child stderr for $job->{pid}") if $job->{debug};
    # delete $Forks::Super::CHILD_STDERR{...}? No.

# close_fh($job, @modes)
#   Close the parent's handles to a job's standard streams. @modes may
#   name any of 'stdin', 'stdout', 'stderr' (matched case-insensitively)
#   or 'all'; with no modes, 'all' is assumed.
sub close_fh {
    my ($job,@modes) = @_;
    my $modes;
	# join the requested modes with single spaces, whatever the
	# caller's list separator is, and expand 'all'
	local $" = ' ';
	$modes = "@modes" || 'all';
	$modes =~ s/all/stdin stdout stderr/i;
    if ($job->{debug}) {
	debug("closing [$modes] on $job");

    if ($modes =~ /stdin/i)  { _close_fh_stdin($job);  }
    if ($modes =~ /stdout/i) { _close_fh_stdout($job); }
    if ($modes =~ /stderr/i) { _close_fh_stderr($job); }

# $job->write_stdin(@msg)
#   Print @msg to the handle connected to the child's STDIN and return the
#   result of the print. Issues a carp warning instead when the handle was
#   never configured, when it has already been closed, or when the print
#   leaves an error in $!.
sub Forks::Super::Job::write_stdin {
    my ($job, @msg) = @_;
    my $fh = $job->{child_stdin};
    if (defined $fh) {
	if ($job->{child_stdin_closed}) {
	    carp 'Forks::Super::Job::write_stdin: ',
		    "write on closed stdin handle for job $job->{pid}\n";
	} else {
	    local $! = 0;
	    my $z = print {$fh} @msg;

	    # an MSWin32 hack. Child sockets in t/43d and t/44d choke
	    # (XXX - Why?) without a small pause after each write.
	    # See also &Forks::Super::Tie::IPCSocketHandle::trivial_pause
	    # NOTE(review): the body of this conditional is not present
	    # in this view.
	    if (&IS_WIN32 && is_socket($fh)) {
	    if ($!) {
		carp 'Forks::Super::Job::write_stdin: ',
			"warning on write to job $job->{pid} stdin: $!\n";
	    return $z;
    } else {
	carp 'Forks::Super::Job::write_stdin: ',
		"stdin handle for job $job->{pid} was not configured\n";

# Read from a socket handle on behalf of a job; the data itself is
# obtained with readline($sh).
#
#   $sh        - socket handle to read from
#   $job       - job that owns the handle (used in warning messages)
#   $wantarray - caller's context flag, forwarded to __sanitize_read_inputs
#   %options   - read options, handed to __sanitize_read_inputs and
#                __extract_read_options
#
# Before reading, the handle is polled with 4-argument select for as long
# as it is in blocking mode, the platform is MSWin32, or a blocking read
# was requested.
# NOTE(review): the statements that leave the polling loop once input is
# available are not visible in this listing -- confirm before editing.
sub _read_socket {
    my ($sh, $job, $wantarray, %options) = @_;

    return if !__sanitize_read_inputs($sh, $job, $wantarray, %options);

    # $sh->opened is wrapped in eval because the call itself can die
    my $zz = eval { $sh->opened };
    if ($@) {
	carp 'Forks::Super::_read_socket: read on unopened, unopenable ',
	    "socket $sh, ref=",ref($sh),", error=$@\n";
    } elsif (!$zz) {
	carp "Forks::Super::_read_socket: read on unopened socket $sh ",
	    $job->toString(), "\n";

    # if the socket is blocking, then we need to test whether
    # there is input to be read before we read on the socket
    my ($expire, $blocking_desired) = __extract_read_options($sh, \%options);

    while ($sh->blocking() || &IS_WIN32 || $blocking_desired) {
	my $fileno = fileno($sh);
	if (not defined $fileno) {
	    # fall back on this package's own fileno lookup
	    $fileno = Forks::Super::Job::Ipc::fileno($sh);
	    Carp::cluck "Cannot determine FILENO for socket handle $sh!";

	my ($rin,$rout);
	my $timeout = $Forks::Super::SOCKET_READ_TIMEOUT || 1.0;
	# shorten the timeout so that it does not run past $expire
	($timeout, $blocking_desired)
	    = __get_select_timeout($timeout, $expire, $blocking_desired);

	# bit vector for select with only this handle's descriptor set
	$rin = '';
	vec($rin, $fileno, 1) = 1;

	# perldoc select: warns against mixing select4
	# (unbuffered input) with readline (buffered input).
	# Do I have to do my own buffering? That would be weak.
	# Or are sockets already unbuffered?

	local $! = undef;
	my ($nfound,$timeleft) = select $rout=$rin, undef, undef, $timeout;

	if ($nfound) {
	    if ($rin ne $rout && $DEBUG) {
		debug("No input found on $sh/$fileno ",
		      "[shouldn't reach this block]");
	    if ($nfound == -1) {
		next if $!{EINTR}; # interrupted system call -- this is usually ok.
		warn "Forks::Super:_read_socket: Error in select4(): $! $^E.\n";
	if ($DEBUG) {
	    debug("no input found on $sh/$fileno");
	# nothing to read and the caller did not ask to wait
	return if ! $blocking_desired;

    # XXX - see _read_pipe how we used sysread to build a
    #       readline return value from raw input ...
    return readline($sh);

# Clamp a select() timeout so that it does not run past a deadline.
#
#   $timeout          - proposed timeout, in seconds
#   $expire           - absolute deadline on the Time::HiRes::time() scale;
#                       a false value means there is no deadline
#   $blocking_desired - the caller's current blocking flag
#
# Returns the list ($timeout, $blocking_desired). When waiting for the
# full timeout would pass the deadline, the timeout is cut down to the
# time remaining; if the deadline has already passed, the timeout becomes
# 0.0 and the blocking flag is cleared.
sub __get_select_timeout {
    my ($timeout, $expire, $blocking_desired) = @_;
    return ($timeout, $blocking_desired) if !$expire;

    # Sample the clock once so that the comparison and the subtraction
    # are made against the same instant.
    my $now = Time::HiRes::time();
    if ($now + $timeout > $expire) {
        $timeout = $expire - $now;
        if ($timeout < 0) {
            $timeout = 0.0;
            $blocking_desired = 0;
        }
    }
    return ($timeout, $blocking_desired);
}

# Look up a named option, falling back on a default.
#
#   $option  - name of the option to look up
#   $default - value to return when the option is missing or undefined
#   %options - option name/value pairs
#
# Returns $options{$option} when it is defined (false values such as 0
# and '' count as set), otherwise $default.
sub __get_option {
    my ($option, $default, %options) = @_;
    return defined $options{$option} ? $options{$option} : $default;
}
# Read from a pipe handle on behalf of a job.
#
#   $sh        - pipe handle to read from
#   $job       - job that owns the handle (used in warning messages)
#   $wantarray - true to read in list context, false for a single record
#   %options   - 'warn'    : a defined false value suppresses the
#                            undefined-handle warning
#                'block'   : request a blocking read; defaults to the
#                            handle's emulate_blocking attribute, or 0
#                'timeout' : select timeout in seconds; defaults to
#                            $Forks::Super::SOCKET_READ_TIMEOUT, or 1.0
#
# A blocking read goes straight to readline. Otherwise the handle is
# polled with 4-argument select and the input is assembled by the
# _emulate_readline_* helpers.
# NOTE(review): whatever follows the warning and debug calls in the
# undefined-handle and no-input branches is not visible in this listing
# -- confirm what those branches return.
sub _read_pipe {
    my ($sh, $job, $wantarray, %options) = @_;

    if (!defined $sh) {
	if (!defined($options{'warn'}) || $options{'warn'}) {
	    carp 'Forks::Super::_read_pipe: ',
	        'read on undefined handle for ',$job->toString(),"\n";

    # read from the delegate handle when one is attached to this handle
    if (defined $$sh->{std_delegate}) {
	$sh = $$sh->{std_delegate};

    my $blocking_desired = __get_option(
	'block', $$sh->{emulate_blocking} || 0, %options);

    # pipes are blocking by default.
    if ($blocking_desired) {
	return $wantarray ? readline($sh) : scalar readline($sh);

    my $fileno = fileno($sh);
    if (! defined $fileno) {
	# fall back on this package's own fileno lookup
	$fileno = Forks::Super::Job::Ipc::fileno($sh);
	Carp::cluck "Cannot determine FILENO for pipe $sh!";

    # bit vector for select with only this handle's descriptor set
    my ($rin,$rout);
    $rin = '';
    vec($rin, $fileno, 1) = 1;

  SELECT4: {
        my $timeout = __get_option(
	    'timeout', $Forks::Super::SOCKET_READ_TIMEOUT || 1.0, %options);
	local $! = undef;
	my ($nfound, $timeleft) = select $rout=$rin, undef, undef, $timeout;

	if ($nfound == 0) {
	    if ($DEBUG) {
		debug("no input found on $sh/$fileno");
	if ($nfound < 0) {
	    # an interrupted select is retried; any other error is reported
	    redo SELECT4  if $!{EINTR}; 
	    warn "Forks::Super::_read_pipe: error in select4(): $! $^E\n";
	    return; # return ''?

	if ($wantarray) {
	    return _emulate_readline_array($sh, $nfound, $rin);
	} else {
	    return _emulate_readline_scalar($sh, $nfound, $rin);
# Emulate list-context readline() on a handle using sysread().
#
#   $handle - handle to read from
#   $nfound - result of the caller's select() on the handle; reading is
#             attempted only while this is true
#   $rin    - select() bit vector with the handle's descriptor set
#
# Bytes are taken one at a time for as long as a zero-timeout select()
# keeps reporting the handle as ready. The collected input is then split
# into records that each end with the input record separator ($/); a
# trailing partial record becomes the last element. When $/ is undefined
# or empty, all of the input is returned as a single element. Returns an
# empty list when nothing was read.
sub _emulate_readline_array {
    my ($handle, $nfound, $rin) = @_;

    my $input = '';
    while ($nfound) {
        my $buffer = '';
        my $nread = sysread $handle, $buffer, 1;
        # 0 is end of file and undef is a read error: stop in both cases,
        # without the "uninitialized" warning that  0 == undef  produces.
        last if !$nread;
        $input .= $buffer;
        my $rout;
        ($nfound) = select(($rout = $rin), undef, undef, 0.0);
    }

    my @return = ();
    if (defined($/) && length($/)) {
        # Search for the separator literally. Interpolating $/ into a
        # pattern would treat a separator such as "." or "|" as regular
        # expression syntax.
        my $rs = $/;
        my $rs_length = length $rs;
        while ((my $pos = index($input, $rs)) >= 0) {
            push @return, substr($input, 0, $pos + $rs_length, '');
        }
    }
    if (length($input)) {
        push @return, $input;
    }
    return @return;
}

# Emulate scalar-context readline() on a handle using sysread().
#
#   $handle - handle to read from
#   $nfound - result of the caller's select() on the handle; reading is
#             attempted only while this is true
#   $rin    - select() bit vector with the handle's descriptor set
#
# Bytes are taken one at a time until the input ends with the input
# record separator ($/), the handle reports end of file or an error, or
# a zero-timeout select() says no more input is ready. With $/ undefined
# or empty, no separator check is made. Returns what was read, which is
# the empty string when nothing was available.
sub _emulate_readline_scalar {
    my ($handle, $nfound, $rin) = @_;

    # Treat an undefined $/ like an empty one so that the length test
    # below does not warn about an uninitialized value.
    my $rs = defined($/) ? $/ : '';
    my $rs_length = length $rs;

    my $input = '';
    while ($nfound) {
        my $buffer = '';
        # XXX - does getc work as well as sysread ..., 1 ?
        # last unless defined($buffer = getc($handle));
        my $nread = sysread $handle, $buffer, 1;
        # 0 is end of file and undef is a read error: stop in both cases.
        last if !$nread;
        $input .= $buffer;
        last if $rs_length > 0 && substr($input, -$rs_length) eq $rs;
        my $rout;
        ($nfound) = select(($rout = $rin), undef, undef, 0.0);
    }
    return $input;
}

# Read from a job's standard output handle.
#
#   $job     - job to read from; the handle is $job->{child_stdout}
#   %options - read options, passed through to _readline
#
# The caller's context (list or scalar) is captured with wantarray here
# and handed to _readline. Returns whatever _readline returns.
sub Forks::Super::Job::read_stdout {
    my ($job, %options) = @_;
    return _readline($job->{child_stdout}, $job, wantarray, %options);
}

# Read from a job's standard error handle.
#
#   $job     - job to read from; the handle is $job->{child_stderr}
#   %options - read options, passed through to _readline
#
# The caller's context (list or scalar) is captured with wantarray here
# and handed to _readline. Returns whatever _readline returns.
sub Forks::Super::Job::read_stderr {
    my ($job, %options) = @_;
    return _readline($job->{child_stderr}, $job, wantarray, %options);
}

# Read a single character from a job's standard output handle.
#
#   $job     - job to read from; the handle is $job->{child_stdout}
#   %options - read options, passed through to _getc
#
# Returns whatever _getc returns.
sub Forks::Super::Job::getc_stdout {
    my ($job, %options) = @_;
    return _getc($job->{child_stdout}, $job, %options);
}

# Read a single character from a job's standard error handle.
#
#   $job     - job to read from; the handle is $job->{child_stderr}
#   %options - read options, passed through to _getc
#
# Returns whatever _getc returns.
sub Forks::Super::Job::getc_stderr {
    my ($job, %options) = @_;
    return _getc($job->{child_stderr}, $job, %options);
}

# Read a single character from one of a job's handles.
#
#   $fh      - handle to read from
#   $job     - job that owns the handle
#   %options - read options, handed to __sanitize_read_inputs and
#              __extract_read_options
#
# Socket and pipe handles are delegated to _getc_socket and _getc_pipe.
# For any other handle, getc is retried inside the GETC block until a
# character arrives, the job is found to be complete, or a non-blocking
# read gives up.
# NOTE(review): two branches below have no visible body (the "else" of
# the expiry test and the $job->{is_child} case) -- confirm what they are
# meant to do.
sub _getc {
    my ($fh, $job, %options) = @_;
    return if !__sanitize_read_inputs($fh, $job, 0, %options);

    if ($$fh->{is_socket}) {
	return _getc_socket($fh, $job, %options);
    } elsif ($$fh->{is_pipe}) {
	return _getc_pipe($fh, $job, %options);

    my ($expire, $blocking_desired) = __extract_read_options($fh, \%options);
    GETC: {
	local $! = undef;
	my $c = getc($fh);
	if (defined $c) {
	    return $c;

	last if _check_if_job_is_complete_and_close_io($job, $fh);
	# zero-length seek from the current position, done before retrying
	seek $fh, 0, 1;
	if ($blocking_desired) {
	    # stop blocking once the deadline has passed
	    if ($expire > 0 && Time::HiRes::time() >= $expire) {
		$blocking_desired = 0;
	    } else {
	if (!$blocking_desired) {
	    if ($job->{is_child}) {
	    } else {
		return '';
	redo GETC;

# Read a single character from a socket handle on behalf of a job.
#
#   $sh      - socket handle to read from
#   $job     - job that owns the handle (used in warning messages)
#   %options - read options, handed to __sanitize_read_inputs and
#              __extract_read_options
#
# The handle is polled with 4-argument select for as long as it is in
# blocking mode, the platform is MSWin32, or a blocking read was
# requested; the character itself is fetched with recv.
# NOTE(review): the statements that leave the polling loop once input is
# available are not visible in this listing -- confirm before editing.
sub _getc_socket {
    my ($sh, $job, %options) = @_;

    # NOTE(review): presumably a leftover debugging aid for
    # Devel::DumpTrace -- confirm whether it is still wanted.
    local $Devel::DumpTrace::TRACE = 1;

    return if !__sanitize_read_inputs($sh, $job, 0, %options);
    # $sh->opened is wrapped in eval because the call itself can die
    my $zz = eval { $sh->opened };
    if ($@) {
	carp 'Forks::Super::_getc_socket: read on unopened, unopenable ',
	    "socket $sh, ref=", ref($sh), ", error=$@\n";
    } elsif (!$zz) {
	carp "Forks::Super::_getc_socket: read on unopened socket $sh ",
	    $job->toString(), "\n";

    my ($expire, $blocking_desired) = __extract_read_options($sh, \%options);
    while ($sh->blocking() || &IS_WIN32 || $blocking_desired) {
	my $fileno = fileno($sh);
	if (not defined $fileno) {
	    # fall back on this package's own fileno lookup
	    $fileno = Forks::Super::Job::Ipc::fileno($sh);
	    Carp::cluck "Cannot determine FILENO for socket handle $sh!";

	my ($rin,$rout);
	my $timeout = $Forks::Super::SOCKET_READ_TIMEOUT || 1.0;
	# shorten the timeout so that it does not run past $expire
	($timeout, $blocking_desired)
	    = __get_select_timeout($timeout, $expire, $blocking_desired);

	# bit vector for select with only this handle's descriptor set
	$rin = '';
	vec($rin, $fileno, 1) = 1;

	# perldoc select: warns against mixing select4
	# (unbuffered input) with readline (buffered input).
	# Do I have to do my own buffering? That would be weak.
	# Or are sockets already unbuffered?

	local $! = undef;
	my ($nfound,$timeleft) = select $rout=$rin, undef, undef, $timeout;

	if ($nfound) {
	    if ($rin ne $rout && $DEBUG) {
		debug("No input found on $sh/$fileno ",
		      "[shouldn't reach this block]");
	    # NOTE(review): unlike _read_socket, an interrupted select
	    # (EINTR) is not retried here, and the message below names
	    # _read_socket.
	    if ($nfound == -1) {
		warn "Forks::Super:_read_socket: Error in select4(): $! $^E.\n";
	if ($DEBUG) {
	    debug("no input found on $sh/$fileno");
	# nothing to read and the caller did not ask to wait
	return if ! $blocking_desired;

    # prefer recv/sysread to getc, as the latter is susceptible to
    # buffering compatibility problems with 4-arg select.
    if (ref($sh) eq 'Forks::Super::Tie::IPCSocketHandle::Delegator') {
	$sh = $$sh->{DELEGATE};
    my ($n,$c);
    $n = recv $sh, $c, 1, 0;
    return $c;

# Read a single character from a pipe handle on behalf of a job.
#
#   $sh      - pipe handle to read from
#   $job     - job that owns the handle (used in warning messages)
#   %options - 'warn'    : a defined false value suppresses the
#                          undefined-handle warning
#              'block'   : request a blocking read; defaults to the
#                          handle's emulate_blocking attribute, or 0
#              'timeout' : select timeout in seconds; defaults to
#                          $Forks::Super::SOCKET_READ_TIMEOUT, or 1.0
#
# A blocking read goes straight to _unbuffered_getc. Otherwise the handle
# is first polled with 4-argument select.
# NOTE(review): whatever follows the warning and debug calls in the
# undefined-handle and no-input branches is not visible in this listing
# -- confirm what those branches return.
sub _getc_pipe {
    my ($sh, $job, %options) = @_;

    if (!defined $sh) {
	if (!defined($options{'warn'}) || $options{'warn'}) {
	    carp 'Forks::Super::_getc_pipe: ',
	        'read on undefined handle for ',$job->toString(),"\n";

    # read from the delegate handle when one is attached to this handle
    if (defined $$sh->{std_delegate}) {
	$sh = $$sh->{std_delegate};

    my $blocking_desired = __get_option(
	'block', $$sh->{emulate_blocking} || 0, %options);

    # pipes are blocking by default.
    if ($blocking_desired) {
	# XXX - prefer  sysread  to  getc  here?
	return _unbuffered_getc($sh);

    my $fileno = fileno($sh);
    if (! defined $fileno) {
	# fall back on this package's own fileno lookup
	$fileno = Forks::Super::Job::Ipc::fileno($sh);
	Carp::cluck "Cannot determine FILENO for pipe $sh!";

    # bit vector for select with only this handle's descriptor set
    my ($rin,$rout);
    $rin = '';
    vec($rin, $fileno, 1) = 1;
    my $timeout = __get_option(
	'timeout', $Forks::Super::SOCKET_READ_TIMEOUT || 1.0, %options);

    local $! = undef;
    my ($nfound, $timeleft) = select $rout=$rin, undef, undef, $timeout;

    if ($nfound == 0) {
	if ($DEBUG) {
	    debug("no input found on $sh/$fileno");
    if ($nfound < 0) {
	# warn "Forks::Super::_getc_pipe: error in select4(): $! $^E\n";
	return; # return ''?

    # XXX - prefer  sysread  to  getc  here ?
    return _unbuffered_getc($sh);

# Read one character from the handle in $_[0] with sysread, bypassing
# buffered I/O.
#
# Returns the character read. A sysread that fails with EINTR is retried;
# other failures are reported with carp, and a read of zero bytes (end of
# file) returns nothing.
# NOTE(review): the bare block that "redo" restarts, and whatever follows
# the carp call, are not visible in this listing -- confirm before
# editing.
sub _unbuffered_getc {

    # sysread can return undef on solaris in t/44j.
    # Maybe SIGCHLD is causing an interruption?

    # this was fixed in F::S::Tie::IPCPipeHandle::GETC in v0.58, but we
    # have to fix it here, too

    my $cc;
	local $!;
	my $n = sysread $_[0], $cc, 1;
	if (!defined $n) {
	    # undef from sysread is an error; an interrupted call is retried
	    redo if $!{EINTR};
	    carp "FSJ::Ipc::_unbuffered_getc: $!";
	return if $n==0;
    return $cc;

# called from the parent process,
# attempts to read a line from standard output filehandle
# of the specified child.
# returns "" if the process is running but there is no
# output waiting on the filehandle
# returns undef if the process has completed and there is
# no output waiting on the filehandle
# performs trivial seek on filehandle before reading.
# this will reduce performance but will always clear
# error condition and eof condition on handle
#
#   $fh        - handle to read from
#   $job       - job that owns the handle
#   $wantarray - true for a list-context read, false for a single line
#   %options   - read options, handed to __sanitize_read_inputs and
#                __extract_read_options
#
# Socket and pipe handles are delegated to _read_socket and _read_pipe;
# every other handle is read by _readline_array or _readline_scalar,
# depending on $wantarray.
sub _readline {
    my ($fh, $job, $wantarray, %options) = @_;

    # $fh is passed as a variable on purpose: __sanitize_read_inputs may
    # replace it, through @_, with a delegate handle.
    return if !__sanitize_read_inputs($fh, $job, $wantarray, %options);

    if ($$fh->{is_socket}) {
        return _read_socket($fh, $job, $wantarray, %options);
    }
    elsif ($$fh->{is_pipe}) {
        return _read_pipe($fh, $job, $wantarray, %options);
    }

    # WARNING: blocking read on a filehandle can lead to deadlock
    my ($expire, $blocking_desired) = __extract_read_options($fh, \%options);

    local $! = undef;
    if ($wantarray) {
        return _readline_array($job, $fh, $expire, $blocking_desired);
    }
    else {
        return _readline_scalar($job, $fh, $expire, $blocking_desired);
    }
}

# Decide whether a read on a job's handle can go ahead.
#
#   $fh        - handle about to be read; when it carries a std_delegate
#                attribute, the CALLER'S variable is replaced (through
#                @_) with that delegate
#   $job       - job that owns the handle; reads {debug} and {pid}
#   $wantarray - caller's context flag (not used here)
#   %options   - 'warn': a defined false value suppresses the warnings
#
# Returns 1 when the handle is usable. Returns nothing (false) when the
# handle is undefined or marked closed, which is what the
# "return if !__sanitize_read_inputs(...)" guards in the read routines
# rely on.
sub __sanitize_read_inputs {
    my ($fh, $job, $wantarray, %options) = @_;
    my $warn = !defined($options{'warn'}) || $options{'warn'};

    if (!defined $fh) {
        if ($job->{debug} && $warn) {
            carp 'Forks::Super::_readline(): ',
                "read on unconfigured handle for job $job->{pid}\n";
        }
        # without this return the dereference below would die
        return;
    }

    # if this is a child and $USE_TIE_SH, then *STDIN has been assigned to
    # another glob but it's not a real GLOB or a blessed reference, so the
    # $sh->opened  call below will fail.  When we reassign *STDIN like that,
    # also set ${*STDIN}->{std_delegate} to return a blessed reference that
    # is tied to the socket we really need to be reading from.

    if (defined $$fh->{std_delegate}) {
        $fh = $_[0] = $$fh->{std_delegate};
    }

    if ($$fh->{closed}) {
        if ($warn) {
            carp_once 'Forks::Super::_readline(): ',
                "read on closed handle for job $job->{pid}\n";
        }
        return;
    }
    return 1;
}

# Work out the deadline and blocking mode for a read.
#
#   $fh      - handle being read; its emulate_blocking attribute is the
#              default blocking mode
#   $options - reference to the hash of read options; 'block' and
#              'timeout' are consulted
#
# Returns the list ($expire, $blocking_desired). $expire is 0 unless a
# positive 'timeout' was given, in which case it is the absolute time
# (Time::HiRes::time() scale) at which the read should give up and
# blocking is switched on regardless of 'block'.
sub __extract_read_options {
    my ($fh, $options) = @_;

    my $expire = 0;
    my $blocking_desired = defined $options->{block}
        ? $options->{block}
        : $$fh->{emulate_blocking};

    if (defined($options->{timeout}) && $options->{timeout} > 0) {
        $expire = Time::HiRes::time() + $options->{timeout};
        $blocking_desired = 1;
    }
    return ($expire, $blocking_desired);
}

# Check whether a job has been finished long enough that its output
# handle can be closed, and close the matching stream if so.
#
#   $job - job that owns the handle; must support is_complete and
#          provide {end}, the time at which the job finished
#   $fh  - handle that was just read
#
# Returns 1 when the job is complete and finished more than 3 seconds
# ago, otherwise 0. In the first case, whichever of the job's
# stdout/stderr streams is attached to $fh is closed with close_fh.
sub _check_if_job_is_complete_and_close_io {
    my ($job, $fh) = @_;

    if ($job->is_complete && Time::HiRes::time() - $job->{end} > 3) {
        if ($job->{debug}) {
            debug("_readline: job $job->{pid} is complete. Closing $fh");
        }
        # These branches had no body, so nothing was closed even though
        # the debug message above announces it.
        if (defined($job->{child_stdout}) && $fh eq $job->{child_stdout}) {
            close_fh($job, 'stdout');
        }
        if (defined($job->{child_stderr}) && $fh eq $job->{child_stderr}) {
            close_fh($job, 'stderr');
        }
        return 1;
    }
    return 0;
}

# List-context read from a plain (non-socket, non-pipe) handle.
#
#   $job              - job that owns the handle
#   $fh               - handle to read from
#   $expire           - absolute deadline for a blocking read; values
#                       that are not positive mean "no deadline"
#   $blocking_desired - true to keep retrying until input arrives
#
# Returns the lines read. The read is repeated for as long as it yields
# nothing.
# NOTE(review): this listing is missing lines -- the start of the
# statement that ends in "DEFAULT_PAUSE_IO);" and the bodies of the last
# two branches are not visible. Confirm them before editing.
sub _readline_array {
    my ($job, $fh, $expire, $blocking_desired) = @_;
    my @lines;
    while (@lines == 0) {
	@lines = readline($fh);
	if (@lines > 0) {
	    return @lines;

	if (!_check_if_job_is_complete_and_close_io($job, $fh)) {
	    # zero-length seek from the current position before retrying
	    seek $fh, 0, 1;
	    if ($blocking_desired) {
		# stop blocking once the deadline has passed
		if ($expire > 0 && Time::HiRes::time() >= $expire) {
		    $blocking_desired = 0;
		} else {
			1 * $Forks::Super::Util::DEFAULT_PAUSE_IO);
	} else {
	if (!$blocking_desired) {
    return @lines;

# Scalar-context read of one line from a plain (non-socket, non-pipe)
# handle.
#
#   $job              - job that owns the handle
#   $fh               - handle to read from
#   $expire           - absolute deadline for a blocking read; values
#                       that are not positive mean "no deadline"
#   $blocking_desired - true to keep retrying until input arrives
#
# Returns the line as soon as one is read. In a non-blocking read from
# the parent side (no $job->{is_child}), '' is returned while no input
# is waiting.
# NOTE(review): the body of the "else" after the expiry test, the body
# of the final "else", and the statement after the loop are not visible
# in this listing -- confirm them before editing.
sub _readline_scalar {
    my ($job, $fh, $expire, $blocking_desired) = @_;
    my $line;
    while (!defined $line) {
	$line = readline($fh);

	if (defined $line) {
	    return $line;

	last if _check_if_job_is_complete_and_close_io($job, $fh);
	# zero-length seek from the current position before retrying
	seek $fh, 0, 1;
	if ($blocking_desired) {
	    # stop blocking once the deadline has passed
	    if ($expire > 0 && Time::HiRes::time() >= $expire) {
		$blocking_desired = 0;
	    } else {
	if (!$blocking_desired) {
	    if (!$job->{is_child}) {
		# in the parent, we can tell the difference between this input
		# stream being empty because the child process is finished
		# (see _check_if_job_is_complete_and_close_io() call, above),
		# and the stream being empty because the child isn't producing
		# enough output to keep it full.

		# We can and do distinguish between these two cases by
		# returning <undef> when the child is finished and will
		# not produce any more input, and  ""  (empty string) when
		# the child is still alive and it could potentially
		# produce more input.

		return '';
	    } else {
		# in the child, we don't make this distinction.

# Reset this module's bookkeeping: empties the IPC file records
# (%IPC_FILES and @IPC_FILES), the @SAFEOPENED handle list and the saved
# signal handlers in %SIG_OLD. Takes no arguments and returns nothing.
sub init_child {
    @IPC_FILES  = ();
    %IPC_FILES  = ();
    @SAFEOPENED = ();
    %SIG_OLD    = ();
    return;
}

# Clean-up for the current job (Forks::Super::Job->this):
#   1. when both {share} and {share_ipc} are defined, the {share} data is
#      serialized with Data::Dumper into the file named by {share_ipc};
#   2. any files recorded in @IPC_FILES are unlinked;
#   3. the handles listed in $job->{child_fh_close} and @SAFEOPENED are
#      closed, each one at most once.
# NOTE(review): closing braces are not visible in this listing, so the
# exact nesting inside the final loop should be confirmed before editing.
sub deinit_child {

    my $job = Forks::Super::Job->this;
    if (defined($job->{share}) && defined($job->{share_ipc})) {
	use Data::Dumper;
	if (open my $fh, '>', $job->{share_ipc}) {
	    print $fh Data::Dumper::Dumper( $job->{share} );
	    close $fh;
	    if ($job->{debug}) {
		debug("shared data written to $job->{share_ipc}");
	} else {
	    carp 'Forks::Super::deinit_child: could not open ',
	        "share ipc file $job->{share_ipc}: $!";

    if (@IPC_FILES > 0) {
        Carp::cluck("Child $$ had temp files! @IPC_FILES\n")
	    if $Forks::Super::CHILD_FORK_OK < 0; #
	unlink @IPC_FILES;
	@IPC_FILES = ();

    # handles already dealt with, keyed by their stringified reference
    my %closed = ();
    foreach my $fh (@{$job->{child_fh_close}}, @SAFEOPENED) {
	# an entry may be an array [ 'shutdown', $handle, $how ], asking
	# for shutdown($handle, $how) before the handle is closed
	if (ref $fh eq 'ARRAY') {
	    if ($fh->[0] eq 'shutdown') {
		no warnings 'closed', 'unopened';
		shutdown $fh->[1], $fh->[2];
		$fh = $fh->[1];
		$$fh->{closed} = $closed{$fh} = 1;
		close $fh;

	# skip handles seen earlier in this loop or already marked closed
	next if $closed{$fh}++ || $$fh->{closed};
	close $fh;

# Describe a job's IPC handles as text, for diagnostics.
#
#   $job - job to describe; reads {pid}, {child_stdin}, {child_stdout}
#          and {child_stderr}
#
# For every handle that is defined, the report contains a line with the
# attribute name, the handle and its glob, a line with the handle's ref
# type, and the attribute lines produced by _ipcHandleToString. The
# pieces are joined with newlines. Returns an empty string when the job
# has no handles.
sub Forks::Super::Job::ipcToString {
    my $job = shift;
    my @output = ();
    foreach my $attr (qw(child_stdin child_stdout child_stderr)) {
        my $handle = $job->{$attr};
        next if !defined $handle;
        push @output,
            "job $job->{pid} handle $attr $handle " . *$handle,
            "\tref = " . ref($handle),
            _ipcHandleToString($handle);
    }
    return join("\n", @output);
}

# Describe the attributes stored in the hash slot of a handle's glob.
#
#   $handle - glob reference whose %$$handle attributes are listed
#
# Produces one "\tattribute KEY => VALUE\n" line per attribute, sorted by
# key; an undefined value is shown as an empty string. In list context
# the lines are returned as a list, in scalar context joined with
# newlines.
sub _ipcHandleToString {
    my $handle = shift;
    my @output = map {
        my $value = $$handle->{$_};
        "\tattribute $_ => " . (defined $value ? $value : '') . "\n";
    } sort keys %$$handle;
    return wantarray ? @output : join("\n", @output);
}
