# $Id: Prima.pm,v 1.9 2010/03/25 12:52:36 dk Exp $
package IO::Lambda::Loop::Prima;
use strict;
use warnings;
use IO::Lambda qw(:constants);
use Time::HiRes qw(time);
use Prima qw(Application);
IO::Lambda::Loop::default('Prima');
use vars qw(%filenos @timers $timer $deadline $event @mask $DEBUG);
# $DEBUG = 1;
$mask[IO_READ] = fe::Read;
$mask[IO_WRITE] = fe::Write;
$mask[IO_EXCEPTION] = fe::Exception;
sub new { bless {} , shift }
sub reset_mask
{
my $f = shift;
my $mask = 0;
for my $flags ( map { $_-> [WATCH_IO_FLAGS]} @{ $f-> {rec}} ) {
$mask |= $mask[$_] for grep { $flags & $_ } IO_READ, IO_WRITE, IO_EXCEPTION;
}
unless ( $mask == $f-> {mask}) {
$f-> {object}-> mask($f-> {mask} = $mask);
warn
fileno($f->{object}->file), " set new mask ",
(( $mask & fe::Read) ? 'read ' : ''),
(( $mask & fe::Write) ? 'write ' : ''),
(( $mask & fe::Exception) ? 'exception ' : ''),
"\n"
if $DEBUG;
}
}
my @prima_events;
sub io_filter { push @prima_events, \@_; return 0 }
sub on_read { on_io($_[0], fe::Read) }
sub on_write { on_io($_[0], fe::Write) }
sub on_exception { on_io($_[0], fe::Exception) }
sub on_io
{
$event++;
my ( $obj, $flags) = @_;
my $fileno = fileno($obj-> file);
warn "event $fileno/$filenos{$fileno}->{mask} $flags\n" if $DEBUG;
# Read up all events sitting in the queue
#
# This is to collect all eventual IO events at once, and
# not to handle each other separately in callbacks. Needed in
# situations when f.ex. a record listens for IO_READ|IO_WRITE --
# if callbacks to be called separately, it'll be even more mess
my $hook = Prima::Component-> event_hook;
@prima_events = ();
Prima::Component-> event_hook( \&io_filter);
$::application-> yield;
Prima::Component-> event_hook( $hook);
# create mapping fileno -> flags
my %files = ( map { ("$_->{object}" => 1) } ( values %filenos ));
my @ev;
my @xr;
my %masks = ( $fileno => 0 |
(( $flags & fe::Read) ? IO_READ : 0) |
(( $flags & fe::Write) ? IO_WRITE : 0) |
(( $flags & fe::Exception) ? IO_EXCEPTION : 0)
);
for ( grep { exists $files{ $_->[0] }} @prima_events) {
my $lflags = 0;
$lflags |= IO_READ if $_-> [1] eq 'Read';
$lflags |= IO_WRITE if $_-> [1] eq 'Write';
$lflags |= IO_EXCEPTION if $_-> [1] eq 'Exception';
my $fileno = fileno($_->[0]-> file);
$masks{ $fileno } ||= 0;
$masks{ $fileno } |= $lflags;
}
@prima_events = grep { not exists $files{ $_->[0] }} @prima_events;
# filter records based on %masks
while ( ( $fileno, $flags) = each %masks) {
my $f = $filenos{$fileno};
next unless $f;
my @xr;
for my $r ( @{$f-> {rec}}) {
if ( $r-> [WATCH_IO_FLAGS] & $flags) {
$r-> [WATCH_IO_FLAGS] &= $flags;
push @ev, $r;
} else {
push @xr, $r;
}
}
next if @xr == @{$f->{rec}};
if ( @xr) {
$f-> {rec} = \@xr;
reset_mask( $f);
} else {
warn "$fileno object destroyed\n" if $DEBUG;
$f-> {object}-> destroy if $f-> {object};
delete $filenos{$fileno};
}
}
my %timer = (map { $_ => undef } grep { defined $_-> [WATCH_DEADLINE] } @ev);
if ( scalar keys %timer) {
@timers = grep { not exists $timer{"$_"}} @timers;
reset_timer();
}
if ( $DEBUG) {
warn "prima events: ", scalar(@prima_events), "\n";
warn "io dispatch ", join(' ', map { defined($_) ? $_ : 'undef' } @$_), "\n"
for @ev;
}
$$_[WATCH_OBJ]-> io_handler( $_) for @ev;
Prima::Component::notify( @$_ ) for @prima_events;
}
sub watch
{
my ( $self, $rec) = @_;
my $fileno = fileno $rec->[WATCH_IO_HANDLE];
die "Invalid filehandle" unless defined $fileno;
my $flags = $rec->[WATCH_IO_FLAGS];
unless ( $filenos{$fileno}) {
my $f = Prima::File-> new(
owner => $::application,
file => $rec->[WATCH_IO_HANDLE],
onRead => \&on_read,
onWrite => \&on_write,
onException => \&on_exception,
);
die "Error creating Prima::File:$@" unless $f;
$filenos{$fileno} = {
object => $f,
mask => 0,
rec => [],
};
warn "object created for $fileno\n" if $DEBUG;
}
my $f = $filenos{$fileno};
push @{$f-> {rec}}, $rec;
reset_mask($f);
$self-> after( $rec) if $rec-> [WATCH_DEADLINE];
}
sub deadline
{
return undef unless @timers;
my $new_deadline = $timers[0]-> [WATCH_DEADLINE];
for ( map { $_-> [WATCH_DEADLINE] } @timers) {
next if $_ >= $new_deadline;
$new_deadline = $_;
}
return $new_deadline;
}
sub reset_timer
{
$deadline = deadline();
unless ( defined $deadline) {
warn "stop timer\n" if $DEBUG;
$timer-> stop if $timer;
return;
}
my $timeout = $deadline - time;
$timeout = 0.001 if $timeout < 0;
$timeout = int( $timeout * 1000 + .5);
$timer-> timeout( $timeout);
$timer-> start;
warn "start timer ", $timeout/1000, "\n" if $DEBUG;
}
sub on_tick
{
warn "event timer\n" if $DEBUG;
$event++;
my @ev;
my $t = time;
# timers
push @ev, grep { $_-> [WATCH_DEADLINE] <= $t } @timers;
@timers = grep { $_-> [WATCH_DEADLINE] > $t } @timers;
# files
my @kill;
while ( my ( $fileno, $r) = each %filenos) {
my @xr = grep {
not defined($_-> [WATCH_DEADLINE]) or
($_-> [WATCH_DEADLINE] > $t)
} @{$r->{rec}};
next if @xr == @{$r->{rec}};
if ( @xr) {
$r-> {rec} = \@xr;
reset_mask( $r);
} else {
push @kill, $fileno;
}
}
for ( @kill) {
warn "$_ object destroyed\n" if $DEBUG;
$filenos{$_}-> {object}-> destroy
if $filenos{$_}-> {object};
delete $filenos{$_};
}
reset_timer;
$$_[WATCH_IO_FLAGS] = 0 for @ev;
if ( $DEBUG) {
warn "timer dispatch ", join(' ', map { defined($_) ? $_ : 'undef' } @$_), "\n"
for @ev
}
$$_[WATCH_OBJ]-> io_handler( $_) for @ev;
}
sub after
{
my ( $self, $rec) = @_;
push @timers, $rec;
unless ( $timer) {
$timer = Prima::Timer-> create(
owner => $::application,
onTick => \&on_tick,
active => 0,
);
die "Error creating Prima::Timer:$@" unless $timer;
warn "created global timer\n" if $DEBUG;
}
reset_timer;
}
sub empty { ( @timers + scalar keys %filenos) ? 0 : 1 }
sub yield
{
warn "yield\n" if $DEBUG;
my ( $self, $nonblocking ) = @_;
local $event = 0;
$::application-> yield;
return if $nonblocking;
$::application-> yield while $event == 0;
}
sub remove
{
my ($self, $obj) = @_;
my $t = @timers;
@timers = grep { defined($_-> [WATCH_OBJ]) and $_-> [WATCH_OBJ] != $obj } @timers;
reset_timer if $t != @timers;
my @kill;
while ( my ( $fileno, $r) = each %filenos) {
my @xr = grep { defined($_-> [WATCH_OBJ]) and $_-> [WATCH_OBJ] != $obj } @{$r->{rec}};
next if @xr == @{$r->{rec}};
if ( @xr) {
$r-> {rec} = \@xr;
reset_mask( $r);
} else {
push @kill, $fileno;
}
}
for ( @kill) {
warn "$_ object destroyed\n" if $DEBUG;
$filenos{$_}-> {object}-> destroy
if $filenos{$_}-> {object};
delete $filenos{$_};
}
}
sub remove_event
{
my ($self, $rec) = @_;
my $t = @timers;
@timers = grep { $_ != $rec } @timers;
reset_timer if $t != @timers;
my @kill;
while ( my ( $fileno, $r) = each %filenos) {
my @xr = grep { $_ != $rec } @{$r->{rec}};
next if @xr == @{$r->{rec}};
if ( @xr) {
$r-> {rec} = \@xr;
reset_mask( $r);
} else {
push @kill, $fileno;
}
}
for ( @kill) {
warn "$_ object destroyed\n" if $DEBUG;
$filenos{$_}-> {object}-> destroy
if $filenos{$_}-> {object};
delete $filenos{$_};
}
}
sub signal { $event++ }
END { undef $timer };
1;
__DATA__
=pod
=head1 NAME
IO::Lambda::Loop::Prima - Prima-based event loop for IO::Lambda
=head1 DESCRIPTION
This is the implementation of event loop for C<IO::Lambda> based on C<Prima> event
loop. The module is not intended for direct use.
=head1 SYNOPSIS
use Prima;
use IO::Lambda::Loop::Prima; # explicitly select the event loop module
use IO::Lambda;
=head1 SEE ALSO
L<Prima>