The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.


package Stem::Load::Ticker ;

use strict ;

use Time::HiRes qw( gettimeofday tv_interval ) ;

my $attr_spec = [


	{
		'name'		=> 'reg_name',
		'help'		=> <<HELP,
Name this Cell was registered with.
HELP
	},
	{
		'name'		=> 'dbi_addr',
		'help'		=> <<HELP,
Address to send the insert messages
HELP
	},
	{
		'name'		=> 'max_cnt',
		'default'	=> 20,
		'help'		=> <<HELP,
Maximum number of rows to insert
HELP
	},
	{
		'name'		=> 'parallel_cnt',
		'default'	=> 1,
		'help'		=> <<HELP,
Number of inserts to do in parallel
HELP
	},
] ;

sub new {

	my( $class ) = shift ;

	my $self = Stem::Class::parse_args( $attr_spec, @_ ) ;
	return $self unless ref $self ;

	return $self ;
}

sub go_cmd {

	my( $self, $msg ) = @_ ;

	my %go_args ;

	if ( my $data = $msg->data() ) {

		%go_args = ${$data} =~ /(\S+)=(\S+)/g if $$data ;
	}

	$self->{'start_time'} = gettimeofday() ;
	$self->{'go_from_addr'} = $msg->from() ;
	$self->{'go_max_cnt'} = $go_args{'max_cnt'} || $self->{'max_cnt'} ;

	$self->{'inserted_cnt'} = 0 ;
	$self->{'send_cnt'} = $self->{'go_max_cnt'} ;
	$self->{'parallel_cnt'} = $go_args{'para_cnt'} if $go_args{'para_cnt'} ;

	$self->send_ticker_msgs( $self->{'parallel_cnt'} ) ;

	return "Ticker Started\n" ;
}

sub send_ticker_msgs {

	my( $self, $parallel_cnt ) = @_ ;

#print "PARA $parallel_cnt\n" ;

	while ( $parallel_cnt-- ) {

		$self->insert_ticker_row() ;
	}

	return ;
}

sub insert_ticker_row {

	my( $self ) = @_ ;

	return if $self->{'send_cnt'} <= 0 ;
	$self->{'send_cnt'}-- ;

	my $ticker = join '', map ['A' .. 'Z']->[rand 26], 1 .. 3 ;

	my $price = 100 + int rand 9900 ;

	my $delta = -1000 + int rand 2000 ;

	my $dbi_msg = Stem::Msg->new(

		'to'		=> $self->{'dbi_addr'},
		'from'		=> $self->{'reg_name'},
		'type'		=> 'cmd',
		'cmd'		=> 'execute',
		'reply_type'	=> 'insert_done',
		'data'		=> {
			statement	=> 'insert_tick',
			bind		=> [ $ticker, $price, $delta ],
		},
	);

#print $dbi_msg->dump( 'SEND' ) ;
	$dbi_msg->dispatch() ;

	return ;
}

sub insert_done_in {

	my( $self, $msg ) = @_ ;

#print $msg->dump( 'DONE' ) ;

	if ( $self->{'send_cnt'} ) {

		$self->send_ticker_msgs( 1 ) ;
	}

	if ( ++$self->{'inserted_cnt'} >= $self->{'go_max_cnt'} ) {

		my $data = $msg->data() ;

		die "insert_done_in: $$data" unless ref $data eq 'HASH' ;

		my $time_delta = sprintf( "%8.4f",
				     gettimeofday() - $self->{'start_time'} ) ;

		my $rows_per_second = $self->{'inserted_cnt'} / $time_delta ;

		my $done_msg = Stem::Msg->new(
			'to'	=> $self->{'go_from_addr'},
			'from'	=> $self->{'reg_name'},
			'type'	=> 'response',
			'data'	=> <<DATA,
inserted $self->{'inserted_cnt'} rows in $time_delta seconds
$rows_per_second rows per second
with $self->{'parallel_cnt'} inserts in parallel
last row ID $data->{'insert_id'}
DATA
		) ;

		$done_msg->dispatch() ;

		return ;
	}


	return ;
}

1 ;