The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Data::Monad::CondVar;
use strict;
use warnings;
use 5.012;
use AnyEvent;
use Scalar::Util;
use Exporter qw/import/;

our $VERSION = "0.04";
our @EXPORT = qw/as_cv cv_unit cv_zero cv_fail cv_flat_map_multi cv_map_multi
                 cv_sequence call_cc/;

sub _assert_cv($) {
    $_[0]->ready and die "[BUG]It already has been ready";
    $_[0];
}

sub as_cv(&) {
    my $code = shift;
    $code->(my $cv = AE::cv);
    $cv;
}

sub cv_unit { AnyEvent::CondVar->unit(@_) }
sub cv_zero { AnyEvent::CondVar->zero(@_) }
sub cv_fail { AnyEvent::CondVar->fail(@_) }
sub cv_flat_map_multi(&@) { AnyEvent::CondVar->flat_map_multi(@_) }
sub cv_map_multi(&@) { AnyEvent::CondVar->map_multi(@_) }
sub cv_sequence { AnyEvent::CondVar->sequence(@_) }

sub call_cc(&) {
    my $f = shift;
    my $ret_cv = AE::cv;

    my $skip = sub {
        my @v = @_;
        _assert_cv $ret_cv;
        $ret_cv->send(@v);

        return AE::cv; # nop
    };

    my $branch_cv = $f->($skip)->map(sub {
        _assert_cv $ret_cv;
        $ret_cv->send(@_);
    })->catch(sub {
        _assert_cv $ret_cv;
        $ret_cv->croak(@_);
        cv_unit; # void
    })->catch(sub { warn @_ });
    $ret_cv->canceler(sub { $branch_cv->cancel });

    return $ret_cv;
}


package Data::Monad::CondVar::Mixin;
use strict;
use warnings;
use Carp ();
use Scalar::Util ();
use AnyEvent ();

# extends AE::cv directly
require Data::Monad::Base::MonadZero;
for my $mixin (__PACKAGE__, 'Data::Monad::Base::MonadZero') {
    next if grep { $_ eq $mixin } @AnyEvent::CondVar::ISA;
    push @AnyEvent::CondVar::ISA, $mixin;
}

our $ZERO = "[ZERO of ${\ __PACKAGE__}]";

*_assert_cv = \&Data::Monad::CondVar::_assert_cv;

sub _cv_or_die($) {
    Scalar::Util::blessed $_[0] && $_[0]->isa('AnyEvent::CondVar')
        or Carp::croak "You must use CondVar object here.";
}

sub unit {
    my $class = shift;
    (my $cv = AE::cv)->send(@_);
    return $cv;
}

sub fail {
    my $class = shift;

    # XXX cv's croak doesn't throw the error if the message is empty.
    my $msg = $_[0] || $ZERO;
    (my $cv = AE::cv)->croak($msg);

    return $cv;
}

sub zero { $_[0]->fail($ZERO) }

sub any {
    my ($class, @cvs) = @_;

    my $result_cv = AE::cv;
    my @any_cv;
    for (@cvs) {
        push @any_cv, $_->map(sub {
            return unless $result_cv;
            _assert_cv $result_cv;
            $result_cv->send(@_);
            $result_cv->cancel;
        })->catch(sub {
            return unless $result_cv;
            _assert_cv $result_cv;
            $result_cv->croak(@_);
            $result_cv->cancel;
            return $class->unit;
        })->catch(sub { warn @_ });
    }
    $result_cv->canceler(sub { $_->cancel for @any_cv });

    $result_cv;
}

sub all {
    my ($class, @cvs) = @_;

    my @result;
    (my $result_cv = AE::cv)->begin(sub {
        my ($cv) = @_;
        _assert_cv $cv;
        $cv->send(@result);
    });
    for my $i (0 .. $#cvs) {
        $result_cv->begin;

        $cvs[$i]->map(sub {
            $result[$i] = [@_];
            $result_cv->end;
        })->catch(sub {
            _assert_cv $result_cv;
            $result_cv->croak(@_);
            $result_cv->cancel;
            return $class->unit;
        })->catch(sub { warn @_ });
    }
    $result_cv->end;

    $result_cv->canceler(sub { $_->cancel for @cvs });

    $result_cv;
}

sub cancel { (delete $_[0]->{_monad_canceler} || sub {})->() }

sub canceler {
    my $cv = shift;
    @_ and $cv->{_monad_canceler} = shift;
    $cv->{_monad_canceler};
}

sub flat_map {
    my ($self, $f) = @_;

    my $cv_bound = AE::cv;
    my $cv_current = $self;
    $self->cb(sub {
        my $cv = $cv_current = eval {
            my ($cv) = $f->($_[0]->recv);
            _cv_or_die $cv;
            $cv;
        };

        if ($@) {
            _assert_cv $cv_bound;
            return $cv_bound->croak($@);
        }
        $cv->cb(sub {
            my @v = eval { $_[0]->recv };
            _assert_cv $cv_bound;
            $@ ? $cv_bound->croak($@) : $cv_bound->send(@v);
        });
    });
    $cv_bound->canceler(sub {
        $cv_current->cancel;
        $cv_current->cb(undef); # release the callback function
    });

    return $cv_bound;
}

sub or {
    my ($self, $alter) = @_;

    my $cv_mixed = AE::cv;
    $self->cb(sub {
        my @v = eval { $_[0]->recv };
        unless ($@) {
            $cv_mixed->(@v);
        } elsif ($@ =~ /\Q$ZERO\E/) {
            $alter->cb(sub {
                my @v = eval { $_[0]->recv };
                _assert_cv $cv_mixed;
                $@ ? $cv_mixed->croak($@) : $cv_mixed->(@v);
            });
        } else {
            _assert_cv $cv_mixed;
            $cv_mixed->croak($@);
        }
    });
    $cv_mixed->canceler(sub { $_->cancel for $self, $alter });

    $cv_mixed;
}

sub catch {
    my ($self, $f) = @_;

    my $result_cv = AE::cv;
    my $active_cv = $self;
    $self->cb(sub {
        my @v = eval { $_[0]->recv };
        my $exception = $@ or return $result_cv->(@v);

        my $cv = $active_cv = eval {
            my $cv = $f->($exception);
            _cv_or_die $cv;
            $cv;
        };
        $@ and return (_assert_cv $result_cv)->croak($@);

        $cv->cb(sub {
            my @v = eval { $_[0]->recv };
            _assert_cv $result_cv;
            $@ ? $result_cv->croak($@) : $result_cv->send(@v);
        });
    });
    $result_cv->canceler(sub {
        $active_cv->cancel;
        $active_cv->cb(undef); # release the callback function
    });

    return $result_cv;
}

sub sleep {
    my ($self, $sec) = @_;
    $self->flat_map(sub {
        my @v = @_;
        my $cv = AE::cv;
        my $t; $t = AE::timer $sec, 0, sub { $cv->(@v); undef $cv };
        $cv->canceler(sub { undef $t });
        return $cv;
    });
}

sub timeout {
    my ($self, $sec) = @_;
    my $class = ref $self;

    $class->any($class->unit->sleep($sec), $self);
}

sub retry {
    my $self = shift;
    my $f = pop;
    my ($retry, $pace) = @_;
    $retry ||= 1;
    $pace ||= 0;
    my $class = ref $self;

    $self->flat_map(sub {
        my @args = @_;

        $class->unit($retry, undef, undef)->while(
            sub {
                my ($retry, $ok, $fail) = @_;
                ! $ok and $retry > 0;
            }, sub {
                my ($retry, $ok, $fail) = @_;
                $f->(@args)->flat_map(sub {
                    $class->unit($retry - 1, [@_], undef);
                })->catch(sub {
                    $class->unit($retry - 1, undef, [@_])
                          ->sleep($pace);
                });
            }
        )->flat_map(sub {
            my ($retry, $ok, $fail) = @_;
            $fail ? $class->fail(@$fail) : $class->unit(@$ok);
        });
    });
}

1;

__END__

=head1 NAME

Data::Monad::CondVar - The CondVar monad.

=head1 SYNOPSIS

  use Data::Monad::CondVar;

  # The sleep sort
  my @list = (3, 5, 2, 4, 9, 1, 8);
  my @result;
  AnyEvent::CondVar->all(
      map {
          cv_unit($_)->sleep($_ / 1000)
                     ->map(sub { push @result, @_ });
      } @list
  )->recv;

=head1 DESCRIPTION

Data::Monad::CondVar adds monadic operations to AnyEvent::CondVar.

Since this module extends AnyEvent::CondVar directly, you can call monadic
methods anywhere there are CondVars.

This module is marked B<EXPERIMENTAL>. API could be changed without any notice.

=head1 METHODS

=over 4

=item $cv = as_cv($cb->($cv))

A helper for rewriting functions using callbacks to ones returning CVs.

  my $cv = as_cv { http_get "http://google.ne.jp", $_[0] };
  my ($data, $headers) = $cv->recv;

=item $cv = cv_unit(@vs)

=item $cv = cv_zero()

=item $cv = cv_fail($v)

=item $f = cv_flat_map_multi(\&f, $cv1, $cv2, ...)

=item $f = cv_map_multi(\&f, $cv1, $cv2, ...)

=item $cv = cv_sequence($cv1, $cv2, ...)

These are shorthand of methods which has the same name.

=item $cv = call_cc($f->($cont))

Calls C<$f> with current continuation, C<$cont>.
C<$f> must return a CondVar object.
If you call C<$cont> in C<$f>, results are sent to C<$cv> directly and
codes left in C<$f> will be skipped.

You can use C<call_cc> to escape a deeply nested call structure.

  sub myname {
      my $uc = shift;

      return call_cc {
          my $cont = shift;

          cv_unit("hiratara")->flat_map(sub {
              return $cont->(@_) unless $uc; # escape from an inner block
              cv_unit @_;
          })->map(sub { uc $_[0] });
      };
  }

  print myname(0)->recv, "\n"; # hiratara
  print myname(1)->recv, "\n"; # HIRATARA

=item unit

=item flat_map

Overrides methods of L<Data::Monad::Base::Monad>.

=item zero

Overrides methods of L<Data::Monad::Base::MonadZero>.
It uses C<fail> method internally.

=item $cv = AnyEvent::CondVar->fail($msg)

Creates the new CondVar object which represents a failed operation.
You can use C<catch> to handle failed operations.

=item $cv = AnyEvent::CondVar->any($cv1, $cv2, ...)

Takes the earliest value from C<$cv1>, C<$cv2>, ...

=item $cv = AnyEvent::CondVar->all($cv1, $cv2, ...)

Takes all values from C<$cv1>, C<$cv2>, ...

This method works completely like C<<Data::Monad::Base::Monad->sequence>>,
but you may want use this method for better cancellation.

=item $cv->cancel

Cancels computations for this CV. This method just calls the call back
which is set in the C<canceler> field.

C<<$cv->recv>> may never return from blocking after you call C<cancel>.

=item $cv->canceler($cb->())

=item $code = $cv->canceler

The accessor of the method to cancel. You should set this field appropriately
when you create the new CondVar object.

  my $cv = AE::cv;
  my $t = AE::timer $sec, 0, sub {
      $cv->send(@any_results);
      $cv->canceler(undef); # Destroy cyclic refs
  };
  $cv->canceler(sub { undef $t });

=item $cv = $cv1->or($cv2)

If C<$cv1> croaks, C<or> returns the CondVar object which contains values of
C<$cv2>. Otherwise it returns C<$cv1>'s values.

C<or> would be C<mplus> on Haskell.

=item $cv = $cv1->catch($cb->($@))

If C<$cv1> croaks, C<$cb> is called and it returns the new CondVar object
containing its result. Otherwise C<catch> does nothing. C<$cb> must return
a CondVar object.

You can use this method to handle errors.

  cv_unit(1, 0)
  ->map(sub { $_[0] / $_[1] })
  ->catch(sub {
      my $exception = shift;
      $exception =~ /Illegal division/
          ? cv_unit(0)           # recover from errors
          : cv_fail($exception); # rethrow
  });

=item $cv = $cv1->sleep($sec)

Sleeps C<$sec> seconds, and just sends values of C<$cv1> to C<$cv>.

=item $cv = $cv1->timeout($sec)

If C<$cv1> doesn't compute any values within C<$sec> seconds,
C<$cv> will be received C<undef> and C<$cv1> will be canceled.

Otherwise C<$cv> will be received C<$cv1>'s results.

=item $cv = $cv1->retry($max, [$pace, ], $f->(@v))

Continue to call C<flat_map($f)> until C<$f> returns a normal value which
doesn't croak.

C<$max> is maximum number of retries, C<$pace> is how long it sleeps between
each retry. The default value of C<$pace> is C<0>.

=back

=head1 AUTHOR

hiratara E<lt>hiratara {at} cpan.orgE<gt>

=head1 SEE ALSO

L<Data::Monad::Base::Monad>

=head1 LICENSE

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=cut