The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package App::Alice::Stream;

use JSON;
use Time::HiRes qw/time/;
use Try::Tiny;
use Any::Moose;

use strict;
use warnings;

has queue => (
  is  => 'rw',
  isa => 'ArrayRef[HashRef]',
  default => sub { [] },
);

sub clear_queue {$_[0]->queue([])}
sub enqueue {push @{shift->queue}, @_}
sub queue_empty {return @{$_[0]->queue} == 0}

has [qw/offset last_send start_time/]=> (
  is  => 'rw',
  isa => 'Num',
  default => 0,
);

has [qw/delayed started closed/] => (
  is  => 'rw',
  isa => 'Bool',
  default => 0,
);

has 'seperator' => (
  is  => 'ro',
  isa => 'Str',
  default => 'xalicex',
);

has 'timer' => (
  is  => 'rw',
);

has 'writer' => (
  is  => 'rw',
  required => 1,
);

has min_bytes => (
  is => 'ro',
  default => 1024,
);

sub BUILD {
  my $self = shift;
  my $local_time = time;
  my $remote_time = $self->start_time || $local_time;
  $self->offset($local_time - $remote_time);
  my $writer = $self->writer->(
    [200, ['Content-Type' => 'multipart/mixed; boundary='.$self->seperator.'; charset=utf-8']]
  );
  $self->writer($writer);
  $self->_send;
}

sub _send {
  my $self = shift;
  try   { $self->send }
  catch { $self->close };
}

sub send {
  my ($self, @messages) = @_;
  die "Sending on a closed stream" if $self->closed;
  $self->enqueue(@messages) if @messages;
  return if $self->delayed or $self->queue_empty;
  if (my $delay = $self->flooded) {
    $self->delay($delay);
    return;
  }
  $self->writer->write( $self->to_string );
  $self->flush;
}

sub close {
  my $self = shift;
  try {$self->writer->write($self->to_string)};
  $self->flush;
  $self->writer->close;
  $self->timer(undef);
  $self->closed(1);
}

sub flooded {
  my $self = shift;
  my $diff = time - $self->last_send;
  if ($diff < 0.2) {
    return 0.2 - $diff;
  }
  return 0;
}

sub delay {
  my ($self, $delay) = @_;
  $self->delayed(1);
  $self->timer(AnyEvent->timer(
    after => $delay,
    cb    => sub {
      $self->delayed(0);
      $self->timer(undef);
      $self->_send;
    },
  ));
}

sub flush {
  my $self = shift;
  $self->clear_queue;
  $self->last_send(time);
}

sub to_string {
  my $self = shift;
  my $output;

  if (! $self->started) {
    $output .= "--".$self->seperator."\n";
    $self->started(1);
  }

  $output .= to_json({
    queue => $self->queue,
    time  => time - $self->offset,
  }, {utf8 => 1});

  $output .= "\n--" . $self->seperator . "\n";
  $output .= " " x ($self->min_bytes - length $output);

  return $output
}

__PACKAGE__->meta->make_immutable;
1;