# The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#
# TODO: what this can't handle right now is things like:
#
#	* how many different URLs were there on a per query basis?
#

package Stream::Aggregate;

use strict;
use warnings;
use Hash::Util qw(lock_keys);
use B::Deparse;
use List::Util qw(min max minstr maxstr);
use Config::Checker;
use Stream::Aggregate::Stats;
use Stream::Aggregate::Random;
use List::EvenMoreUtils qw(list_difference_position);
use Tie::Function::Examples qw(%line_numbers);
use Eval::LineNumbers qw(eval_line_numbers);
use Config::YAMLMacros::YAML;
use Carp qw(confess);
use List::MoreUtils qw(uniq);
use Clone qw(clone);

require Exporter;

our @ISA = qw(Exporter);
our @EXPORT = qw(generate_aggregation_func);
our $VERSION = 0.406;

our $suppress_line_numbers = 0;

#
# Prototype describing every key accepted in an aggregation config.
# Each entry maps a config key to a description string.  The leading
# markers ('?', '%', '<default>') and the trailing '[TYPE]' tags look
# like Config::Checker annotations -- NOTE(review): confirm their exact
# meaning against Config::Checker; the code that consumes this
# prototype is outside this part of the file.
#
# Every "<name>_config" key is the optional configuration hash for the
# user code supplied under "<name>"; 'reduce' therefore needs its own
# 'reduce_config' entry (it was previously a second 'merge_config').
#
my $prototype_config = <<'END_PROTOTYPE';
max_stats_to_keep:      '?<4000>Maximum number of stats to keep for mean/stddev etc[INTEGER]'
context:                '?From $log, return an array describing the current context[CODE]'
context_config:         '%optional configuration hash for "context" code'
context2columns:        '?From @current_context, return a hash of columns[CODE]'
context2columns_config: '%optional configuration hash for "context2columns" code'
stringify_context:      '?Turn @currnet_context into an array of strings[CODE]'
stringify_context_config: '%optional configuration hash for "stringify_context" code'
finalize_result:        '?Final opportunity to adjust the return values[CODE]'
finalize_result_config: '%optional configuration has for "finalize_result" code'
filter:                 '?Should this result be saved for statistics and counted for counts?[CODE]'
filter_config:          '%optional configuration hash for "filter" code'
filter_early:           '?<0>Check the filter early (before figuring out contexts)?[BOOLEAN]'
passthrough:            '?Any additional items for the output?[CODE]'
passthrough_config:     '%optional configuration has for "passthrough" code'
ephemeral:              '%ephemeral columns (column -> code)'
ephemeral0:             '%ephemeral columns (column -> code, evaluated before "ephemeral")'
ephemeral2:             '%ephemeral columns (column -> code, evaluated after "ephemeral")'
ephemeral3:             '%ephemeral columns (column -> code, evaluated after crossproduct has set context (after "ephemeral2"))'
output:                 '%generated output columns (column -> code)'
counter:                '%counter columns (column -> code)'
percentage:             '%like a counter, but divided by the number of items'
sum:                    '%summation columns (column -> code)'
dominant:               '%most frequent (mode) value (column -> code)'
mean:                   '%mean value columns (column -> code)'
standard_deviation:     '%standard deviaton value columns (column -> code)'
median:                 '%median value columns (column -> code)'
min:                    '%min value columns (column -> code)'
max:                    '%max value columns (column -> code)'
minstr:                 '%minstr value columns (column -> code)'
maxstr:                 '%maxstr value columns (column -> code)'
keep:                   '%list of values to keep'
stat:                   '%statistical columns (using keep, column -> code)'
debug:                  '?<0>Print out the code for debugging'
strict:                 '?<0>enforce strict and warnings for user code'
preprocess:             '?Code to pre-process the input data[CODE]'
item_name:              '?<$log>Name of the item variable'
new_context:            '?Code that is run when there is a new context[CODE]'
new_context_config:     '%optional configuration hash for "new_context" code'
merge:                  '?Code that is run when merging a subcontext into a parent context[CODE]'
merge_config:           '%optional configuration hash for "merge" code'
reduce:                 '?Code that is run when reducing the saved data to save memory[CODE]'
reduce_config:          '%optional configuration hash for "reduce" code'
crossproduct:           '%crossproduct context, keys are existing columns, values are size limits'
simplify:               '%code to choose new simpler values for over-full columns (column -> code)'
combinations:           '%code to decide if new crossproduct context ($row) is worth keeping[CODE]'
END_PROTOTYPE

#
# Return the argument unchanged when it is defined and not the empty
# string; otherwise return undef.  Note that 0 counts as non-blank.
#
sub nonblank
{
	my ($candidate) = @_;
	return (defined($candidate) && $candidate ne '') ? $candidate : undef;
}

#
# Build a "#line" directive naming the file and line of the caller, so
# that code generated after an interpolated fragment reports positions
# relative to the place in this module that emitted it.
#
sub resume_line_numbering
{
	my (undef, $source_file, $source_line) = caller(0);
	my $template = qq{#line %d "generated-code-interpoloated-after-%s-%d"\n};
	return sprintf($template, $source_line, $source_file, $source_line);
}

sub generate_aggregation_func
{
	my ($agg_config, $extra, $user_extra) = @_;

	validate_aggregation_config($agg_config);

	my $renumber = ! $agg_config->{debug};

	# input data
	my $itemref;
	my $last_item;

	# 
	# if counting URLs, then the @current_context might be something like:
	#	'com', 'apple', '/movies', '/action'
	# If counting queries it might be something like:
	# 	'homocide',	'movies'
	#
	# @contexts is an array of state variables ($ps) that correspond to the
	# elements of @current_context.   @context_strings is a stringified
	# copy of @current_context to handle contexts which are references.
	#
	# $count_this is the return value of &$filter_func.
	#
	my @contexts;
	my @context_strings;
	my @current_context;  
	my $ps;
	my $oldps;
	my $count_this = 1;
	my @items_seen = ( 0 );
	my %cross_context;
	my $cross_data = {};
	my @cross_keys;
	my $cross_limit = 1;
	my $cross_count = 0;
	my %cross_key_values;
	my %persist;
	my @combinations;

	# output
	my $row;
	my $suppress_result;

	# reduce data to limit memory use
	my @keepers;
	my @tossers;
	my $max_stats2keep = $agg_config->{max_stats_to_keep};
	my $do_reduce;

	# closures
	my $get_context_func;
	my $count_func;
	my $initialize_func;
	my $final_values_func;
	my $merge_func;
	my $context_columns_func;
	my $preprocess_func;
	my $filter_func;
	my $stringify_func;
	my $finalize_result_func;
	my $passthrough_func;
	my $user_merge_func;
	my $user_new_context_func;
	my $user_reduce_func;
	my $cross_reduce_func;
	my $new_ps_func;
	my $process_func;
	my $finish_context_func;
	my $finish_cross_func;
	my $add_context_component_func;
	my $cross_key_reduce_func;
	my $declarations = '';
	my %combination_funcs;
	my $do_combinations;

	my $strict = $agg_config->{strict}
		? "use strict; use warnings;"
		: "no strict; no warnings;";

	my $eval_line_numbers = $agg_config->{debug}
		? sub { $_[0] }
		: \&eval_line_numbers;

	if ($agg_config->{crossproduct} && keys %{$agg_config->{crossproduct}}) {
		@cross_keys = sort keys %{$agg_config->{crossproduct}};
		for my $k (@cross_keys) {
			$cross_limit *= $agg_config->{crossproduct}{$k};
		}
	}

	my $compile_config = sub {
		my %varname;
		my $reduce_func;
		my %s;
		my %var_types;
		my %var_value;

		my $deparse = B::Deparse->new("-p", "-sC");

		#
		# A more sophisticated approach would figure out the dependencies of one value on
		# another and order them appropriately.   What's going on here is kinda hit & miss.
		#
		my $alias_varname = sub {
			my ($cc, $value) = @_;
			$varname{"\$column_$cc"} = $value;
		};
		my $usercode_inner = sub {
			#
			# Generate the code for one user-defined column.
			#
			# Arguments:
			#	$cctype  - config section the column came from (e.g. 'ephemeral', 'sum')
			#	$cc      - column name
			#	$cc_code - the user's code for the column (may be undef)
			#
			# Appends to $declarations and to the %s code sections, and
			# records in %varname which $column_* variable holds the value
			# of a piece of code so identical code shares one variable.
			# Returns the name of the variable holding the column's value
			# (nothing when $cc_code is undef).
			#
			# The {precount} undef statements may not be required.
			# They are there to be safe, just in case someone is referencing
			# a column that hasn't had its value assigned yet.  If so,
			# they'll always get undef rather than a left-over value from
			# a previous input record.
			# 
			my ($cctype, $cc, $cc_code) = @_;
			if (! defined($cc_code)) {
				# no code: just declare the column and reset it per record
				$declarations	.= "my \$column_$cc;\n";
				$s{precount}	.= "\tundef \$column_$cc;\n";
				return;
			}
			# identical code already compiled for another column: alias to it
			return $alias_varname->($cc, $varname{$cc_code}) if $varname{$cc_code};
			my $original = $cc_code;
			# point references to already-compiled columns at their canonical variable
			$cc_code =~ s/(\$column_\w+)/defined($varname{$1}) ? $varname{$1} : $1/ge;
			if ($cc_code =~ /\breturn\b/) {
				# code that uses 'return' needs its own sub
				$cc_code =~ s/^/\t\t/mg;
				$s{user}	.= qq{#line 3001 "FAKE-$extra->{name}-$cctype-$cc"\n} if $renumber;
				$s{user}	.= "my \$${cctype}_${cc}_func = sub {\n";
				$s{user}	.= $cc_code;
				$s{user}	.= "};\n\n";
				$s{precount}	.= "\tundef \$column_$cc;\n";
				$s{$cctype}	.= "\t\$column_$cc = ";
				$s{$cctype}	.= qq{\$${cctype}_${cc}_func->();\n};
			} elsif ($cc_code =~ /[;\n]/) {
				# multiple statements: wrap them in a do block
				$cc_code =~ s/^/\t\t/mg;
				$s{precount}	.= "\tundef \$column_$cc;\n";
				$s{$cctype}	.= "\t\$column_$cc = ";
				$s{$cctype}	.= "do {\n";
				$s{$cctype}	.= qq{#line 4001 "FAKE-$extra->{name}-$cctype-$cc"\n} if $renumber;
				$s{$cctype}	.= $cc_code;
				$s{$cctype}	.= "\n\t};\n";
			} elsif ($cc_code =~ /\A\$(column_\w+)\Z/) {
				# a bare column reference that the substitution above could not resolve
				die "value of $cc_code isn't available yet, please compute it in an earlier step like 'ephemeral0'";
			} else {
				# simple expression: assign it directly
				$s{$cctype}	.= qq{#line 5001 "FAKE-$extra->{name}-$cctype-$cc"\n} if $renumber;
				$s{precount}	.= "\tundef \$column_$cc;\n";
				$s{$cctype}	.= "\t\$column_$cc = $cc_code;\n";
			}
			$declarations		.= "my \$column_$cc;\n";

			# Compile the code once and use its deparsed text as an extra
			# lookup key, so code that deparses identically maps to the
			# same variable.
			my $te = eval "no strict; no warnings; sub { $cc_code }";
			die "eval $cctype/$cc: $original ($cc_code): $@" if $@;
			my $body = $deparse->coderef2text($te);
			return $varname{$body} if $varname{$body};
			$varname{$body} = $varname{$cc_code} = $varname{$original} = "\$column_$cc";
			$alias_varname->($cc, $varname{$cc_code});
		};
		#
		# Wrapper around $usercode_inner: compiles one column and also
		# remembers, per column name, the variable that holds its value
		# (%var_value) and the config section it came from (%var_types).
		# Returns whatever $usercode_inner returned (scalar context).
		#
		my $usercode = sub {
			my ($kind, $column) = @_[0, 1];
			my $resolved = $usercode_inner->(@_);
			($var_value{$column}, $var_types{$column}) = ($resolved, $kind);
			return $resolved;
		};

		my %seen;
		my $cc;

		my @all_data	= qw(ephemeral0 ephemeral ephemeral2 ephemeral3 keep output counter percentage sum mean standard_deviation median dominant min minstr max maxstr stat);
		my @lock_data	= qw(                                           keep output counter percentage sum mean standard_deviation median dominant min minstr max maxstr stat);
		my @output_cols	= qw(                                                output counter percentage sum mean standard_deviation median dominant min minstr max maxstr stat);
		my @kept_cols	= qw(                                           keep                                    standard_deviation median dominant                           );
		my @stats_cols	= qw(                                                                                   standard_deviation median dominant                           );
		my @cross_cols	= qw(ephemeral0 ephemeral ephemeral2                                                                                                                 );
		my %cross_cols;
		@cross_cols{@cross_cols} = @cross_cols;

		#
		# Compile all the user code for the various columns
		#
		for my $ucc (@all_data) {
			next unless $agg_config->{$ucc};
			for $cc (sort keys %{$agg_config->{$ucc}}) {
				die "column $cc is duplicated" if $seen{$cc}++;
				$usercode->($ucc, $cc, $agg_config->{$ucc}{$cc});
			}
		}

		#
		# 'keep' has to be first because 'stat' can't rewrite names
		#
		my %donekeep;
		my $has_keepers = 0;
		for my $keepers (@kept_cols) {
			for $cc (sort keys %{$agg_config->{$keepers}}) {
				next if $donekeep{$varname{$agg_config->{$keepers}{$cc}}};
				$donekeep{$varname{$agg_config->{$keepers}{$cc}}} = $cc;
				$s{initialize}	.= "\t\$ps->{keep}{$cc} = [];\n";
				$s{keeper2}	.= "\t\tpush(\@{\$ps->{keep}{$cc}}, $varname{$agg_config->{$keepers}{$cc}}) if \$count_this;\n";
				$s{merge}	.= "\tpush(\@{\$ps->{keep}{$cc}}, \@{\$oldps->{keep}{$cc}});\n";
				$s{reduce2}	.= "\t\@{\$ps->{keep}{$cc}} = \@{\$ps->{keep}{$cc}}[\@keepers];\n";
				$has_keepers++;
			}
		}
		if ($has_keepers) {
			$s{initialize}	.= "\t# has keepers\n";
			$s{initialize}  .= "\t\$ps->{numeric} = {};\n";

			$s{fv_setup}	.= "\t# has keepers\n";
			$s{fv_setup}	.= "\tlocal(\$Stream::Aggregate::Stats::ps) = \$ps;\n";

			$s{keeper1}	.= resume_line_numbering if $renumber;
			$s{keeper1}	.= "\t# has keepers\n";
			$s{keeper1}	.= "\tmy \$random = rand(1);\n";
			$s{keeper1}	.= "\tif (\@{\$ps->{random}} < $max_stats2keep || \$random < \$ps->{random}[0]) {\n";
			$s{keeper1}	.= "\t\tpush(\@{\$ps->{random}}, \$random);\n";

			$s{keeper3}	.= resume_line_numbering if $renumber;
			$s{keeper3}	.= "\t\t# has keepers\n";
			$s{keeper3}	.= "\t\t&\$reduce_func if \@{\$ps->{random}} > $max_stats2keep * 1.5;\n";
			$s{keeper3}	.= "\t}\n";

			$s{merge}	.= resume_line_numbering if $renumber;
			$s{merge}	.= "\t# has keepers\n";
			$s{merge}	.= "\tpush(\@{\$ps->{random}}, \@{\$oldps->{random}});\n";

			$s{merge2}	.= resume_line_numbering if $renumber;
			$s{merge2}	.= "\t# has keepers\n";
			$s{merge2}	.= "\t&\$reduce_func if \@{\$ps->{random}} > $max_stats2keep * 1.5;\n";

			# Generated reduce step: order the kept entries by their random
			# tag and retain the first $max_stats2keep of them.  The tags
			# come from rand(1), so they must be compared numerically: a
			# string comparison misorders small values that stringify in
			# exponent form (e.g. "5e-05" would sort after "0.9").
			$s{reduce}	.= $eval_line_numbers->(<<'END_REDUCE');
				# has keepers
				my $random = $ps->{random};
				@keepers = sort { $random->[$a] <=> $random->[$b] } 0..$#$random;
				@tossers = splice(@keepers, $max_stats2keep);
				@$random = @$random[@keepers];
END_REDUCE
			$s{reduce} .= resume_line_numbering if $renumber;
		}

		for $cc (sort keys %{$agg_config->{output}}) {
			$s{initialize} .= "\t\$ps->{output}{$cc} = 0;\n";
		}

		for $cc (sort keys %{$agg_config->{counter}}) {
			$s{initialize}	.= "\t\$ps->{counter}{$cc} = 0;\n";
			$s{count2}	.= "\t\$ps->{counter}{$cc}++ if $varname{$agg_config->{counter}{$cc}};\n";
			$s{merge}	.= "\t\$ps->{counter}{$cc} += \$oldps->{counter}{$cc};\n";
		}

		for $cc (sort keys %{$agg_config->{percentage}}) {
			$s{initialize}	.= "\t\$ps->{percentage}{$cc} = undef;\n";
			$s{stat}	.= "\t\$ps->{percentage}{$cc} = \$ps->{percentage_counter}{$cc} * 100 / (\$ps->{percentage_total}{$cc} || .001);\n";
			$s{initialize}	.= "\t\$ps->{percentage_counter}{$cc} = 0;\n";
			$s{initialize}	.= "\t\$ps->{percentage_total}{$cc} = 0;\n";
			$s{count2}	.= "\t\$ps->{percentage_counter}{$cc}++ if $varname{$agg_config->{percentage}{$cc}};\n"; 
			$s{count2}	.= "\t\$ps->{percentage_total}{$cc}++ if defined $varname{$agg_config->{percentage}{$cc}};\n"; 
			$s{merge}	.= "\t\$ps->{percentage_counter}{$cc} += \$oldps->{percentage_counter}{$cc};\n";
			$s{merge}	.= "\t\$ps->{percentage_total}{$cc} += \$oldps->{percentage_total}{$cc};\n";
		}

		for $cc (sort keys %{$agg_config->{sum}}) {
			$s{initialize}	.= "\t\$ps->{sum}{$cc} = 0;\n";
			$s{count2}	.= "\t\$ps->{sum}{$cc} += $varname{$agg_config->{sum}{$cc}};\n";
			$s{merge}	.= "\t\$ps->{sum}{$cc} += \$oldps->{sum}{$cc};\n";
		}

		for $cc (sort keys %{$agg_config->{mean}}) {
			$s{initialize}	.= "\t\$ps->{mean}{$cc} = undef;\n";
			$s{stat}	.= "\t\$ps->{mean}{$cc} = \$ps->{mean_sum}{$cc} / (\$ps->{mean_count}{$cc} || 100);\n";
			$s{initialize}	.= "\t\$ps->{mean_sum}{$cc} = 0;\n";
			$s{initialize}	.= "\t\$ps->{mean_count}{$cc} = 0;\n";
			$s{count2}	.= "\tif (defined($varname{$agg_config->{mean}{$cc}})) {\n";
			$s{count2}	.= "\t	\$ps->{mean_sum}{$cc} += $varname{$agg_config->{mean}{$cc}};\n";
			$s{count2}	.= "\t	\$ps->{mean_count}{$cc}++;\n";
			$s{count2}	.= "\t}\n";
			$s{merge}	.= "\t\$ps->{mean_sum}{$cc} += \$oldps->{mean_sum}{$cc};\n";
			$s{merge}	.= "\t\$ps->{mean_count}{$cc} += \$oldps->{mean_count}{$cc};\n";
		}

		for $cc (sort keys %{$agg_config->{min}}) {
			$s{initialize}	.= "\t\$ps->{min}{$cc} = undef;\n";
			$s{count2}	.= "\t\$ps->{min}{$cc} = min grep { defined } \$ps->{min}{$cc}, $varname{$agg_config->{min}{$cc}};\n";
			$s{merge}	.= "\t\$ps->{min}{$cc} = min grep { defined } \$ps->{min}{$cc}, \$oldps->{min}{$cc};\n";
		}

		for $cc (sort keys %{$agg_config->{minstr}}) {
			$s{initialize}	.= "\t\$ps->{minstr}{$cc} = undef;\n";
			$s{count2}	.= "\t\$ps->{minstr}{$cc} = minstr grep { defined } \$ps->{minstr}{$cc}, $varname{$agg_config->{minstr}{$cc}};\n";
			$s{merge}	.= "\t\$ps->{minstr}{$cc} = minstr grep { defined } \$ps->{minstr}{$cc}, \$oldps->{minstr}{$cc};\n";
		}

		for $cc (sort keys %{$agg_config->{max}}) {
			$s{initialize}	.= "\t\$ps->{max}{$cc} = undef;\n";
			$s{count2}	.= "\t\$ps->{max}{$cc} = max grep { defined } \$ps->{max}{$cc}, $varname{$agg_config->{max}{$cc}};\n";
			$s{merge}	.= "\t\$ps->{max}{$cc} = max grep { defined } \$ps->{max}{$cc}, \$oldps->{max}{$cc};\n";
		}

		for $cc (sort keys %{$agg_config->{maxstr}}) {
			$s{initialize}	.= "\t\$ps->{maxstr}{$cc} = undef;\n";
			$s{count2}	.= "\t\$ps->{maxstr}{$cc} = maxstr grep { defined } \$ps->{maxstr}{$cc}, $varname{$agg_config->{maxstr}{$cc}};\n";
			$s{merge}	.= "\t\$ps->{maxstr}{$cc} = maxstr grep { defined } \$ps->{maxstr}{$cc}, \$oldps->{maxstr}{$cc};\n";
		}

		for my $statc (@stats_cols) {
			for $cc (sort keys %{$agg_config->{$statc}}) {
				my $keepcc = $donekeep{$varname{$agg_config->{$statc}{$cc}}} || die;
				$s{initialize}	.= "\t\$ps->{$statc}{$cc} = undef;\n";
				$s{stat}	.= "\t\$ps->{$statc}{$cc} = $statc('$keepcc');\n";
			}
		}

		for $cc (sort keys %{$agg_config->{stat}}) {
			$s{stat}	.= "\t\$ps->{stat}{$cc} = $varname{$agg_config->{stat}{$cc}};\n";
			$s{initialize}	.= "\t\$ps->{stat}{$cc} = undef;\n";
		}

		for my $cc (sort keys %{$agg_config->{output}}) {
			$s{initialize}	.= "\t\$ps->{output}{$cc} = undef;\n";
			$s{stat}	.= "\t\$ps->{output}{$cc} = $varname{$agg_config->{output}{$cc}};\n";
		}

		for my $icol (@lock_data) {
			$s{initialize} .= "\tlock_keys(%{\$ps->{$icol}});\n"
				if keys %{$agg_config->{$icol}};
		}

		for my $ctype (@output_cols) {
			for $cc (sort keys %{$agg_config->{$ctype}}) {
				$s{final_values} .= "\t\$row->{$cc} = \$ps->{$ctype}{$cc};\n";
			}
		}
		$s{final_values} .= "\t&\$finalize_result_func;\n" if $agg_config->{finalize_result};

		my $code = $strict;
		$code .= qq{\n#line 1 "FAKE-all-code-for-$extra->{name}"\n} if $renumber;
		$code .= qq{\nmy $agg_config->{item_name};\n};
		$code .= $declarations;
		$code .= "{\n";

		$s{reduce} .= "\t&\$user_reduce_func;\n";

		#
		# Concatenate the named %s code sections into one chunk of source.
		#
		#	$func - when true, the chunk is wrapped as "$<func>_func = sub { ... };"
		#	@keys - names of the %s sections to include, in order
		#
		# Sections that do not exist are skipped; sections that are used
		# are removed from %s.  A wrapped function with no sections gets
		# a body of "0".
		#
		my $assemble_code = sub {
			my ($func, @keys) = @_;
			my @pieces = ("# ---------------------------------------------------------------\n");
			push(@pieces, "\$${func}_func = sub {\n") if $func;
			my $emitted = 0;
			for my $section (@keys) {
				exists $s{$section} or next;
				push(@pieces, qq{\n#line 1001 "FAKEFUNC-$extra->{name}-$func-$section"\n}) if $renumber;
				push(@pieces, delete $s{$section});
				$emitted = 1;
			}
			if ($func) {
				push(@pieces, "\t0\n") unless $emitted;
				push(@pieces, "};\n");
			}
			return join('', @pieces);
		};

		#
		# Cross product aggregation & counts
		#
		if (@cross_keys) {
			my $esub = '';
			my $newsub = '';
			my $oldsub = '';
			my $loop_in = '';
			my $loop_in2 = '';
			my $loop_in3 = '';
			my $loop_in3a = '';
			my $loop_out = '';
			my $loop_out2 = '';
			my $loop_indent = "";
			my $loop_head = '';
			my $loop_mid = '';
			my $loop_mid3 = '';
			my $loop_dbug_old = '';
			my $loop_dbug_new = '';
			for my $cc (@cross_keys) {
				die "Crossproduct column '$cc' doesn't exist" unless $var_types{$cc};
				die "Crossproduct column '$cc' ($var_types{$cc}) isn't a valid type (@cross_cols)" unless $cross_cols{$var_types{$cc}};

				my $cc_code = $agg_config->{simplify}{$cc} || 'return "*";';
				$s{user}	.= "my \$simplify_$cc = sub {\n";
				$s{user}	.= qq{#line 3001 "FAKE-$extra->{name}-simplify-$cc"\n} if $renumber;
				$s{user}	.= "\t".$cc_code;
				$s{user}	.= "\n};\n";

				$loop_head	.= "\tmy %key_count_$cc;\n";

				$loop_mid	.= "\tmy \$key_map_$cc = \$cross_key_reduce_func->('$cc', \\%key_count_$cc, \$simplify_$cc);\n";
				$loop_mid3	.= ", $cc => \$key_$cc";

				$loop_dbug_old	.= " $cc:\$key_$cc";
				$loop_dbug_new	.= " $cc:\$new_$cc";

				$loop_in2	.= "$loop_indent	for my \$key_$cc (keys %{\$cross_data$oldsub}) {\n"; 

				$loop_in	.= "$loop_indent	for my \$key_$cc (keys %{\$cross_data$oldsub}) {\n"; 
				$loop_in	.= "$loop_indent		my \$new_$cc = \$key_$cc;\n";
				$loop_in	.= "$loop_indent		my \$must_inc = 0;\n";
				$loop_in	.= "$loop_indent		if (exists \$key_map_${cc}->{\$key_$cc}) {\n";
				$loop_in	.= "$loop_indent			\$new_$cc = \$key_map_${cc}->{\$key_$cc};\n";
				$loop_in	.= "$loop_indent			\$must_inc = 1;\n";
				$loop_in	.= "$loop_indent			\$must_do++;\n";
				$loop_in	.= "$loop_indent		} else {\n";
				$loop_in	.= "$loop_indent			\$new_$cc = \$key_$cc;\n";
				$loop_in	.= "$loop_indent		}\n";

				$loop_in3a	.= "\$key_count_${cc}{\$key_$cc}++;\n";

				$loop_out	.= "$loop_indent	}\n";
				$loop_out	.= "$loop_indent		\$must_do -= \$must_inc;\n";
				$loop_out2	.= "$loop_indent	}\n";

				$loop_indent	.= "\t";

				$esub		.= "->{$var_value{$cc}}";
				$newsub		.= "->{\$new_$cc}";
				$oldsub		.= "->{\$key_$cc}";
			};
			for my $in3a (split(/\n/, $loop_in3a)) {
				$loop_in3 .= "$loop_indent		$in3a\n";
			}

			$loop_out = join("\n", reverse split(/\n/, $loop_out)) . "\n";
			$loop_out2 = join("\n", reverse split(/\n/, $loop_out2)) . "\n";

			#
			# Reduce the number of contexts
			#

			#
			# Decide which values of one crossproduct column should be
			# replaced by simpler values.
			#
			# Arguments (as passed by the generated cross_reduce code):
			#	$keyname       - name of the crossproduct column
			#	$valcounts     - hash ref: column value -> occurrence count
			#	$simplify_func - code ref called as ($value, $keyname) that
			#	                 returns the replacement value
			#
			# Returns a hash ref mapping old value -> replacement value.  The
			# hash is empty when the column has no more distinct values than
			# its configured crossproduct limit.  Sets $do_reduce when the
			# limit is exceeded, and dumps the mapping to STDERR when the
			# debug level is above 2.
			#
			$cross_key_reduce_func = sub {
				my ($keyname, $valcounts, $simplify_func) = @_;
				my %ret;
				if (keys %$valcounts > $agg_config->{crossproduct}{$keyname}) {
					$do_reduce = 1;
					my $limit = $agg_config->{crossproduct}{$keyname};
					# running estimate of how many distinct values remain
					my $current = keys %$valcounts;
					# %seen / %new: how many values have been mapped onto each replacement
					my %seen;
					my %new;
					# visit the least frequently occurring values first
					for my $val (sort { $valcounts->{$a} <=> $valcounts->{$b} } keys %$valcounts) {
						if ($current > $limit) {
							my $new = $simplify_func->($val, $keyname);
							# the simplifier left the value alone: nothing to map
							next if $new eq $val;
							# a replacement that was already used removes one distinct value
							if ($seen{$new}++) {
								$current--;
							}
							$new{$new}++;
							if ($new{$val}) {
								# we can't throw this one away since we have new
								# users...  we may not be able to meet our contract.
								# NOTE(review): this branch is meant to undo the
								# bookkeeping above; confirm the $current adjustment
								# is the intended direction.
								$current-- unless --$seen{$new};
								$new{$new}--;
								next;
							} else {
								$ret{$val} = $new;
							}
						}
					}
				}
				print STDERR YAML::Dump("reduce $keyname", \%ret) if $agg_config->{debug} > 2;
				return \%ret;
			};

			my $db1 = '';
			my $db2 = '';
			$db1 = qq{print STDERR "Merging\t$loop_dbug_old (\$cross_data${oldsub}->{item_counter})\tinto\t$loop_dbug_new\t\$cross_count\\n";} if $agg_config->{debug};
			$db2 = qq{print STDERR "Moving\t$loop_dbug_old (\$cross_data${oldsub}->{item_counter})\tto\t$loop_dbug_new\\n";} if $agg_config->{debug};
			$s{cross_reduce} .= resume_line_numbering if $renumber;

			$s{cross_reduce} .= "\t\$do_reduce = 0;\n";
			$s{cross_reduce} .= "\tmy \$must_do = 0;\n";
			$s{cross_reduce} .= $loop_head;
			$s{cross_reduce} .= $loop_in2;
			$s{cross_reduce} .= $loop_in3;
			$s{cross_reduce} .= $loop_out2;
			$s{cross_reduce} .= $loop_mid;
			$s{cross_reduce} .= $loop_in;
			$s{cross_reduce} .= $eval_line_numbers->(<<END_CR);
				# --------------- reduce -------------
				if (\$must_do) {
					if (\$cross_data$newsub) {
						\$cross_count--;
						$db1
						\$ps = \$cross_data$newsub;
						\$oldps = delete \$cross_data$oldsub;
						#
						# print STDERR "ABOUT TO MERGE: \$key_color \$key_size \$key_style \$oldps\\n";
						# print STDERR YAML::Dump("Pre-mege cross-data", \$cross_data);
						#
						&\$merge_func;
						\$ps = \$contexts[-1];
					} else {
						$db2
						\$cross_data$newsub = delete \$cross_data$oldsub;
					}
				}
				# --------------- reduce -------------
END_CR
			$s{cross_reduce} .= resume_line_numbering if $renumber;
			$s{cross_reduce} .= $loop_out;

			#
			# Add data to the right context
			#
			my $db3 = '';
			$db3 = qq{print STDERR "Cross-count: \$cross_count\\n";} if $agg_config->{debug} > 3;
			$s{crossproduct} .= $eval_line_numbers->(<<END_CP);
				if (\$cross_count > \$cross_limit * 2) {
					&\$cross_reduce_func;
				}
				if (\$cross_data$esub) {
					\$ps = \$cross_data$esub;
				} else {
					&\$new_ps_func;
					\$cross_data$esub = \$ps;
					\$cross_count++;
					$db3
				}
END_CP
			$s{crossproduct} .= resume_line_numbering if $renumber;

			#
			# handle combinations
			#
			$s{inner_combine} = '';
			$s{outer_combine} = '';
			if ($agg_config->{combinations}) {
				my $generate_combinations;
				my $combination_number = 0;
				my %mapping;
				$generate_combinations = sub {
					my ($output_field, $input_ps, $indent, $keys, $done, $loop_over) = @_;
					my $out = \$s{$output_field};
					my $loopout = '';
					my $x = '';
					my $y = '';
					my @loop_keys = grep { $agg_config->{combinations}{$_} && ! $done->{$_} } sort @$keys;
					my @delayed_call;
					if ($loop_over) {
						for my $k (@loop_keys) {
							$$out .= "$indent$x	for my \$ck_$k (keys %{$input_ps}) {\n";
							$loopout = "$indent$x	}\n$loopout";
							$x .= "\t";
							$input_ps .= "{\$ck_$k}";
						}
						$y = $x;
						if (! @loop_keys) {
							$y .= "\t";
							$$out .= "$indent$x	if ($input_ps) {\n";
						}
						$$out .= "$indent$y	\$row = ${input_ps}->{row};\n";
					}
					for my $cc (@loop_keys) {
						my @keeping = grep { $_ ne $cc } @loop_keys;
						if ($mapping{"@keeping"}++) {
							$$out .= "$indent$x	# we've already handled keeping '@keeping'\n";
							next;
						}

						my $accessor = @keeping
							? "{" . join("}{", map { "\$row->{'$_'}" } @keeping) . "}"
							: '';

						if ($loop_over && @keeping) {
							$accessor = "{" . join("}{", map { "\$ck_$_" } @keeping) . "}"
						}

						# yes, we're using auto-vivification.  It's ugly, but simplifies
						# the code.

						$$out .= "\n";
						$$out .= "$indent$x	# combine, dropping $cc";
						$$out .= 			", keeping: @keeping" if @keeping;
						$$out .= "\n";

						$$out .= "$indent$x	if (\$combination_funcs{'$cc'}->()) {\n";
						$$out .= "$indent$x		if (\$combinations[$combination_number]$accessor) {\n";
						$$out .= "$indent$x			local(\$Stream::Aggregate::Stats::ps)\n";
						$$out .= "$indent$x				= \$ps\n";
						$$out .= "$indent$x				= \$combinations[$combination_number]$accessor;\n";
						$$out .= "$indent$x			\$oldps = $input_ps;\n" if $input_ps ne '$oldps';
						$$out .= "$indent$x			&\$merge_func;\n";
						$$out .= "$indent$x		} else {\n";
						$$out .= "$indent$x			\$ps = \$combinations[$combination_number]$accessor = clone($input_ps);\n";
						$$out .= "$indent$x			\$ps->{row} = { %\$row };\n";
						$$out .= "$indent$x			delete \$ps->{row}{'$cc'};\n";
						$$out .= "$indent$x		}\n";
						$$out .= "$indent$x	}\n";

						my $pnum = $combination_number++;

						push(@delayed_call, sub {
							$generate_combinations->(
								outer_combine => "\$combinations[$pnum]", 
								"", 
								\@keeping, 
								{ %$done, $cc => 1 },
								$cc);
						});
					}
					if ($loop_over) {
						$$out .= "\n";
						$$out .= "$indent$y	# final values with '@loop_keys' keys\n";
						$$out .= "$indent$y	local(\$Stream::Aggregate::Stats::ps) = \$ps = ${input_ps};\n";
						$$out .= "$indent$y	\$suppress_result = 0;\n";
						$$out .= "$indent$y	\$final_values_func->();\n";
						$$out .= "$indent$y	push(\@\$retref, \$row) unless \$suppress_result;\n";
						$$out .= "\n";
						if (! @loop_keys) {
							$$out .= "$indent$x	}\n";
						}

						$$out .= $loopout;
					}
					while (my $dc = shift(@delayed_call)) {
						$dc->();
					}

				};
				$generate_combinations->(
					inner_combine => '$oldps', 
					"\t\t\t", 
					\@cross_keys, 
					{}, 
					undef);
			}

			#
			# Return the cross product results
			#
			$s{finish_cross} .= qq{print STDERR "Finish cross called\n";} if $agg_config->{debug} > 7;
			$s{finish_cross} .= qq{print STDERR YAML::Dump('cross_data-before',\$cross_data);\n} if $agg_config->{debug} > 8;
			$s{finish_cross} .= "\tmy (\$retref) = shift;\n";
			$s{finish_cross} .= "\tmy \$rowtmp;\n";
			$s{finish_cross} .= "\t&\$cross_reduce_func;\n";
			$s{finish_cross} .= qq{print STDERR YAML::Dump('cross_data-after',\$cross_data);\n} if $agg_config->{debug} > 8;
			$s{finish_cross} .= $loop_in2;
			$s{finish_cross} .= $eval_line_numbers->(<<END_FC);
				# --------------- finish cross -------------
				local(\$Stream::Aggregate::Stats::ps) 
					= \$ps
					= \$cross_data$oldsub;
				confess unless \$ps;
				\$suppress_result = 0;
				\$rowtmp = \$row = { &\$context_columns_func $loop_mid3 };
				&\$final_values_func;
				push(@\$retref, \$row) unless \$suppress_result;
				\$oldps = delete \$cross_data$oldsub;
				\$oldps->{row} = \$row;
				\$ps = \$contexts[-1];
				&\$merge_func if \$ps;
				\$cross_count--;
				$db3
END_FC
			$s{finish_cross} .= delete $s{inner_combine};
			$s{finish_cross} .= "\t\t\t\t# --------------- finish cross -------------\n";
			$s{finish_cross} .= resume_line_numbering if $renumber;
			$s{finish_cross} .= $loop_out2;
			$s{finish_cross} .= delete $s{outer_combine};
		} elsif ($agg_config->{combinations}) {
			die "combinations requires crossproduct which isn't defined";
		}

		$code .= $eval_line_numbers->(<<'END_FIELDS');

			my $compile_user_code = sub {
				my ($c, $field, $config_key, $default) = @_;
				return $default unless defined $c->{$field};
				my $config = $c->{$config_key} || {};   # maybe used by eval
				my $coderef;
				my $code = $strict;
				$code .= qq{\n#line 2001 "FAKE-$extra->{name}-$field"\n} if $renumber;
				$code .= qq{sub { $c->{$field} }; };
				my $sub = eval $code;
				die "Cannot compile user code for $extra->{name}/$field: $@\n$code" if $@;
				return $coderef if $coderef;
				return $sub;
			};

			$get_context_func	= $compile_user_code->($agg_config, 'context',			'context_config',		sub { return () });
			$context_columns_func	= $compile_user_code->($agg_config, 'context2columns',		'context2columns_config',	sub { return () });
			$filter_func	        = $compile_user_code->($agg_config, 'filter',			'filter_config',		sub { 1 });
			$preprocess_func	= $compile_user_code->($agg_config, 'preprocess',		'preprocess_config',		sub {});
			$stringify_func		= $compile_user_code->($agg_config, 'stringify_context',	'stringify_context_config',	sub { map { ref($_) ? Dump($_) : $_ } @_ });
			$finalize_result_func	= $compile_user_code->($agg_config, 'finalize_result',		'finalize_result_config',	sub {});
			$passthrough_func	= $compile_user_code->($agg_config, 'passthrough',		'passthrough_config',		sub { return () });
			$user_new_context_func	= $compile_user_code->($agg_config, 'new_context',		'new_context_config',		sub { return () });
			$user_merge_func	= $compile_user_code->($agg_config, 'merge',			'merge_config',			sub { return () });
			$user_reduce_func	= $compile_user_code->($agg_config, 'reduce',			'reduce_config',		sub { return () });

			if ($agg_config->{crossproduct} && $agg_config->{combinations}) {
				for my $crosskey (uniq(keys(%{$agg_config->{crossproduct}}), keys(%{$agg_config->{combinations}}))) {
					$combination_funcs{$crosskey} = $compile_user_code->($agg_config->{combinations}, $crosskey, "combine on $crosskey", sub { 0 });
				}
			}

END_FIELDS
		$code .= "\t\$itemref = \\$agg_config->{item_name};\n";
		$code .= "}\n";

		#
		# New context ($ps) allocator 
		#

		$s{new_ps} .= "\t\$ps = {};\n";
		$s{new_ps} .= "\t\$ps->{item_counter} = 0;\n";
		$s{new_ps} .= "\t\$ps->{heap} = {};\n" 
			if Dump($agg_config) =~ /\{heap\}/;
		if ($has_keepers) {
			$s{new_ps} .= "\t\$ps->{random} = [];\n";
			$s{new_ps} .= "\t\$ps->{sidestats} = {};\n"; # for Stream::Aggregate::Stats
		}
		$s{new_ps} .= "\t\$ps->{unfiltered_counter} = 0;\n"		if $agg_config->{filter};
		$s{new_ps} .= "\t&\$initialize_func;\n"				if $s{initialize};
		$s{new_ps} .= "\t&\$user_new_context_func;\n"			if $agg_config->{new_context};
		$s{new_ps} .= "\t\$ps->{row} = undef;\n";
		$s{new_ps} .= "\tlock_keys(%\$ps);\n";

		#
		# main processing loop, generated for execution efficiency
		#

		$s{process} .= "\t\$last_item = \$\$itemref;\n"
			if Dump($agg_config) =~ /\$last_item\b/;
		$s{process} .= $eval_line_numbers->(<<'END_P0');
			$last_item = $$itemref;
			($$itemref) = @_;
			my @ret;
			unless ($$itemref) {
				$finish_cross_func->(\@ret) if keys %$cross_data;
				$finish_context_func->(\@ret) 
					while @contexts;
				return @ret;
			}
END_P0
		$s{process} .= $eval_line_numbers->(<<'END_P1') if $agg_config->{preprocess};

			&$preprocess_func;

END_P1
		$s{process} .= $eval_line_numbers->(<<'END_P2') if $agg_config->{filter} && $agg_config->{filter_early};

			$count_this = &$filter_func;
END_P2
		$s{process} .= $eval_line_numbers->(<<'END_P3') if $agg_config->{passthrough};

			push(@ret, &$passthrough_func);

END_P3

		if ($agg_config->{context}) {
			$s{process} .= $eval_line_numbers->(<<'END_P4') if $agg_config->{filter} && $agg_config->{filter_early};

				if ($count_this) {

END_P4
			$s{process} .= $eval_line_numbers->(<<'END_P5');

					my @new_context = &$get_context_func;
					my @new_strings = $stringify_func->(@new_context);

					my $diffpos = list_difference_position(@new_strings, @context_strings);

					if (defined $diffpos) {
						$finish_context_func->(\@ret)
							while @current_context >= $diffpos;
					}

					while (@new_context > @current_context) {
						$add_context_component_func->($new_context[@current_context], $new_strings[@current_context]);
					}
END_P5

			$s{process} .= $eval_line_numbers->(<<'END_P7') if $agg_config->{filter} && $agg_config->{filter_early};
				}

END_P7
		}

		$s{process} .= $eval_line_numbers->(<<'END_P7A') if $agg_config->{filter} && ! $agg_config->{filter_early};

			$count_this = &$filter_func;
END_P7A

		$s{process} .= $eval_line_numbers->(<<'END_P7B') if $agg_config->{filter} && ! $agg_config->{context};
				if ($count_this) {
END_P7B
		
		$s{process} .= $eval_line_numbers->(<<'END_P8');
					&$count_func;
					$ps->{item_counter}++;
END_P8

		# this closes the if ($count_this) in P3 or in P7B 
		$s{process} .= $eval_line_numbers->(<<'END_P9') if $agg_config->{filter};  
				}
				$ps->{unfiltered_counter}++;
END_P9

		$s{process} .= $eval_line_numbers->(<<'END_P10');
			return @ret;
END_P10
		$s{process} .= resume_line_numbering if $renumber;

		#
		# Merge contexts func
		#

		$s{merge0} .= "print STDERR YAML::Dump('MERGE', \$ps, \$oldps);\n" if $agg_config->{debug} > 11;
		$s{merge0} .= resume_line_numbering if $renumber;
		$s{merge0} .= "\t\$ps->{item_counter} += \$oldps->{item_counter};\n";
		$s{merge0} .= "\t\$ps->{unfiltered_counter} += \$oldps->{unfiltered_counter};\n" if $agg_config->{filter};
		$s{merge3} .= resume_line_numbering if $renumber;
		$s{merge3} .= "\t&\$user_merge_func;\n";

		$s{fv_setup} .= "print STDERR YAML::Dump('final_values', \$ps);\n" if $agg_config->{debug} > 12;

		$code .= $assemble_code->('', qw(user));
		$code .= $assemble_code->('merge', qw(merge0 merge merge2 merge3));
		$code .= $assemble_code->('cross_reduce', qw(cross_reduce));
		$code .= $assemble_code->('finish_cross', qw(finish_cross));
		$code .= $assemble_code->('new_ps', qw(new_ps));
		$code .= $assemble_code->('process', qw(process));
		$code .= $assemble_code->('initialize', qw(initialize));
		$code .= $assemble_code->('final_values', qw(fv_setup output stat final_values));
		$code .= $assemble_code->('count', qw(precount count ephemeral0 ephemeral ephemeral2 crossproduct ephemeral3 keep standard_deviation median dominant counter percentage sum mean median min minstr max maxstr count2 keeper1 keeper2 keeper3 ));
		$code .= $assemble_code->('reduce', qw(reduce reduce2));
		die "INTERNAL ERROR: ".join(' ', keys %s) if keys %s;

		if ($suppress_line_numbers) {
			$code =~ s/^#line \d+ ".*"\s*?\n//mg;
		}

		print STDERR $line_numbers{$code}."\n" if $agg_config->{debug};

		eval $code;
		die "$@\n$line_numbers{$code}" if $@;

	};

	&$compile_config;

	#
	# Open one more level of context nesting.
	#
	# Arguments: the new context component and its stringified form.
	#
	# $new_ps_func is called in the "&$code;" form, so it receives this
	# sub's own @_.  It is expected to leave the freshly allocated
	# per-context state in $ps (presumably it is the sub assembled from
	# the generated "new_ps" code -- confirm against $assemble_code).
	#
	$add_context_component_func = sub {
		my ($new_component, $new_string) = @_;

		# must run first: the push onto @contexts below records the $ps it sets up
		&$new_ps_func;

		# @contexts, @context_strings and @current_context grow in lock step
		push(@contexts, $ps);
		push(@context_strings, $new_string);
		push(@current_context, $new_component);

		# Bump the counter for the depth just opened, drop any counters
		# left over from deeper levels, and start a zeroed counter for
		# the next level down.
		my $depth = $#contexts;
		$items_seen[$depth] += 1;
		$#items_seen = $depth;
		$items_seen[$depth + 1] = 0;
	};

	#
	# Close the innermost context level.
	#
	# Argument: a reference to the array that collects result rows.
	#
	# Sequence: flush cross data, build $row from the context columns and
	# the final values, pop the level off all three stacks, make the
	# parent level current again and merge into it, then append $row to
	# the results unless $suppress_result was set along the way
	# (presumably by one of the callbacks invoked here -- not visible in
	# this sub).
	#
	# The "&$code;" calls deliberately pass this sub's @_ through, so @_
	# must not be shifted or otherwise modified here.
	#
	$finish_context_func = sub {
		my ($results) = @_;

		@contexts or die;

		if ($agg_config->{debug} > 5) {
			print STDERR "about to call finish cross\n";
		}
		$finish_cross_func->($results);

		# re-checked after the finish-cross callback has run
		@contexts or die;

		ref($ps) or confess;

		# $row is built while $ps is still the level being closed
		$suppress_result = 0;
		$row = { &$context_columns_func };
		&$final_values_func;

		# @contexts, @context_strings and @current_context shrink in lock step
		pop(@context_strings);
		pop(@current_context);
		$oldps = pop(@contexts);

		# the parent level (if any) becomes current; $oldps holds the
		# level just closed so the merge callback can use both
		$ps = $contexts[-1];
		if ($ps) {
			&$merge_func;
		}

		if (! $suppress_result) {
			push(@{$results}, $row);
		}
	};

	return $process_func;

}

#
# Check an aggregation config hash against $prototype_config.
#
# The checker is obtained by string-eval'ing the text returned by
# config_checker_source, so it is compiled here rather than in the
# module that supplies the source.  A compile failure is re-thrown
# unchanged.  The value returned by the checker is passed back to the
# caller.
#
sub validate_aggregation_config
{
	my ($agg_config) = @_;

	my $check_config = eval(config_checker_source());
	if ($@) {
		die $@;
	}

	return $check_config->(
		$agg_config,
		$prototype_config,
		'- Stream::Aggregate config',
	);
}

1;