#!/usr/bin/env perl -l
###
### THIS SCRIPT DOES NOT WORK
###
### I suspect that Coro and MooseX::Workers won't play nicely together, but
### debugging that will have to wait until later
###
#
# http://www.tbray.org/ongoing/When/200x/2007/09/20/Wide-Finder
#
#
# Rather than Erlang (as was in here before) this is more based on the
# Scala version of this code at
# http://www.martin-probst.com/2007/09/24/wide-finder-in-scala/
#
#
# requires the data at http://www.tbray.org/tmp/o10k.ap
#
use strict;
use warnings;

# Unbuffer the currently selected output handle so results appear immediately.
$| = 1;

# main([$filename])
#
# Builds a Slurp object for the given log file and returns it.
#
#   $filename - optional path to the access log; defaults to
#               'ex/tbray.data.big' when omitted or undef.
#
# Dies with a message starting "no file" when the path does not exist.
sub main {
    my ($filename) = @_;
    $filename = 'ex/tbray.data.big' unless defined $filename;

    # Fail early, and name the path, rather than letting the reader fail later.
    die "no file: $filename" unless -e $filename;

    return Slurp->new( filename => $filename );
}
{
    # Slurp reads the log file in batches of lines, hands each batch to a
    # Count object through the 'loop' event, merges partial counts delivered
    # to its 'inc' event, and prints the totals in 'tally'.
    package Slurp;
    use strict;
    use warnings;
    use MooseX::Coro;
    use IO::File;
    use Coro;

    # Maximum number of lines per batch. The original loop iterated over the
    # inclusive range 0 .. 600000/8, i.e. 75_001 lines, so that size is kept.
    my $CHUNK_LINES = 600_000 / 8 + 1;

    # Path of the log file to read.
    has filename => (
        isa => 'Str',
        is  => 'ro',
    );

    # Running totals: captured article path => number of hits.
    has count => (
        isa     => 'HashRef',
        is      => 'rw',
        default => sub { {} },
    );

    # Read handle on `filename`, opened lazily on first use.
    has file => (
        isa     => 'IO::File',
        is      => 'ro',
        lazy    => 1,
        default => sub {
            my ($self) = @_;

            # IO::File->new returns undef on failure; report the real reason
            # instead of letting the type constraint reject an undef value.
            my $handle = IO::File->new( $self->filename, 'r' )
                or die 'cannot open ' . $self->filename . ": $!";
            return $handle;
        },
    );

    # The Count object that processes batches. `c` returns the object itself;
    # `counter(...)` is delegated to that object's `yield` method.
    has counter => (
        reader  => 'c',
        default => sub { Count->new( sender => $_[0] ) },
        handles => { counter => 'yield' },
    );

    # Kick off reading by yielding the 'loop' event.
    sub START {
        my ($self) = @_;
        $self->yield('loop');
    }

    # Read the whole file, dispatching one 'loop' event to the counter per
    # batch of at most $CHUNK_LINES lines, then yield 'tally'.
    # NOTE(review): nothing in this handler waits for the counter's results
    # before 'tally' is yielded - confirm the event ordering is as intended.
    event loop => sub {
        my ($self) = @_;
        my $file = $self->file;

        until ( eof $file ) {
            my @chunk;

            # Stop at end of file so the last batch is not padded with undef.
            while ( @chunk < $CHUNK_LINES ) {
                my $line = <$file>;
                last unless defined $line;
                push @chunk, $line;
            }

            $self->counter( 'loop', $self, \@chunk );
        }

        $self->yield('tally');
    };

    # Merge one batch's partial counts (a hash ref of path => hits) into the
    # running totals. The totals hash is updated in place through its
    # reference, so no write-back through the accessor is needed.
    event inc => sub {
        my ( $self, $partial ) = @_;
        my $totals = $self->count;
        $totals->{$_} += $partial->{$_} for keys %$partial;
    };

    # Print "hits: path" for every path, highest hit count first, then yield
    # the 'STOP' event.
    event tally => sub {
        my ($self) = @_;
        my $count = $self->count;
        print "$count->{$_}: $_"
            for sort { $count->{$b} <=> $count->{$a} } keys %$count;
        $self->yield('STOP');
    };

    __PACKAGE__->meta->make_immutable;
}
{
    # Count receives batches of log lines via its 'loop' event, counts article
    # hits for each batch in a spawned worker, and forwards the worker's
    # JSON-encoded result to the sender object as an 'inc' event.
    package Count;
    use strict;
    use warnings;
    use MooseX::Coro;
    use JSON::Any qw(XS);
    with qw(MooseX::Workers);

    # Starts the POE kernel as soon as the object has been constructed.
    sub BUILD { POE::Kernel->run }

    # The object that results are reported to. `s` returns the object itself;
    # `sender(...)` is delegated to that object's `yield` method.
    has sender => (
        reader   => 's',
        required => 1,
        handles  => { sender => 'yield' },
    );

    # Count matching requests in one batch of lines inside a spawned worker.
    # The worker prints its counts as JSON on STDOUT.
    event loop => sub {
        my ( $self, @args ) = @_;

        # The only caller visible in this file yields
        # ('loop', $slurp, \@chunk), so the batch is the LAST argument rather
        # than the first one after $self.
        # NOTE(review): confirm how MooseX::Coro passes yield arguments.
        my $chunk = $args[-1];

        # Captures "YYYY/MM/DD/name" from GET requests for ongoing articles.
        my $rx = qr|GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+)|;

        $self->spawn(
            sub {
                my %count;
                for my $line (@$chunk) {
                    next unless defined $line;    # tolerate padded batches
                    $count{$1}++ if $line =~ $rx;
                }
                print JSON::Any->encode( \%count );
            }
        );
    };

    # Called with a worker's STDOUT output: decode the JSON counts and pass
    # them to the sender's 'inc' event.
    sub worker_stdout {
        my ( $self, $out ) = @_;
        warn $out;
        my $count = JSON::Any->decode($out);

        # The original called `sender_return`, which nothing in this file
        # declares; the `sender` attribute delegates `sender` to `yield`.
        $self->sender( 'inc', $count );
    }

    # The remaining subs only emit diagnostics via warn.
    # NOTE(review): these names look like MooseX::Workers callback hooks -
    # confirm against that module's documentation.
    sub worker_manager_start { warn 'started worker manager' }
    sub worker_manager_stop  { warn 'stopped worker manager' }
    sub max_workers_reached  { warn 'maximum worker count reached' }
    sub worker_stderr  { shift; warn 'STDERR: ' . join ' ', @_; }
    sub worker_error   { shift; warn join ' ', @_; }
    sub worker_done    { shift; warn 'DONE: ' . join ' ', @_; }
    sub worker_started { shift; warn 'STARTED: ' . join ' ', @_; }
    sub sig_child      { shift; warn join ' ', @_; }

    __PACKAGE__->meta->make_immutable;
}
# Entry point. The call comes last so that the `has`/`event` declarations in
# the package blocks above, which are ordinary statements executed at run
# time, have already run before any object is constructed.
main();