The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#  You may distribute under the terms of either the GNU General Public License
#  or the Artistic License (the same terms as Perl itself)
#
#  (C) Paul Evans, 2014 -- leonerd@leonerd.org.uk

package IO::Async::JSONStream;

use strict;
use warnings;

our $VERSION = '0.02';

use base qw( IO::Async::Stream );
IO::Async::Stream->VERSION( '0.57' ); # ->read future

use JSON qw( encode_json decode_json );

use Carp;

=head1 NAME

C<IO::Async::JSONStream> - send or receive lines of JSON data in C<IO::Async>

=head1 SYNOPSIS

 use IO::Async::JSONStream;

 use IO::Async::Loop;
 my $loop = IO::Async::Loop->new;

 my $jsonstream = IO::Async::JSONStream->new;
 $loop->add( $jsonstream );

 $jsonstream->connect(
    host    => "my.server",
    service => 12345,
 )->then( sub {
    $jsonstream->write_json( [ data => { goes => "here" } ] );
    $jsonstream->read_json
 })->on_done( sub {
    my ( $data ) = @_;

    print "Received the data $data\n";
 })->get;

=head1 DESCRIPTION

This subclass of L<IO::Async::Stream> implements a simple JSON-encoded data
stream, sending and receiving Perl data structures by JSON-encoded lines of
text.

=cut

=head1 EVENTS

The following events are invoked, either using subclass methods or CODE
references in parameters:

=head2 on_json $data

Invoked when a line of JSON-encoded data is received. It is passed the decoded
data as a regular Perl data structure.

=head2 on_json_error $error

Invoked when a line is received but JSON decoding fails. It is passed the
failure exception from the JSON decoder.

=cut

=head1 PARAMETERS

The following named parameters may be passed to C<new> or C<configure>:

=over 8

=item on_json => CODE

=item on_json_error => CODE

CODE references for event handlers.

=item eol => STRING

Optional. Sets the string used for the line ending on the stream when writing
JSON. Defaults to C<\n> if not given.

=back

=cut

sub _init
{
   my $self = shift;
   $self->SUPER::_init( @_ );

   $self->{eol} = "\n";
   $self->{json_read_f} = [];
   $self->{json} = JSON->new;
}

sub configure
{
   my $self = shift;
   my %args = @_;

   foreach (qw( on_json on_json_error eol )) {
      $self->{$_} = delete $args{$_} if exists $args{$_};
   }

   if( $self->read_handle ) {
      $self->can_event( $_ ) or croak "Expected either an $_ callback or to be able to ->$_"
         for qw( on_json on_json_error );
   }

   $self->SUPER::configure( %args );
}

sub on_read
{
   my $self = shift;
   my ( $buffref, $eof ) = @_;
   return if $eof;

   my $json = $self->{json};

   $json->incr_parse( $$buffref );
   $$buffref = '';

   PARSE_ONE: {
      my $data;

      my $fail = not eval {
         $data = $json->incr_parse;
         1
      };
      chomp( my $e = $@ );

      my $f;
      1 while $f = shift @{ $self->{json_read_f} } and $f->is_cancelled;

      if( $data ) {
         $f ? $f->done( $data )
            : $self->invoke_event( on_json => $data );
         redo PARSE_ONE;
      }
      elsif( $fail ) {
         $f ? $f->fail( $e, json => )
            : $self->invoke_event( on_json_error => $e );
         $json->incr_skip;
         redo PARSE_ONE;
      }
      # else last
   }

   return 0;
}

=head1 METHODS

=cut

=head2 $jsonstream->write_json( $data, %args )

Writes a new line of JSON-encoded data from the given Perl data structure.

Other arguments are passed to the C<write> method. Returns a C<Future> which
will complete when the line is flushed.

=cut

sub write_json
{
   my $self = shift;
   my ( $data, @args ) = @_;

   $self->write( encode_json( $data ) . $self->{eol}, @args );
}

=head2 $jsonstream->read_json ==> $data

Returns a L<Future> that will yield the next line of JSON-encoded data to be
read from the stream. This takes place instead of the C<on_json> event.

If a JSON decoding error occurs it will result in a failed Future with the
operation name C<json> and the line on which decoding failed as its argument.

=cut

sub read_json
{
   my $self = shift;

   push @{ $self->{json_read_f} }, my $f = $self->loop->new_future;

   return $f;
}

=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>

Incremental parsing support added by Frew Schmidt

=cut

0x55AA;