The London Perl and Raku Workshop takes place on 26th Oct 2024. If your company depends on Perl, please consider sponsoring and/or attending.
package Data::Transform::POE;
=head1 NAME

Data::Transform::POE - helper for using Data::Transform with POE

=head1 DESCRIPTION

This is a helper package for using Data::Transform with POE. Currently
it provides an alternate version of POE::Wheel::ReadWrite that makes
that package work with Data::Transform. The only change is that handling
of Data::Transform::Meta packets is now supported.

Load this after you've loaded POE::Wheel::ReadWrite, so it will override
what is in there.

Hopefully this can go away as soon as possible.

=cut

package # hide from CPAN indexer
        POE::Wheel::ReadWrite;

use strict;

#use vars qw($VERSION);
#$VERSION = do {my($r)=(q$Revision$=~/(\d+)/);sprintf"1.%04d",$r};
#
use Carp qw( croak carp );
use Scalar::Util qw(blessed);
use POE qw(Wheel Driver::SysRW Filter::Line);

# Offsets into $self.
#
# The wheel object is a blessed array; each constant below names one
# slot.  The order must match the list that new() blesses.

# Input and output file handles (the same handle when new() is given
# a single Handle parameter).
sub HANDLE_INPUT               () {  0 }
sub HANDLE_OUTPUT              () {  1 }

# Filters used to parse input and to serialize output.
sub FILTER_INPUT               () {  2 }
sub FILTER_OUTPUT              () {  3 }

# The single driver object used for both reading and writing.
sub DRIVER_BOTH                () {  4 }

# Event names supplied by the caller (InputEvent, ErrorEvent,
# FlushedEvent).
sub EVENT_INPUT                () {  5 }
sub EVENT_ERROR                () {  6 }
sub EVENT_FLUSHED              () {  7 }

# Water mark thresholds (HighMark, LowMark), their event names
# (HighEvent, LowEvent), and a flag that is set to 1 while the wheel
# is in the high-water state and 0 otherwise.
sub WATERMARK_WRITE_MARK_HIGH  () {  8 }
sub WATERMARK_WRITE_MARK_LOW   () {  9 }
sub WATERMARK_WRITE_EVENT_HIGH () { 10 }
sub WATERMARK_WRITE_EVENT_LOW  () { 11 }
sub WATERMARK_WRITE_STATE      () { 12 }

# Last value returned by the driver's put() or flush().
sub DRIVER_BUFFERED_OUT_OCTETS () { 13 }

# Names of the dynamically registered select-write / select-read
# states.
sub STATE_WRITE                () { 14 }
sub STATE_READ                 () { 15 }

# Wheel ID obtained from POE::Wheel::allocate_wheel_id().
sub UNIQUE_ID                  () { 16 }

# The AutoFlush constructor parameter; when true, put() calls flush()
# as soon as an empty buffer receives data.
sub AUTOFLUSH                  () { 17 }

# No-op that is referenced as "0 && CRIMSON_SCOPE_HACK('<')" at the
# top of the anonymous state handlers.  The comments at those call
# sites say it protects against crashes on older perls.
sub CRIMSON_SCOPE_HACK ($) { 0 }

#------------------------------------------------------------------------------

# Constructor.  Takes a flat list of named parameters:
#
#   Handle, or InputHandle plus OutputHandle   (required)
#   Filter, or InputFilter and/or OutputFilter (default: one shared
#                                               POE::Filter::Line)
#   Driver                                     (default: POE::Driver::SysRW)
#   InputEvent, ErrorEvent, FlushedEvent
#   HighMark, LowMark, HighEvent, LowEvent
#   AutoFlush
#
# Croaks when a kernel reference is passed first, when $poe_kernel is
# not defined, when a required handle is missing, or when the water
# mark parameters are inconsistent.  Carps about parameters that are
# overridden or not recognised.  Returns the new wheel with its read
# and write states already registered.
sub new {
  my $class = shift;
  my %args  = @_;

  if (@_ && (ref($_[0]) eq 'POE::Kernel')) {
    croak "wheels no longer require a kernel reference as their first parameter";
  }

  defined $poe_kernel or croak "$class requires a working Kernel";

  # Resolve the handles.  A combined Handle wins over the split form.
  my ($handle_in, $handle_out);
  if (defined $args{Handle}) {
    foreach my $overridden (qw(InputHandle OutputHandle)) {
      carp "Ignoring $overridden parameter (Handle parameter takes precedence)"
        if defined $args{$overridden};
    }
    $handle_in = $handle_out = delete $args{Handle};
  }
  else {
    defined $args{InputHandle}  or croak "Handle or InputHandle required";
    defined $args{OutputHandle} or croak "Handle or OutputHandle required";
    ($handle_in, $handle_out) = delete @args{qw(InputHandle OutputHandle)};
  }

  # Resolve the filters.  A combined Filter wins over the split form.
  my ($filter_in, $filter_out);
  if (defined $args{Filter}) {
    foreach my $overridden (qw(InputFilter OutputFilter)) {
      carp "Ignoring $overridden parameter (Filter parameter takes precedence)"
        if defined $args{$overridden};
    }
    $filter_in = $filter_out = delete $args{Filter};
  }
  else {
    ($filter_in, $filter_out) = delete @args{qw(InputFilter OutputFilter)};

    # Whichever side is still missing shares a single
    # POE::Filter::Line instance.
    if (!defined($filter_in) or !defined($filter_out)) {
      my $line_filter = POE::Filter::Line->new();
      defined $filter_in  or $filter_in  = $line_filter;
      defined $filter_out or $filter_out = $line_filter;
    }
  }

  my $driver = delete $args{Driver};
  defined $driver or $driver = POE::Driver::SysRW->new();

  # Validate the water mark parameters.  Every problem is reported
  # with carp first; a single croak follows if anything was wrong.
  {
    my $problems = 0;
    my $has_high = defined($args{HighMark});
    my $has_low  = defined($args{LowMark});

    if ($has_high xor $has_low) {
      carp "HighMark and LowMark parameters require each-other";
      $problems++;
    }
    elsif ($has_high) {
      # Both marks exist on this path, so only their values need
      # checking; the definedness test is kept from the original.
      if (!($has_high and $has_low)) {
        carp "HighMark and LowMark parameters must both be defined";
        $problems++;
      }
      if (!(($args{HighMark} > 0) and ($args{LowMark} > 0))) {
        carp "HighMark and LowMark parameters must be above 0";
        $problems++;
      }
    }

    if ($has_high xor defined($args{HighEvent})) {
      carp "HighMark and HighEvent parameters require each-other";
      $problems++;
    }
    if ($has_low xor defined($args{LowEvent})) {
      carp "LowMark and LowEvent parameters require each-other";
      $problems++;
    }

    croak "Water mark errors" if $problems;
  }

  # Fill the object's slots by name rather than by position.
  my @slots;
  $slots[HANDLE_INPUT]               = $handle_in;
  $slots[HANDLE_OUTPUT]              = $handle_out;
  $slots[FILTER_INPUT]               = $filter_in;
  $slots[FILTER_OUTPUT]              = $filter_out;
  $slots[DRIVER_BOTH]                = $driver;
  $slots[EVENT_INPUT]                = delete $args{InputEvent};
  $slots[EVENT_ERROR]                = delete $args{ErrorEvent};
  $slots[EVENT_FLUSHED]              = delete $args{FlushedEvent};
  $slots[WATERMARK_WRITE_MARK_HIGH]  = delete $args{HighMark};
  $slots[WATERMARK_WRITE_MARK_LOW]   = delete $args{LowMark};
  $slots[WATERMARK_WRITE_EVENT_HIGH] = delete $args{HighEvent};
  $slots[WATERMARK_WRITE_EVENT_LOW]  = delete $args{LowEvent};
  $slots[WATERMARK_WRITE_STATE]      = 0;
  $slots[DRIVER_BUFFERED_OUT_OCTETS] = 0;
  $slots[STATE_WRITE]                = undef;
  $slots[STATE_READ]                 = undef;
  $slots[UNIQUE_ID]                  = &POE::Wheel::allocate_wheel_id();
  $slots[AUTOFLUSH]                  = delete $args{AutoFlush};

  my $self = bless \@slots, $class;

  # Anything still left in %args was not consumed above.
  if (my @unknown = keys %args) {
    carp(
      "unknown parameters in $class constructor call: ",
      join(', ', @unknown)
    );
  }

  $self->_define_read_state();
  $self->_define_write_state();

  return $self;
}

#------------------------------------------------------------------------------
# Redefine the select-write handler.  This uses stupid closure tricks
# to prevent keeping extra references to $self around.

# Builds the select-write handler as a closure, registers it with
# $poe_kernel under a name derived from the wheel's class and ID,
# stores that name in STATE_WRITE, and selects HANDLE_OUTPUT for
# writing with it.  The select is paused straight away unless
# DRIVER_BUFFERED_OUT_OCTETS is already non-zero.
#
# Called from new(), and again from event(), set_high_mark() and
# set_low_mark() whenever something the closure captured changes.
sub _define_write_state {
  my $self = shift;

  # Read-only members.  If any of these change, then the write state
  # is invalidated and needs to be redefined.
  my $driver        = $self->[DRIVER_BOTH];
  my $high_mark     = $self->[WATERMARK_WRITE_MARK_HIGH];
  my $low_mark      = $self->[WATERMARK_WRITE_MARK_LOW];
  my $event_error   = \$self->[EVENT_ERROR];
  my $event_flushed = \$self->[EVENT_FLUSHED];
  my $event_high    = \$self->[WATERMARK_WRITE_EVENT_HIGH];
  my $event_low     = \$self->[WATERMARK_WRITE_EVENT_LOW];
  my $unique_id     = $self->[UNIQUE_ID];

  # Read/write members.  These are done by reference, to avoid pushing
  # $self into the anonymous sub.  Extra copies of $self are bad and
  # can prevent wheels from destructing properly.
  my $is_in_high_water_state     = \$self->[WATERMARK_WRITE_STATE];
  my $driver_buffered_out_octets = \$self->[DRIVER_BUFFERED_OUT_OCTETS];

  # Register the select-write handler.

  $poe_kernel->state(
    $self->[STATE_WRITE] = ref($self) . "($unique_id) -> select write",
    sub {                             # prevents SEGV
      0 && CRIMSON_SCOPE_HACK('<');
                                      # subroutine starts here
      my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];

      # Ask the driver to write what it can; its return value is
      # recorded as the number of octets still buffered.
      $$driver_buffered_out_octets = $driver->flush($handle);

      # When you can't write, nothing else matters.
      # A true $! after flush() is treated as a write failure: the
      # error event (if any) is called and the write select is removed.
      if ($!) {
        $$event_error && $k->call(
          $me, $$event_error, 'write', ($!+0), $!, $unique_id
        );
        $k->select_write($handle);
      }

      # Could write, or perhaps couldn't but only because the
      # filehandle's buffer is choked.
      else {

        # In high water state?  Check for low water.  High water
        # state will never be set if $event_low is undef, so don't
        # bother checking its definedness here.
        if ($$is_in_high_water_state) {
          if ( $$driver_buffered_out_octets <= $low_mark ) {
            $$is_in_high_water_state = 0;
            $k->call( $me, $$event_low, $unique_id ) if defined $$event_low;
          }
        }

        # Not in high water state.  Check for high water.  Needs to
        # also check definedness of $$driver_buffered_out_octets.
        # Although we know this ahead of time and could probably
        # optimize it away with a second state definition, it would
        # be best to wait until ReadWrite stabilizes.  That way
        # there will be only half as much code to maintain.
        elsif (
          $high_mark and
          ( $$driver_buffered_out_octets >= $high_mark )
        ) {
          $$is_in_high_water_state = 1;
          $k->call( $me, $$event_high, $unique_id ) if defined $$event_high;
        }
      }

      # All chunks written; fire off a "flushed" event.  This
      # occurs independently, so it's possible to get a low-water
      # call and a flushed call at the same time (if the low mark
      # is 1).
      # The write select is paused here; put() resumes it when new
      # data is buffered.
      unless ($$driver_buffered_out_octets) {
        $k->select_pause_write($handle);
        $$event_flushed && $k->call($me, $$event_flushed, $unique_id);
      }
    }
 );

  $poe_kernel->select_write($self->[HANDLE_OUTPUT], $self->[STATE_WRITE]);

  # Pause the write select immediately, unless output is pending.
  $poe_kernel->select_pause_write($self->[HANDLE_OUTPUT])
    unless ($self->[DRIVER_BUFFERED_OUT_OCTETS]);
}

#------------------------------------------------------------------------------
# Redefine the select-read handler.  This uses stupid closure tricks
# to prevent keeping extra references to $self around.

# Builds the select-read handler as a closure and registers it with
# $poe_kernel, storing its name in STATE_READ and selecting
# HANDLE_INPUT for reading with it.  When no InputEvent is set, the
# read select on HANDLE_INPUT is removed instead.
#
# Two handler variants exist.  Filters that offer get_one_start() and
# get_one() are drained one batch at a time, and only that variant
# recognises Data::Transform::Meta::SENDBACK records.  Other filters
# go through get(), and every record they return is delivered as
# input.
sub _define_read_state {
  my $self = shift;

  # Register the select-read handler.

  if (defined $self->[EVENT_INPUT]) {

    # If any of these change, then the read state is invalidated and
    # needs to be redefined.

    # The filter and event names are captured by reference so the
    # handler always reads the current value from the wheel's slots.
    my $driver       = $self->[DRIVER_BOTH];
    my $input_filter = \$self->[FILTER_INPUT];
    my $event_input  = \$self->[EVENT_INPUT];
    my $handle_output = $self->[HANDLE_OUTPUT];
    my $event_error  = \$self->[EVENT_ERROR];
    my $unique_id    = $self->[UNIQUE_ID];

    # If the filter can get_one, then define the input state in terms
    # of get_one_start() and get_one().

    if (
      $$input_filter->can('get_one') and
      $$input_filter->can('get_one_start')
    ) {
      $poe_kernel->state(
        $self->[STATE_READ] = ref($self) . "($unique_id) -> select read",
        sub {

          # Protects against coredump on older perls.
          0 && CRIMSON_SCOPE_HACK('<');

          # The actual code starts here.
          my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];
          if (defined(my $raw_input = $driver->get($handle))) {
            $$input_filter->get_one_start($raw_input);
            # Keep pulling batches until the filter returns an empty
            # one.
            while (1) {
              my $next_rec = $$input_filter->get_one();
              last unless @$next_rec;
              foreach my $cooked_input (@$next_rec) {
                if (blessed ($cooked_input)) {
                  # A SENDBACK record is not delivered to the session.
                  # Its data is queued on the driver as output and the
                  # write select on the output handle is resumed.
                  # NOTE(review): the driver's put() return value is
                  # discarded here, so DRIVER_BUFFERED_OUT_OCTETS is
                  # not updated until the next put() or flush --
                  # confirm that this is intended.
                  if ($cooked_input->isa('Data::Transform::Meta::SENDBACK')) {
                    $driver->put([$cooked_input->data]);
                    $k->select_resume_write($handle_output);
                    next;
                  }
                }
                $k->call($me, $$event_input, $cooked_input, $unique_id);
              }
            }
          }
          # The driver returned undef: call the error event (if any)
          # with operation 'read', then remove the read select.
          else {
            $$event_error and $k->call(
              $me, $$event_error, 'read', ($!+0), $!, $unique_id
            );
            $k->select_read($handle);
          }
        }
      );
    }

    # Otherwise define the input state in terms of the older, less
    # robust, yet faster get().

    else {
      $poe_kernel->state(
        $self->[STATE_READ] = ref($self) . "($unique_id) -> select read",
        sub {

          # Protects against coredump on older perls.
          0 && CRIMSON_SCOPE_HACK('<');

          # The actual code starts here.
          my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];
          if (defined(my $raw_input = $driver->get($handle))) {
            # No Data::Transform::Meta handling on this path; every
            # record is passed to the input event.
            foreach my $cooked_input (@{$$input_filter->get($raw_input)}) {
              $k->call($me, $$event_input, $cooked_input, $unique_id);
            }
          }
          else {
            $$event_error and $k->call(
              $me, $$event_error, 'read', ($!+0), $!, $unique_id
            );
            $k->select_read($handle);
          }
        }
      );
    }
                                        # register the state's select
    $poe_kernel->select_read($self->[HANDLE_INPUT], $self->[STATE_READ]);
  }
                                        # undefine the select, just in case
  else {
    $poe_kernel->select_read($self->[HANDLE_INPUT])
  }
}

#------------------------------------------------------------------------------
# Redefine events.

# Change one or more of the wheel's event names after construction.
#
# Takes a flat list of (name => event) pairs; an odd-length list is
# padded with undef, so a trailing name clears that event.  Recognised
# names are InputEvent, ErrorEvent, FlushedEvent, HighEvent and
# LowEvent.  HighEvent and LowEvent are ignored, with a warning, when
# the matching water mark was never set.  Unknown names are ignored
# with a warning.
#
# The read and/or write states are redefined once, after all pairs
# have been processed, if any name they depend on was touched.
sub event {
  my $self = shift;
  push(@_, undef) if (scalar(@_) & 1);

  my ($redefine_read, $redefine_write) = (0, 0);

  while (@_) {
    my ($name, $event) = splice(@_, 0, 2);

    if ($name eq 'InputEvent') {
      $self->[EVENT_INPUT] = $event;
      $redefine_read = 1;
    }
    elsif ($name eq 'ErrorEvent') {
      # The error event is used by both the read and write handlers.
      $self->[EVENT_ERROR] = $event;
      $redefine_read = $redefine_write = 1;
    }
    elsif ($name eq 'FlushedEvent') {
      $self->[EVENT_FLUSHED] = $event;
      $redefine_write = 1;
    }
    elsif ($name eq 'HighEvent') {
      if (defined $self->[WATERMARK_WRITE_MARK_HIGH]) {
        $self->[WATERMARK_WRITE_EVENT_HIGH] = $event;
        $redefine_write = 1;
      }
      else {
        carp "Ignoring HighEvent (there is no high watermark set)";
      }
    }
    elsif ($name eq 'LowEvent') {
      if (defined $self->[WATERMARK_WRITE_MARK_LOW]) {
        $self->[WATERMARK_WRITE_EVENT_LOW] = $event;
        $redefine_write = 1;
      }
      else {
        # The check above is on the low mark, so the message names it.
        carp "Ignoring LowEvent (there is no low watermark set)";
      }
    }
    else {
      carp "ignoring unknown ReadWrite parameter '$name'";
    }
  }

  $self->_define_read_state()  if $redefine_read;
  $self->_define_write_state() if $redefine_write;
}

#------------------------------------------------------------------------------

# Destructor: removes the wheel's selects, unregisters its read and
# write states, and returns its ID to POE::Wheel.
sub DESTROY {
  my $wheel = shift;

  # Turn off the selects, input first.  This is a problem if a wheel
  # is being swapped, since it will turn off selects for the other
  # wheel.
  foreach my $handle_slot (HANDLE_INPUT, HANDLE_OUTPUT) {
    next unless $wheel->[$handle_slot];
    $poe_kernel->select($wheel->[$handle_slot]);
    $wheel->[$handle_slot] = undef;
  }

  # Unregister the dynamically named states, read first.
  foreach my $state_slot (STATE_READ, STATE_WRITE) {
    next unless $wheel->[$state_slot];
    $poe_kernel->state($wheel->[$state_slot]);
    $wheel->[$state_slot] = undef;
  }

  &POE::Wheel::free_wheel_id($wheel->[UNIQUE_ID]);
}

#------------------------------------------------------------------------------
# TODO - We set the high/low watermark state here, but we don't fire
# events for it.  My assumption is that the return value tells us
# all we want to know.

# Serialize @chunks through the output filter and queue the result on
# the driver.
#
# Data::Transform::Meta handling depends on the output filter:
#   - if it can('meta'), all chunks are passed to the filter and any
#     Data::Transform::Meta objects in its output are dropped before
#     the driver sees them;
#   - otherwise Data::Transform::Meta::EOF objects are removed from
#     the chunks before the filter sees them.
#
# Returns 1 (and records the high-water state) when a high mark is
# set and the buffered octet count has reached it, otherwise 0.  No
# water mark events are fired from here.
sub put {
  my ($self, @chunks) = @_;

  my $octets_before = $self->[DRIVER_BUFFERED_OUT_OCTETS];
  my $out_filter    = $self->[FILTER_OUTPUT];

  my $serialized;
  if ($out_filter->can('meta')) {
    $serialized = [
      grep {
        !(blessed($_) && $_->isa('Data::Transform::Meta'))
      } @{$out_filter->put(\@chunks)}
    ];
  }
  else {
    my @payload = grep {
      !(blessed($_) && $_->isa('Data::Transform::Meta::EOF'))
    } @chunks;
    $serialized = $out_filter->put(\@payload);
  }

  my $octets_after =
    $self->[DRIVER_BUFFERED_OUT_OCTETS] =
    $self->[DRIVER_BOTH]->put($serialized);

  # With AutoFlush on, try to write at once when an empty buffer has
  # just received data.
  # NOTE(review): $octets_before is overwritten with the non-zero new
  # count, so the resume below is skipped even if the flush drained
  # the buffer only partially -- confirm the write select gets
  # re-armed in that case.
  if ($self->[AUTOFLUSH] and $octets_after and not $octets_before) {
    $octets_before = $octets_after;
    $self->flush();
    $octets_after = $self->[DRIVER_BUFFERED_OUT_OCTETS];
  }

  # Resume write-ok only on the empty -> non-empty transition, which
  # avoids redundant select_resume_write() calls.
  $poe_kernel->select_resume_write($self->[HANDLE_OUTPUT])
    if $octets_after and not $octets_before;

  # Report, and remember, whether the high water mark has been reached.
  my $high_mark = $self->[WATERMARK_WRITE_MARK_HIGH];
  my $at_high_water = ($high_mark and $octets_after >= $high_mark) ? 1 : 0;

  return $self->[WATERMARK_WRITE_STATE] = $at_high_water;
}

#------------------------------------------------------------------------------
# Redefine filter. -PG / Now that there are two filters internally,
# one input and one output, make this set both of them at the same
# time. -RCC

# Replay input that was still pending in the previous input filter
# through the filter that is installed now.
#
# $buf is whatever the previous filter's get_pending() returned; when
# it is undef there is nothing to replay.  Each parsed record is
# delivered to EVENT_INPUT in the active session.  On the get_one path
# Data::Transform::Meta::SENDBACK records are not delivered: their
# data is queued on the driver and the write select on the output
# handle is resumed, as the select-read handler does.
sub _transfer_input_buffer {
  my ($self, $buf) = @_;

  # This runs after the caller has stored the new filter, so this is
  # the filter that is current at entry.
  my $current_filter = $self->[FILTER_INPUT];

  # If the current filter implements "get_one", use that.
  if (
    $current_filter->can('get_one') and
    $current_filter->can('get_one_start')
  ) {
    if (defined $buf) {
      $self->[FILTER_INPUT]->get_one_start($buf);

      # Stop as soon as the filter slot no longer holds the filter we
      # started with (for example because an input handler replaced
      # it).
      while ($self->[FILTER_INPUT] == $current_filter) {
        my $next_rec = $self->[FILTER_INPUT]->get_one();
        last unless @$next_rec;
        foreach my $cooked_input (@$next_rec) {
          if (blessed ($cooked_input) and
                       $cooked_input->isa('Data::Transform::Meta::SENDBACK')) {
            # Use the data() accessor, as the select-read handler
            # does, instead of reaching into the object's hash.
            $self->[DRIVER_BOTH]->put([$cooked_input->data]);
            $poe_kernel->select_resume_write($self->[HANDLE_OUTPUT]);
            next;
          }
          $poe_kernel->call(
            $poe_kernel->get_active_session(),
            $self->[EVENT_INPUT],
            $cooked_input, $self->[UNIQUE_ID]
          );
        }
      }
    }
  }

  # Otherwise use the old behavior.  There is no Meta handling on
  # this path.
  else {
    if (defined $buf) {
      foreach my $cooked_input (@{$self->[FILTER_INPUT]->get($buf)}) {
        $poe_kernel->call(
          $poe_kernel->get_active_session(),
          $self->[EVENT_INPUT],
          $cooked_input, $self->[UNIQUE_ID]
        );
      }
    }
  }
}

# Set input and output filters.

# Install $new_filter as both the input and the output filter, then
# replay whatever the previous input filter still had pending.
sub set_filter {
  my $self       = shift;
  my $new_filter = shift;

  my $pending = $self->[FILTER_INPUT]->get_pending();
  @{$self}[FILTER_INPUT, FILTER_OUTPUT] = ($new_filter, $new_filter);

  return $self->_transfer_input_buffer($pending);
}

# Redefine input and/or output filters separately.
# Replace only the input filter, then replay whatever the previous
# input filter still had pending.
sub set_input_filter {
  my $self       = shift;
  my $new_filter = shift;

  my $pending = $self->[FILTER_INPUT]->get_pending();
  $self->[FILTER_INPUT] = $new_filter;

  return $self->_transfer_input_buffer($pending);
}

# Replace only the output filter.  Nothing needs to be redefined or
# replayed: everything already passed to put() has been serialized.
sub set_output_filter {
  my $self = shift;
  return $self->[FILTER_OUTPUT] = shift;
}

# Return the current input filter, e.g. to reach its custom methods:
# $wheel->get_input_filter()->filter_method();
sub get_input_filter {
  return $_[0]->[FILTER_INPUT];
}

# Return the current output filter, e.g. to reach its custom methods:
# $wheel->get_output_filter()->filter_method();
sub get_output_filter {
  return $_[0]->[FILTER_OUTPUT];
}

# Set the high water mark.

# Change the high water mark on a wheel that was constructed with
# one.  The request is ignored, with a warning and an empty return,
# when no high mark exists yet, when the new value is undefined, or
# when it is not greater than the current low mark.  On success the
# write state is redefined so it picks up the new value.
sub set_high_mark {
  my ($self, $new_high_mark) = @_;

  if (!defined $self->[WATERMARK_WRITE_MARK_HIGH]) {
    carp "Ignoring high mark (must be initialized in constructor first)";
    return;
  }

  if (!defined $new_high_mark) {
    carp "New high mark is undefined.  Ignored";
    return;
  }

  my $current_low = $self->[WATERMARK_WRITE_MARK_LOW];
  if (!($new_high_mark > $current_low)) {
    carp "New high mark would not be greater than low mark.  Ignored";
    return;
  }

  $self->[WATERMARK_WRITE_MARK_HIGH] = $new_high_mark;
  $self->_define_write_state();
}

# Change the low water mark on a wheel that was constructed with one.
# The request is ignored, with a warning and an empty return, when no
# low mark exists yet, when the new value is undefined, when it is
# not above zero, or when it is not less than the current high mark.
# On success the write state is redefined so it picks up the new
# value.
sub set_low_mark {
  my ($self, $new_low_mark) = @_;

  unless (defined $self->[WATERMARK_WRITE_MARK_LOW]) {
    carp "Ignoring low mark (must be initialized in constructor first)";
    return;
  }

  unless (defined $new_low_mark) {
    carp "New low mark is undefined.  Ignored";
    return;
  }

  unless ($new_low_mark > 0) {
    carp "New low mark would be less than one.  Ignored";
    return;
  }

  unless ($new_low_mark < $self->[WATERMARK_WRITE_MARK_HIGH]) {
    # The original message repeated the word "high".
    carp "New low mark would not be less than high mark.  Ignored";
    return;
  }

  $self->[WATERMARK_WRITE_MARK_LOW] = $new_low_mark;
  $self->_define_write_state();
}

# Driver statistics: the buffered-octet count recorded from the most
# recent driver put() or flush().
sub get_driver_out_octets {
  my $self = shift;
  return $self->[DRIVER_BUFFERED_OUT_OCTETS];
}

# Driver statistics: whatever the driver's get_out_messages_buffered()
# reports.
sub get_driver_out_messages {
  my $self = shift;
  return $self->[DRIVER_BOTH]->get_out_messages_buffered();
}

# Return the unique ID allocated to this wheel by new().
sub ID {
  my $self = shift;
  return $self->[UNIQUE_ID];
}

# Pause the read select on the wheel's input handle.  Does nothing
# when the input handle is no longer defined.
sub pause_input {
  my $self = shift;
  if (defined $self->[HANDLE_INPUT]) {
    return $poe_kernel->select_pause_read( $self->[HANDLE_INPUT] );
  }
  return;
}

# Resume the read select on the wheel's input handle.  Does nothing
# when the input handle is no longer defined.
sub resume_input {
  my $self = shift;
  if (defined $self->[HANDLE_INPUT]) {
    return $poe_kernel->select_resume_read( $self->[HANDLE_INPUT] );
  }
  return;
}

# Accessor for the handle the wheel reads from.
sub get_input_handle {
  return $_[0]->[HANDLE_INPUT];
}

# Accessor for the handle the wheel writes to.
sub get_output_handle {
  return $_[0]->[HANDLE_OUTPUT];
}

# Shut down the read side of the input handle and remove its read
# select.  Failures of shutdown() are trapped by the eval and ignored.
# Does nothing when the input handle is no longer defined.
sub shutdown_input {
  my $self   = shift;
  my $handle = $self->[HANDLE_INPUT];
  return unless defined $handle;

  # Warnings are silenced for the duration of the shutdown() call.
  eval { local $^W = 0; shutdown($handle, 0) };
  $poe_kernel->select_read($handle, undef);
}

# Shut down the write side of the output handle and remove its write
# select.  Failures of shutdown() are trapped by the eval and ignored.
# Does nothing when the output handle is no longer defined.
sub shutdown_output {
  my $self   = shift;
  my $handle = $self->[HANDLE_OUTPUT];
  return unless defined $handle;

  # Warnings are silenced for the duration of the shutdown() call.
  eval { local $^W=0; shutdown($handle, 1) };
  $poe_kernel->select_write($handle, undef);
}

# Flush the output handle by synchronously calling the wheel's
# select-write state in the active session.  Does nothing when the
# output handle is no longer defined.
sub flush {
  my $self = shift;
  if (defined $self->[HANDLE_OUTPUT]) {
    return $poe_kernel->call(
      $poe_kernel->get_active_session(),
      $self->[STATE_WRITE],
      $self->[HANDLE_OUTPUT]
    );
  }
  return;
}

1;