The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#  You may distribute under the terms of either the GNU General Public License
#  or the Artistic License (the same terms as Perl itself)
#
#  (C) Paul Evans, 2014 -- leonerd@leonerd.org.uk

package IO::Async::JSONStream;

use strict;
use warnings;

our $VERSION = '0.01';

use base qw( IO::Async::Stream );
IO::Async::Stream->VERSION( '0.57' ); # ->read future

use JSON qw( encode_json decode_json );

use Carp;

=head1 NAME

C<IO::Async::JSONStream> - send or receive lines of JSON data in C<IO::Async>

=head1 SYNOPSIS

 use IO::Async::JSONStream;

 use IO::Async::Loop;
 my $loop = IO::Async::Loop->new;

 my $jsonstream = IO::Async::JSONStream->new;
 $loop->add( $jsonstream );

 $jsonstream->connect(
    host    => "my.server",
    service => 12345,
 )->then( sub {
    $jsonstream->write_json( [ data => { goes => "here" } ] );
    $jsonstream->read_json
 })->on_done( sub {
    my ( $data ) = @_;

    print "Received the data $data\n";
 })->get;

=head1 DESCRIPTION

This subclass of L<IO::Async::Stream> implements a simple JSON-encoded data
stream, sending and receiving Perl data structures by JSON-encoded lines of
text.

=cut

=head1 EVENTS

The following events are invoked, either using subclass methods or CODE
references in parameters:

=head2 on_json $data

Invoked when a line of JSON-encoded data is received. It is passed the decoded
data as a regular Perl data structure.

=head2 on_json_error $error, $line

Invoked when a line is received but JSON decoding fails. It is passed the
failure exception from the JSON decoder and the line on which decoding failed.

=cut

=head1 PARAMETERS

The following named parameters may be passed to C<new> or C<configure>:

=over 8

=item on_json => CODE

=item on_json_error => CODE

CODE references for event handlers.

=item eol => STRING

Optional. Sets the string used for the line ending on the stream. Defaults to
C<\n> if not given.

=back

=cut

sub _init
{
   my $self = shift;
   $self->SUPER::_init( @_ );

   $self->{eol} = "\n";
}

sub configure
{
   my $self = shift;
   my %args = @_;

   foreach (qw( on_json on_json_error eol )) {
      $self->{$_} = delete $args{$_} if exists $args{$_};
   }

   if( $self->read_handle ) {
      $self->can_event( $_ ) or croak "Expected either an $_ callback or to be able to ->$_"
         for qw( on_json on_json_error );
   }

   $self->SUPER::configure( %args );
}

sub on_read
{
   my $self = shift;
   my ( $buffref, $eof ) = @_;
   return if $eof;

   my $eol = $self->{eol};

   while( $$buffref =~ s/^(.*)\Q$eol// ) {
      my $line = $1;
      my $data;
      if( eval { $data = decode_json( $line ); 1 } ) {
         $self->invoke_event( on_json => $data );
      }
      else {
         chomp( my $e = $@ );
         $self->invoke_event( on_json_error => $e, $line );
      }
   }

   return 0;
}

=head1 METHODS

=cut

=head2 $jsonstream->write_json( $data, %args )

Writes a new line of JSON-encoded data from the given Perl data structure.

Other arguments are passed to the C<write> method. Returns a C<Future> which
will complete when the line is flushed.

=cut

sub write_json
{
   my $self = shift;
   my ( $data, @args ) = @_;

   $self->write( encode_json( $data ) . $self->{eol}, @args );
}

=head2 $jsonstream->read_json ==> $data

Returns a L<Future> that will yield the next line of JSON-encoded data to be
read from the stream. This takes place instead of the C<on_json> event.

If a JSON decoding error occurs it will result in a failed Future with the
operation name C<json> and the line on which decoding failed as its argument.

=cut

sub read_json
{
   my $self = shift;
   $self->read_until( $self->{eol} )->then( sub {
      my ( $line ) = @_;
      my $data;
      if( eval { $data = decode_json( $line ); 1 } ) {
         return Future->new->done( $data );
      }
      else {
         chomp( my $e = $@ );
         return Future->new->fail( $e, json => $line );
      }
   });
}

=head1 TODO

=over 2

=item *

Consider a true streaming mode, using C<JSON>'s incremental parsing ability.

=back

=cut

=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>

=cut

0x55AA;