The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#  File: Stem/Cell.pm

#  This file is part of Stem.
#  Copyright (C) 1999, 2000, 2001 Stem Systems, Inc.

#  Stem is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.

#  Stem is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.

#  You should have received a copy of the GNU General Public License
#  along with Stem; if not, write to the Free Software
#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

#  For a license to use the Stem under conditions other than those
#  described here, to purchase support for this software, or to purchase a
#  commercial warranty contract, please contact Stem Systems at:

#       Stem Systems, Inc.		781-643-7504
#  	79 Everett St.			info@stemsystems.com
#  	Arlington, MA 02474
#  	USA

package Stem::Cell ;

use strict ;

use Data::Dumper ;
use Carp qw( cluck ) ;

use Stem::Route qw( :cell ) ;
use Stem::AsyncIO ;
use Stem::Id ;
use Stem::Gather ;
use Stem::Cell::Clone ;
use Stem::Cell::Pipe ;
use Stem::Cell::Flow ;
use Stem::Cell::Work ;

use Stem::Trace 'log' => 'stem_status' , 'sub' => 'TraceStatus' ;

my %class_to_attr_name ;

my $attr_spec = [

	{
		'name'		=> 'reg_name',
		'help'		=> <<HELP,
The registered address of the owner Cell
HELP
	},
	{
		'name'		=> 'cloneable',
		'type'		=> 'boolean',
		'help'		=> <<HELP,
The parent Cell will be cloned upon triggering
HELP
	},
	{
		'name'		=> 'data_addr',
		'type'		=> 'address',
		'help'		=> <<HELP,
Cell address to send any data read in. If not set here it must come
from a trigger message.
HELP
	},
	{
		'name'		=> 'status_addr',
		'type'		=> 'address',
		'help'		=> <<HELP,
Cell address to send Cell status to
HELP
	},
	{
		'name'		=> 'send_data_on_close',
		'type'		=> 'boolean',
		'help'		=> <<HELP,
Buffer all read data and only send it when the I/O is closed
HELP
	},
	{
		'name'		=> 'no_io',
		'type'		=> 'boolean',
		'help'		=> <<HELP,
Don't do any I/O for the Cell. Either there is none or the owner Cell must
do its own I/O
HELP
	},
	{
		'name'		=> 'pipe_addr',
		'type'		=> 'address',
		'help'		=> <<HELP,
Cell address to open a pipe to
HELP
	},
	{
		'name'		=> 'pipe_args',
		'help'		=> <<HELP,
This is list of arguments or a single argument which is passed to the
cell at the remote end of the pipe.
HELP
	},
	{
		'name'		=> 'aio_args',
		'type'		=> 'hash',
		'help'		=> <<HELP,
This is a list of arguments passed to the Stem::AsyncIO module constructor
HELP
	},
	{
		'name'		=> 'errors_to_output',
		'env'		=> 'errors_to_output',
		'help'		=> <<HELP,
Any received error messages will be sent to the output.
HELP
	},

############
# change this to max_clones
############
	{
		'name'		=> 'id_size',
		'default'	=> 3,
		'help'		=> <<HELP,
Size of unique ID space for clones. Range is 26**N
HELP
	},
	{
		'name'		=> 'trigger_method',
		'default'	=> 'triggered_cell',
		'help'		=> <<HELP,
Method to callback in owner object when cell is triggered
HELP
	},

# the below attributes are not permanent yet
# unused so far.
	{
		'name'		=> 'shut_down_method',
		'default'	=> 'shut_down_cell',
		'help'		=> <<HELP,
Method to callback in owner object when cell is shutdown
HELP
	},
	{
		'name'		=> 'activated_method',
		'default'	=> 'activate_cell',
		'help'		=> <<HELP,
Method to call in owner Cell when the cell is activated. UNSUPPORTED
HELP
	},
	{
		'name'		=> 'sequence_done_method',
		'help'		=> <<HELP,
Method to call in owner Cell when the executing sequence completes.
HELP
	},
	{
		'name'		=> 'codec',
		'help'		=> <<HELP,
This sets the codec that converts data packets to/from a byte stream.
HELP
	},
	{
		'name'		=> 'work_ready_addr',
		'type'		=> 'address',
		'help'		=> <<HELP,
This is the address of the Cell that this Cell sends a message to
when work can be done (i.e. a work message can now be sent here).
HELP
	},
	{
		'name'		=> 'stderr_log',
		'help'		=> <<HELP,
This sets the log that will get the stderr output of the process
HELP
	},
] ;


sub new {

	my( $class ) = shift ;

	my $self = Stem::Class::parse_args( $attr_spec, @_ ) ;
	return $self unless ref $self ;

#print $self->_dump( 'NEW' ) ;

	return( $self ) ;
}

# this is only called in Stem::Conf for this class.
# it initializes the cell info object inside its owner object.

sub cell_init {

	my( $self, $owner_obj, $cell_name, $cell_info_attr ) = @_ ;

# the $owner_obj is the cell that owns this Stem::Cell object

	$self->{'owner_obj'} = $owner_obj ;
	$self->{'cell_name'} = $cell_name ;
#	$self->{'from_addr'} = $cell_name ;
					
	$self->{'from_addr'} = Stem::Msg::make_address_string( 
					$Stem::Vars::Hub_name,
					$cell_name
	) ;

	$self->{'cell_info_attr'} = $cell_info_attr ;

# save the attribute name that the owner class uses for the cell info.
# this is how a cell info object can be found given an owner cell object.
# also keep this name in the info itself

#print "OWNER [$owner_obj]\n" ;
	$class_to_attr_name{ ref $owner_obj } ||= $cell_info_attr ;

	if ( $self->{'cloneable'} ) {

		$self->{'id_obj'} = Stem::Id->new(
					'size'	=> $self->{'id_size'} ) ;
		$self->{'is_parent'} = 1 ;
		$self->{'target'} = '' ;
	}
}

# get the cell info whether we were called from the owner object or
# the cell info itself ;

sub _get_cell_info {

	my ( $self ) = @_ ;

	my $class = ref $self ;

	return "can't get cell info from '$self'\n" unless $class ;

	return $self if $class eq __PACKAGE__ ;

#print "CLASS [$class][$class_to_attr_name{ $class }]\n" ;

	return $self->{ $class_to_attr_name{ $class } } ;
}

sub cell_trigger {

	my ( $self, @args ) = @_ ;

	my $self_info = $self->_get_cell_info() ;

	return $self_info unless ref $self_info ;

	return if $self_info->{'triggered'} ;

# clone this cell and its info if needed
# $cell will either be $self or a clone of $self

	my $cell = $self_info->_clone() ;

	my $cell_info = $cell->_get_cell_info() ;

	$cell_info->{'triggered'} = 1 ;

#print $cell_info->_dump( 'TRIGGER' ) ;

# set any args (e.g. from trigger message) into this cell 

	$cell_info->cell_set_args( @args ) ;

	$cell_info->_cell_pipe() ;

	if ( my $err = $cell_info->_gather_io_args() ) {
		$cell_info->cell_shut_down( $err ) ;
		return $err ;
	}

# do the callback into the (possibly cloned) cell

	if ( my $err = $cell_info->_callback( 'trigger_method' ) ) {

#print "CALLBACK $err\n" ;

		$cell_info->cell_shut_down( $err ) ;
		return $err ;
	}

#	return $cell_info ;
	return ;
}

sub cell_trigger_cmd {

	my ( $self, $msg ) = @_ ;

	my @args ;

	if ( my $data = $msg->data() ) {

		$data = ${$data} if ref $data eq 'SCALAR' ;

		my $ref = ref $data ;

		if ( ! $ref && defined $data ) {

			unless ( @args = $data =~ /(\S+)=(\S+)/g ) {

				@args = ( 'args' => $data ) ;
			}
		}
		elsif ( $ref eq 'HASH' ) {

			@args = %{$data} ;
		}
		elsif ( $ref eq 'ARRAY' ) {

			@args = @{$data} ;
		}
	}

	push( @args, triggering_msg => $msg ) ;

	my $err = $self->cell_trigger( @args ) ;

print "TRIG ERR [$err]\n" if $err ;

	return $err if ref $err ;
	return ;
}


sub cell_shut_down {

	my( $self, $error ) = @_ ;

	my $cell_info = $self->_get_cell_info() ;

#cluck "CELL SHUT\n" ;

#print $cell_info->_dump( 'SHUT' ) ;


	return unless $error || $cell_info->{'active'} ;

	$cell_info->{'error'} = $error ;

#print $cell_info->_dump( "SHUT $error" ) ;

	if ( my $aio = delete $cell_info->{'aio'} ) {

		$aio->shut_down() ;
	}

	if ( my $gather = delete $cell_info->{'gather'} ) {

		$gather->shut_down() ;
	}

	$cell_info->_close_pipe() ;

	$cell_info->_clone_delete() ;

	delete $cell_info->{'args'} ;
# 	delete $cell_info->{'data_addr'} ;

	$cell_info->{'active'} = 0 ;
	$cell_info->{'triggered'} = 0 ;

	TraceStatus "cell shut down done" ;

	return ;
}


sub cell_set_args {

	my( $self, %args ) = @_ ;

	my $cell_info = $self->_get_cell_info() ;

	@{$cell_info->{'args'}}{ keys %args } = values %args ;

	if ( my $gather = $cell_info->{'gather'} ) {

		my $err = $gather->gathered( keys %args ) ;
		return $err if $err ;
	}

	return ;
}

sub cell_get_args {

	my( $self, @arg_keys ) = @_ ;

	my $cell_info = $self->_get_cell_info() ;

	return( @{$cell_info->{'args'}}{@arg_keys } ) ;
}

sub cell_info {

	my( $self ) = shift ;

	my $cell_info = $self->_get_cell_info() ;

	$cell_info->{'info'} = shift if @_ ;

	return $cell_info->{'info'} ;
}

sub _gather_io_args {

	my( $self ) = @_ ;

	my $cell_info = $self->_get_cell_info() ;

	return if $cell_info->{'no_io'} ;

	my @gather_keys = 'aio_args' ;

	push( @gather_keys, 'data_addr' ) if
			$cell_info->{'piped'} &&
			! $cell_info->{'data_addr'} ;

	my $gather = Stem::Gather->new(
			'object'	=> $cell_info,
			'keys'		=> \@gather_keys,
			'gathered_method' => '_cell_activate_io',
	) ;

	return $gather unless ref $gather ;

	$cell_info->{'gather'} = $gather ;

	my $err = $gather->gathered( keys %{$cell_info->{'args'}} ) ;

	return $err if $err ;
}

sub _cell_activate_io {

	my ( $self ) = @_ ;

	TraceStatus "cell activated" ;

	$self->{'active'} = 1 ;

#print $self->_dump( "BEFORE AIO" ) ;

	my @aio_args ;

# get any config args

	if ( my $aio_args = $self->{'aio_args'} ) {

		push( @aio_args, %{$aio_args} ) ;
	}

# args from a trigger message override any config args

	if ( my $msg_aio_args = $self->{'args'}{'aio_args'} ) {

		ref $msg_aio_args eq 'ARRAY' or return <<ERR ;
aio_args is not an ARRAY ref
ERR
		push( @aio_args, @{$msg_aio_args} ) ;
	}

	my $data_addr = $self->{'args'}{'data_addr'} || $self->{'data_addr'} ;

	my $aio = Stem::AsyncIO->new(

		'object'		=> $self->{'owner_obj'},
		'data_addr'		=> $data_addr,
		'from_addr'		=> $self->{'from_addr'},
		'send_data_on_close'	=> $self->{'send_data_on_close'},
		'codec'			=> $self->{'codec'},
		@aio_args,
	) ;

print "AIO ERR [$aio]\n" unless ref $aio ;
	return $aio unless ref $aio ;

	$self->{'aio'} = $aio ;

#print $self->_dump( "AFTER AIO" ) ;

	return ;
}

sub cell_activate {

	my( $self ) = @_ ;

	my $cell_info = $self->_get_cell_info() ;

	$cell_info->{'active'} = 1 ;
}

*cell_status_cmd = \&status_cmd ;

sub status_cmd {

	my( $self ) = @_ ;

	my $cell_info = $self->_get_cell_info() ;

	my $info = $cell_info->{'info'} || $cell_info->{'args'}{'info'} || '' ;

	$info =~ s/^/\t\t/mg ;
 
	my $class = ref $cell_info->{'owner_obj'} ;

#	my $data_addr = Stem::Msg::address_string( 
	my $data_addr = $cell_info->{'data_addr'} ||
			$cell_info->{'args'}{'data_addr'} ||
			'[NONE]' ;

	my $active = ( $cell_info->{'active'} ) ? 'Active' : 'Inactive' ;

	my $codec = $cell_info->{codec} || 'NONE' ;

print "CELL STATUS\n" ;

#my $dump = $cell_info->_dump( 'STATUS' ) ;
my $dump = '' ;

	return <<STATUS ;
Cell Status for:
Class:		$class
Addr:		$cell_info->{'from_addr'}
Status:		$active
Data Addr:	$data_addr
Codec:		$codec
Info:$info

SELF: $self
CELL: $cell_info
AIO: $cell_info->{aio}
FH: $cell_info->{fh}

$dump

STATUS

}

sub data_in {

	my( $self, $msg ) = @_ ;

#print "DATA SELF $self\n" ;

#print $msg->dump( 'CELL IN' ) ;

	my $cell_info = $self->_get_cell_info() ;

	if ( $cell_info->{'is_parent'} ) {

#print "PARENT\n" ;
		TraceStatus "parent cell $cell_info->{'from_addr'} ignoring msg" ;

		return ;
	}

	unless( $cell_info->{'active'} ) {
#print "INACTIVE\n" ;

		TraceStatus "cell not active. msg ignored FOO" ;

		return ;
	}

#print $cell_info->_dump( "DATA IN" ) ;

	$cell_info->{data_in_msg} = $msg ;
	$cell_info->cell_write( $msg->data() ) ;
}

sub cell_write {

	my( $self, $data ) = @_ ;

	my $cell_info = $self->_get_cell_info() ;

	$cell_info->{'aio'}->write( $data ) ;
}

sub _cell_write_sync {

	my( $self, $data ) = @_ ;

	my $cell_info = $self->_get_cell_info() ;

#print "SYNC $$data\n" ;

#print $cell_info->_dump( 'SYNC' ) ;

	if ( my $aio_args = $cell_info->{'args'}{'aio_args'} ) {

		my %aio_args = @{$aio_args} ;

		if ( my $fh = $aio_args{'fh'} ) {

#			$fh->blocking( 1 ) ;

			$fh->syswrite( (ref $data) ? $$data : $data ) ;
		}
	}
}

# handle stderr data as plain data

*stderr_data_in = \&data_in ;


# $cell_info is the Stem::Cell object of the parent cell. the name is
# not self as it is differentiated from $clone_info.



sub _callback {

	my ( $self, $method_name, @data ) = @_ ;

	my $method = $self->{$method_name} ;

	my $owner_obj = $self->{'owner_obj'} ;

	if ( $owner_obj->can( $method ) ) {

		return $owner_obj->$method( @data ) ;
	}

	TraceStatus "can't call $method in $owner_obj" ;

	return ;
}

sub cell_from_addr {

	my ( $self ) = @_ ;

	my $cell_info = $self->_get_cell_info() ;

	return( $cell_info->{'from_addr'} ) ;
}

use Stem::Debug qw( dump_data ) ;

sub _dump {

	my ( $self, $text ) = @_ ;

return $text . dump_data( $self ) ;

	$text ||= 'CELL' ;

	my $dump = "$text =\n" ;

	my $cell_info = $self->_get_cell_info() ;

# 	my $owner_obj = $cell_info->{owner_obj} ;
# 	my @names = lookup_cell_name( $owner_obj ) ;
# 	$dump .= "\nNames: @names\n" ;

	foreach my $key ( sort keys %{$cell_info} ) {

		my $val = $cell_info->{$key} ;
		next unless defined $val ;

		if ( $key eq 'args' ) {

			$dump .= "\targs = {\n" ;

			foreach my $arg ( sort keys %{$val} ) {

				my $arg_val = $val->{$arg} || '';

				$dump .= "\t\t$arg = '$arg_val'\n" ;
			}

			$dump .= "\t}\n" ;

			next ;
		}

		$dump .= "\t$key = '$val'\n" ;
	}

	$dump .= "\n\n" ;

	return $dump ;
}

sub dump_cmd {

	my ($self) = @_ ;

	my $cell_info = $self->_get_cell_info() ;

	return $cell_info->_dump() . Dumper $cell_info ;
}

1 ;