The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
# $Id: Session.pm,v 1.13 2004/04/22 20:45:58 davidb Exp $
#
# Copyright (C) 2003 Verisign, Inc.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA


package Net::BEEP::Lite::Session;

=head1 NAME

Net::BEEP::Lite::Session

=head1 DESCRIPTION

This is a base class for BEEP sessions.  It handles core tasks common
to both server and client sessions.  This class isn't intended to be
used directly.  Instead, use one of its subclasses.

Note that in reality, this is really a TCP session.  It is not
abstracted away from the TCP transport for BEEP.  In the future it is
possible that it will be and a new TCPSession subclass will be
created.

=cut

use IO::Socket;

use Net::BEEP::Lite::Channel;
use Net::BEEP::Lite::MgmtProfile;

use Carp;
use strict;
use warnings;

=head1 CONSTRUCTOR

=over 4

=item new( I<ARGS> )

This is the main constructor.  It takes a named parameter list as its
argument.  See the C<initialize> method for a list of valid parameter
names.


=back

=cut

sub new {
  my $this  = shift;
  my $class = ref($this) || $this;

  my $self = {};
  bless $self, $class;

  $self->initialize(@_);

  $self;
}

=head1 METHODS

=over 4

=item initialize( I<ARGS> )

Initialize the object.  This is generally called by the constructor
and subclasses.  It takes the following named parameters:

=over 4

=item Socket

the socket associated with this session.

=item NoGreeting

Do not send the greeting message.  This can be sent later with the
C<send_greeting> method.  This will be true if a socket isn't
supplied.

=item DefaultLocalWindow

Set the base local TCP window to a particular value.  This number
should be 4096 (the default) or higher.

=item IdleTimeout

The number of seconds to wait for a frame.  Zero (the default) means
to wait indefinitely.

=item Timeout

The number of seconds to wait for a frame body to be completely read.
This should ususally be non-zero to prevent framing errors from
locking the session forever.  The default is 30 seconds.

=back

It also takes the named parameters for C<Net::BEEP::Lite::MgmtProfile>.

=cut

sub initialize {
  my $self = shift;
  my %args = @_;

  # some defaults:

  $self->{debug} = 0;
  $self->{trace} = 0;

  # we assume the initiator role.  the listener subclass should set
  # this to 2.
  $self->{channelno_counter} = 1;

  # create our management profile.  FIXME: we may want to allow this
  # to be passed in (and only instantiate it once for all sessions.)
  $self->{mgmt_profile} = Net::BEEP::Lite::MgmtProfile->new(%args);

  # our local profiles.
  $self->{profiles} = {};
  # the remote profiles.
  $self->{remote_profiles} = {};

  # our channels.  basically, a hash of channel number to channel
  # object.
  $self->{channels} = {};

  # The default size of our local windows.  This should be at least
  # 4096.
  $self->{default_local_window} = 4096;

  # assign the management profile to channel zero.
  $self->_add_channel(0, $self->{mgmt_profile});
  $self->channel(0)->msgno(1); # start msgno at one because of
                               # greeting RPY.

  # our general received message queue;
  $self->{messages} = [];

  # our default idle timeout
  $self->{idle_timeout} = 0;
  # our default read timeout for frame bodies.
  $self->{timeout} = 60;

  for (keys %args) {
    my $val = $args{$_};

    /^socket/io and do {
      $self->{sock} = $val;
      next;
    };
    /^no.?greeting$/io and do {
      $self->{_no_greeting} = $val;
      next;
    };
    /^default.?local.?window$/io and do {
      $self->{default_local_window} = $val;
      next;
    };
    /^debug$/io and do {
      $self->{debug} = $val;
      next;
    };
    /^trace$/io and do {
      $self->{trace} = $val;
      next;
    };
    /^idle.?timeout/io and do {
      $self->{idle_timeout} = $val;
      next;
    };
    /^timeout$/io and do {
      $self->{timeout} = $val;
      next;
    };
  }

  # set NoGreeting to true if the socket wasn't set or isn't 
  $self->{_no_greeting} = 1 if (! $self->{sock} or
				(ref($self->{sock}) and
				     !$self->{sock}->isa('IO::Socket')));
}

=item socket()

Returns the internal socket.

=cut

sub _socket {
  my $self = shift;

  $self->{sock};
}

=item _set_socket($socket)

Change the session's internal socket to the supplied socket.  Only use
this if you know what you are doing.

=cut

sub _set_socket {
  my $self   = shift;
  my $socket = shift;

  $self->{sock} = $socket;
}

=item _next_channel_number()

This returns the next channel number to request.

=cut

sub _next_channel_number {
  my $self = shift;

  my $res = $self->{channelno_counter};
  $self->{channelno_counter} += 2;
  $self->{channelno_counter} %= 2147483648;

  $res;
}

=item _add_channel($channel_number, [$profile, [$local_window_size]])

This is called by the management profile upon receiving a <start> or
<profile> message from the peer.  If $profile is provided, then this
will bind that profile to the channel.

=cut

sub _add_channel {
  my $self    = shift;
  my $number  = shift;
  my $profile = shift;
  my $window  = shift || $self->{default_local_window};

  $self->{channels}->{$number} = new Net::BEEP::Lite::Channel
    (Number  => $number,
     Profile => $profile,
     Window  => $window,
     Debug   => $self->{debug},
     Trace   => $self->{trace});
}

=item _del_channel($channel_number)

This is called by profiles when it needs to close a channel (either a
close request on the channel or a tuning reset (see
C<_del_all_channels>).

=cut

sub _del_channel {
  my $self   = shift;
  my $number = shift;

  delete $self->{channels}->{$number};
}

=item _del_all_channels()

Close (and destroy) all current channels.  This is most likely to be
done as part of a tuning reset.  You will have to re-add channel zero
after this.

=cut

sub _del_all_channels {
  my $self = shift;

  for my $n (keys %{$self->{channels}}) {
    $self->_del_channel($n);
  }
}

=item add_local_profile($profile)

This method will add a (local) profile to the session.  This will be
advertised in the greeting message.

=cut

sub add_local_profile {
  my $self = shift;
  my $p    = shift;

  $self->{profiles}->{$p->uri()} = $p;
}

=item get_local_profile($uri)

Returns the profile implementation associated with the given uri.

=cut

sub get_local_profile {
  my $self = shift;
  my $uri  = shift;

  $self->{profiles}->{$uri};
}

=item get_local_profile_uris()

Returns the list of profile URIs currently being advertised by this peer.

=cut

sub get_local_profile_uris {
  my $self = shift;
  keys %{$self->{profiles}};
}

=item del_local_profile($uri)

Removes a local profile.

=cut

sub del_local_profile {
  my $self = shift;
  my $uri  = shift;

  delete $self->{profiles}->{$uri};
}

=item add_remote_profile($uri)

This method is used internally when the remote peer advertises a
profile in the greeting message.

=cut

sub add_remote_profile {
  my $self = shift;
  my $uri  = shift;

  $self->{remote_profiles}->{$uri} = 1;
}

=item has_remote_profile($uri)

This method returns true if the remote profile advertised the given
profile uri, false otherwise.

=cut

sub has_remote_profile {
  my $self = shift;
  my $uri = shift;

  $self->{remote_profiles}->{$uri};
}

=item remote_profiles()

This method returns a list of the peer's advertised profiles.

=cut

sub remote_profiles {
  my $self = shift;

  keys %{$self->{remote_profiles}};
}

=item num_open_channels()

Return the number of open channels associated with this session.  This
does not include channel zero.

=cut

sub num_open_channels {
  my $self = shift;

  # return the number of open channels, not including channel zero.
  (scalar keys %{$self->{channels}})  - 1;
}

=item channel($channel_number)

Returns the C<Net::BEEP::Lite::Channel> object for the given channel
number.

=cut

sub channel {
  my $self = shift;
  my $chno = shift;

  $self->{channels}->{$chno};
}

=item servername([$val])

Returns or sets the session's server name.  This is normally set when
the first "<start>" message is encountered with a "serverName"
attribute.

=cut

sub servername {
  my $self = shift;
  my $name = shift;

  $self->{servername} = $name if $name;

  $self->{servername};
}

=item _tuning_reset([$send_greeting])

This does the full tuning reset: close all channels, delete pending
messages in the message queue, recreate channel zero, and (optionally)
re-send the greeting.  This defaults to sending the greeting.

This is normally called when a profile negotiates a security layer
(i.e., TLS or maybe SASL/DIGEST-MD5's encryption).

=cut

sub _tuning_reset {
  my $self 	    = shift;
  my $send_greeting = shift || 1;

  $self->_del_all_channels();
  $self->{messages} = [];
  $self->{remote_profiles} = {};
  $self->_add_channel(0, $self->{mgmt_profile});
  $self->send_greeting() if $send_greeting;
}

=item send_greeting()

Send the greeting message to the peer, and handle the greeting coming
from the peer.  It will advertise any profiles that have been
configured in the session.  Normally, this method is called as part of
the initialization process of the subclasses of this class.

=cut

sub send_greeting {
  my $self = shift;

  # send the greeting message.
  $self->{mgmt_profile}->send_greeting_message($self);

  # handle the remote greeting.
  my $peer_greeting = $self->_recv_message();
  $self->{mgmt_profile}->handle_message($self, $peer_greeting);
}

=item send_message($message)

This will send a BEEP message to the peer (over the channel specified
in the message).  This will handle possible fragmentation due to the
channel window size.

=cut

sub send_message {
  my $self    = shift;
  my $message = shift;

  my $chno = $message->channel_number();
  my $channel = $self->channel($chno);

  croak "send_message: message is on non-existent channel $chno"
    if not $channel;

  # New messages should never set msgno, and we will override it here
  # if it has.  Replies (RPY, ANS, ERR, NULL) should have the same
  # msgno and the MSG to which they are replying.
  if ($message->type() eq 'MSG') {
    carp "MSG messages should NOT have a pre-set message number"
      if defined $message->msgno();
    $message->msgno($channel->next_msgno());
  }
  else {
    if (not defined $message->msgno()) {
      carp "non-MSG message should have a pre-set message number";
      $message->msgno($channel->next_msgno());
    }
  }

  while ($message->has_more_frames()) {
    my $window = $channel->remote_window();

    # if there is no more space on this channel, switch to reading for
    # a bit while we wait for the channel to open.
    if ($window == 0) {
      $self->_read_for_seq($chno);

      $window = $channel->remote_window();
      next if $window == 0;
    }

    my $seqno = $channel->seqno();
    # calculate the next frame
    my $frame = $message->next_frame($seqno, $window);
    # actually send the frame.
    $self->_write_frame($frame);
    # update our current sequence number.
    $channel->update_seqno($frame->size());
    # and adjust the remote window.
    $channel->remote_window($window - $frame->size());
  }
}


=item _recv_message()

This will fetch the next message from the peer, returning a message
object.  It will handle reassembling a fragmented message.  It will
return the first complete message received on any (existing) channel.
It will discard frames on non-existent channels, issuing a warning.

This method will block.  It will return undef if it is not possible to
read from the socket, otherwise it will return the message.

=cut

sub _recv_message {
  my $self = shift;
  my %args = @_;

  # first try and return a message off the queue.
  my $message = $self->_dequeue_message();
  return $message if $message;

  # otherwise, we read one from the socket.

  # This handles interleaved frames for messages on different
  # channels, or ANS messages on the same (or different channels).
  # The channels have slots for building messages of both types.  The
  # first time we see a completing frame, we return that message.

  while (1) {
    # get the next frame from the socket (will block here).
    my $frame = $self->_recv_frame(%args);

    # our frame will have already gone through SEQ processing.  it
    # will also have been added to the appropriate message building
    # slot.

    next if $frame->type() eq 'SEQ';

    my $channel = $self->channel($frame->channel_number());

    # if we have a completing frame, we need to pull the complete
    # message from its build slot and clear it.

    if ($frame->completes()) {
      my $message;

      if ($frame->type() eq 'ANS') {
	$message = $channel->ans_message($frame->ansno());
	$channel->ans_clear_message($frame->ansno());
      }
      else {
	$message = $channel->message();
	$channel->clear_message();
      }

      return $message;
    }
  }
}

=item recv_message()

This will fetch the next message (on any defined channel other than
zero) from the peer, returning a message object.  It will handle
reassembling a fragmented message.  This will directly handle channel
zero messages, so this isn't all that useful for handling replies to
management channel messages.

=cut

sub recv_message {
  my $self  = shift;
  my %args = @_;

  my $message = undef;

  while (1) {
    $message = $self->_recv_message(%args);

    last if $message->channel_number() != 0;

    $self->{mgmt_profile}->handle_message($self, $message);
    return 0 if not $self->_is_connected();
  }
  $message;
}


=item close_session()

close the entire session.  Normally, this should only be called after
sending or receiving the "<ok>" message.  It can also be used in fatal
error situations.

=cut

sub close_session {
  my $self = shift;

  # we should have already sent or received the "ok" message by now
  # (unless we are aborting)
  $self->{sock}->close();
  $self->{sock} = undef;
  print STDERR "closed socket\n" if $self->{debug};
}

sub abort {
  my $self = shift;
  my $message = shift;

  print STDERR "aborting: $message\n" if $message && $self->{debug};
  confess "abort: $message\n" if $self->{trace};
  $self->close_session();

  die "aborted\n";
}


=item _read_for_seq([$channel_number])

Read frames from the socket until receiving a SEQ frame.  If
$channel_number is provided, then read until a SEQ frame on that
channel has been read.  Non SEQ frames read are place either in the
various message-building slots (see _recv_message), or place on the
general message queue.

=cut

sub _read_for_seq {
  my $self = shift;
  my $chno = shift;

  while (1) {
    my $frame = $self->_recv_frame();

    if (not $frame) {
      $self->abort("null frame detected");
      return;
    }

    # at this point, the SEQ has been processed.  This just determines
    # if we are done.
    if ($frame->type() eq 'SEQ') {
      return if not defined $chno;
      return if $chno == $frame->channel_number();
      next;
    }

    # for other frames, we need to make sure that we pull completed
    # messages off the build area and onto the queue.

    if ($frame->completes()) {

      my $channel = $self->channel($frame->channel_number());
      my $message;

      if ($frame->type() eq 'ANS') {
	$message = $channel->ans_message($frame->ansno());
	$channel->ans_clear_message($frame->ansno());
      }
      else {
	$message = $channel->message();
	$channel->clear_message();
      }

      $self->_queue_message($message);
    }
  }
}

=item _recv_frame()

This is an intermediate wrapper around C<_read_frame>.  Essentially,
it reads a frame from the socket, then does a little bit of post
processing, finally returning that frame.  The processing is: if it is
a SEQ frame, it updates the channels remote window size accordingly;
if it is some other frame, it adds it to the appropriate channel's
message building slot.

It returns undef if the socket could not be read from.  It returns 0
if the frame was on a non-existent channel.

=cut

sub _recv_frame {
  my $self = shift;
  my %args = @_;

  my $noseqs;
  if ($args{NoSEQ}) {
    $noseqs = 1;
  }

  my $frame = $self->_read_frame();

  # NOTE: this should never actually happen: _read_frame should
  # abort() instead of returning anything other than a valid frame.
  if (! $frame) {
    $self->abort("null frame received");
    return;
  }

  my $channel = $self->channel($frame->channel_number());

  if (not defined $channel) {
    $self->abort("frame received on non-existent channel " .
		 $frame->channel_number());
    return;
  }

  # handle SEQ frames independently.
  if ($frame->type() eq "SEQ") {
    # calculate new remote window: That is the advertise window
    # minus any bytes that we have already sent.
    my $new_window = $frame->window() -
      ($channel->seqno() - $frame->ackno());
    $channel->remote_window($new_window);
  }
  # assemble message from (possibly) multiple (sequential) frames.
  # the ANS collating case.
  elsif ($frame->type() eq 'ANS') {
    $channel->ans_message_add_frame($frame);
  }
  # the normal message collating case.
  else {
    $channel->message_add_frame($frame);
  }

  # track our last seen seqno from the peer in the channel.
  $channel->peer_seqno($frame->seqno() + $frame->size());

  # Emit a SEQ frame if we've actually read a frame with a payload.
  if ($frame->size() > 0 && !$noseqs) {
    # at the moment, we consider all frames to be "immediately"
    # consumed, so we just emit a constant for the window size.
    my $ackno = $channel->peer_seqno();

    $self->_send_seq($channel, $ackno);
  }

  $frame;
}

=item _send_seq($channel, $ackno)

Send a SEQ for $channel_number to the peer.  $ackno is the sequence
number to acknowledge.  Generally this is the seqno of the frame this
is responding to, plus the size of the payload of that frame.

=cut

sub _send_seq {
  my $self    = shift;
  my $channel = shift;
  my $seqno   = shift;

  my $seq_frame = new Net::BEEP::Lite::Frame
    (Type    => 'SEQ',
     Channel => $channel->number(),
     Ackno   => $seqno,
     Window  => $channel->local_window(),
     Debug   => $self->{debug},
     Trace   => $self->{trace});

  $self->_write_frame($seq_frame);
}

=item _read_frame()

This is an internal method for reading a single frame from the
internal socket.  It returns a C<Net::BEEP::Lite::Frame> object.

=cut

sub _read_frame {
  my $self = shift;

  my $sock = $self->{sock};

  my ($header, $read, $old_alarm_value);

  # set up an alarm handler for this method only.
  local $SIG{ALRM} = sub { die "alarm timeout\n"; };

  # read the header.
  eval {
    $old_alarm_value = alarm($self->{idle_timeout});
    $header = $sock->getline();
  };
  if ($@ and $@ =~ /^alarm timeout/io) {
    $self->abort("idle timeout");
    return;
  } elsif ($@) {
    die $@;
  }
  alarm($old_alarm_value);

  # FIXME: what does a null header mean?
  if (!$header) {
    $self->abort("null header detected (socket closed?)");
    return;
  }

  my $frame = Net::BEEP::Lite::Frame->new(Header => $header,
                                          Debug  => $self->{debug},
                                          Trace  => $self->{trace});

  # make sure the frame could be built \(i.e., known frame type, valid
  # frame headers...\)
  if (! $frame) {
    $self->abort("invalid frame header: '$header'");
    return;
  }

  # if we have no payload (SEQ, NUL), then we are done.
  return $frame if $frame->size() == 0 and ($frame->type() eq 'SEQ' or
                                            $frame->type() eq 'NUL');

  # read the payload.

  # FIXME: the following construct is not ideal. While the loop seems
  # necessary from a theoretical perspective (underlying read
  # operations are not guaranteed to return with all things read), it
  # is unknown if there is a real case where this read call would
  # return early and yet be able to continue.

  # Also note that a timer is set (and probably should always be set)
  # to (help) recover from cases where the frame size was incorrect
  # and too large.
  my $offset = 0;
  my $buffer;
  while (1) {
    eval {
      $old_alarm_value = alarm($self->{timeout});
      $read = $sock->read($buffer, $frame->size(), $offset);
    };
    if ($@ and $@ =~ /^alarm timeout/) {
      $self->abort("read operation timed out (invalid frame?)");
      return;
    } elsif ($@) {
      die $@;
    }
    alarm($old_alarm_value);
    last if ($read == 0 || $read == $frame->size());
    $offset += $read;
  }

  $frame->set_payload($buffer);

  # now read the trailer

  eval {
    $old_alarm_value = alarm($self->{timeout});
    $read = $sock->read($buffer, 5);
  };
  if ($@ and $@ =~ /^alarm timeout/) {
    $self->abort("read operation timed out (invalid frame?)");
    return;
  } elsif ($@) {
    die $@;
  }
  alarm($old_alarm_value);

  if ($buffer ne "END\r\n") {
    $self->abort("invalid frame trailer for '$buffer'");
    return;
  }

  print STDERR "_read_frame: read frame:\n", $frame->to_string, "\n"
    if $self->{trace};
  $frame;
}

=item _write_frame($frame)

This is an internal routine for writing a single frame to the
internally held socket.  $frame MUST be a C<Net::BEEP::Lite::Frame>
object.

=cut

sub _write_frame {
  my $self  = shift;
  my $frame = shift;

  my $sock = $self->{sock};

  $sock->print($frame->to_string());
  $sock->flush();

  print STDERR "_write_frame: wrote frame:\n", $frame->to_string(), "\n"
    if $self->{trace};
}

sub _queue_message {
  my $self    = shift;
  my $message = shift;

  push @{$self->{messages}}, $message;
}

sub _dequeue_message {
  my $self = shift;

  shift @{$self->{messages}};
}

sub _is_connected {
  my $self = shift;

  return ($self->{sock} && $self->{sock}->connected());
}


=pod

=back

=head1 SEE-ALSO

=over 4

=item L<Net::BEEP::Lite::ServerSession>

=item L<Net::BEEP::Lite::ClientSession>

=item L<Net::BEEP::Lite::Message>

=item L<Net::BEEP::Lite::Frame>

=back

=cut

1;