The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package AnyEvent::Net::Curl::Queued::Multi;
# ABSTRACT: Net::Curl::Multi wrapped by Moo


use strict;
use utf8;
use warnings qw(all);

use AnyEvent;
use Carp qw(confess);
use Moo;
use MooX::Types::MooseLike::Base qw(
    AnyOf
    ArrayRef
    HashRef
    Int
    Num
    Object
    Ref
);
use Net::Curl::Multi;
use Scalar::Util qw(set_prototype);

# kill Net::Curl::Mulii prototypes as they wreck around/before/after method modifiers
set_prototype \&Net::Curl::Multi::new           => undef;
set_prototype \&Net::Curl::Multi::socket_action => undef;
set_prototype \&Net::Curl::Multi::add_handle    => undef;

extends 'Net::Curl::Multi';


has active      => (is => 'ro', isa => Int, default => sub { -1 }, writer => 'set_active');


has pool        => (is => 'ro', isa => HashRef[Ref], default => sub { {} });


has timer       => (is => 'ro', isa => AnyOf[ArrayRef, Object], writer => 'set_timer', clearer => 'clear_timer', predicate => 'has_timer', weak_ref => 1);


has max         => (is => 'ro', isa => Num, default => sub { 4 });


has timeout     => (is => 'ro', isa => Num, default => sub { 60.0 });

our $VERSION = '0.044'; # VERSION


sub BUILD {
    my ($self) = @_;

    $self->setopt(Net::Curl::Multi::CURLMOPT_MAXCONNECTS        => $self->max << 2);
    $self->setopt(Net::Curl::Multi::CURLMOPT_SOCKETFUNCTION     => \&_cb_socket);
    $self->setopt(Net::Curl::Multi::CURLMOPT_TIMERFUNCTION      => \&_cb_timer);

    return;
}

## no critic (RequireArgUnpacking)
sub BUILDARGS { return $_[-1] }

# socket callback: will be called by curl any time events on some
# socket must be updated
sub _cb_socket {
    my ($self, undef, $socket, $poll) = @_;

    # Right now $socket belongs to that $easy, but it can be
    # shared with another easy handle if server supports persistent
    # connections.
    # This is why we register socket events inside multi object
    # and not $easy.

    # AnyEvent does not support registering a socket for both
    # reading and writing. This is rarely used so there is no
    # harm in separating the events.

    my $keep = 0;

    # register read event
    if ($poll & Net::Curl::Multi::CURL_POLL_IN) {
        $self->pool->{"r$socket"} = AE::io $socket, 0, sub {
            $self->socket_action($socket, Net::Curl::Multi::CURL_CSELECT_IN);
        };
        ++$keep;
    }

    # register write event
    if ($poll & Net::Curl::Multi::CURL_POLL_OUT) {
        $self->pool->{"w$socket"} = AE::io $socket, 1, sub {
            $self->socket_action($socket, Net::Curl::Multi::CURL_CSELECT_OUT);
        };
        ++$keep;
    }

    # deregister old io events
    unless ($keep) {
        delete $self->pool->{"r$socket"};
        delete $self->pool->{"w$socket"};
    }

    return 0;
}

# timer callback: It triggers timeout update. Timeout value tells
# us how soon socket_action must be called if there were no actions
# on sockets. This will allow curl to trigger timeout events.
sub _cb_timer {
    my ($self, $timeout_ms) = @_;

    # deregister old timer
    $self->clear_timer;

    my $cb = sub {
        $self->socket_action(Net::Curl::Multi::CURL_SOCKET_TIMEOUT)
            #if $self->handles > 0;
    };

    if ($timeout_ms < 0) {
        # Negative timeout means there is no timeout at all.
        # Normally happens if there are no handles anymore.
        #
        # However, curl_multi_timeout(3) says:
        #
        # Note: if libcurl returns a -1 timeout here, it just means
        # that libcurl currently has no stored timeout value. You
        # must not wait too long (more than a few seconds perhaps)
        # before you call curl_multi_perform() again.

        $self->set_timer(AE::timer 1, 1, $cb);
    } elsif ($timeout_ms < 10) {
        # Short timeouts are just... Weird!
    } else {
        # This will trigger timeouts if there are any.
        $self->set_timer(AE::timer $timeout_ms / 1000, 0, $cb);
    }

    return 0;
}


around socket_action => sub {
    my $orig = shift;
    my $self = shift;

    my $active = $orig->($self => @_);

    my $i = 0;
    while (my (undef, $easy, $result) = $self->info_read) {
        $self->remove_handle($easy);
        $easy->_finish($result);
    } continue {
        ++$i;
    }

    return $self->set_active($active - $i);
};


around add_handle => sub {
    my $orig = shift;
    my $self = shift;
    my $easy = shift;

    my $r = $orig->($self => $easy);

    # Calling socket_action with default arguments will trigger
    # socket callback and register IO events.
    #
    # It _must_ be called _after_ add_handle(); AE will take care
    # of that.
    #
    # We are delaying the call because in some cases socket_action
    # may finish immediately (i.e. there was some error or we used
    # persistent connections and server returned data right away)
    # and it could confuse our application -- it would appear to
    # have finished before it started.
    AE::postpone {
        $self->socket_action;
    };

    return $r;
};


1;

__END__

=pod

=encoding utf8

=head1 NAME

AnyEvent::Net::Curl::Queued::Multi - Net::Curl::Multi wrapped by Moo

=head1 VERSION

version 0.044

=head1 SYNOPSIS

    use AnyEvent::Net::Curl::Queued::Multi;

    my $multi = AnyEvent::Net::Curl::Queued::Multi->new({
        max     => 10,
        timeout => 30,
    });

=head1 WARNING: GONE MOO!

This module isn't using L<Any::Moose> anymore due to the announced deprecation status of that module.
The switch to the L<Moo> is known to break modules that do C<extend 'AnyEvent::Net::Curl::Queued::Easy'> / C<extend 'YADA::Worker'>!
To keep the compatibility, make sure that you are using L<MooseX::NonMoose>:

    package YourSubclassingModule;
    use Moose;
    use MooseX::NonMoose;
    extends 'AnyEvent::Net::Curl::Queued::Easy';
    ...

Or L<MouseX::NonMoose>:

    package YourSubclassingModule;
    use Mouse;
    use MouseX::NonMoose;
    extends 'AnyEvent::Net::Curl::Queued::Easy';
    ...

Or the L<Any::Moose> equivalent:

    package YourSubclassingModule;
    use Any::Moose;
    use Any::Moose qw(X::NonMoose);
    extends 'AnyEvent::Net::Curl::Queued::Easy';
    ...

However, the recommended approach is to switch your subclassing module to L<Moo> altogether (you can use L<MooX::late> to smoothen the transition):

    package YourSubclassingModule;
    use Moo;
    use MooX::late;
    extends 'AnyEvent::Net::Curl::Queued::Easy';
    ...

=head1 DESCRIPTION

This module extends the L<Net::Curl::Multi> class through L<Moo> and adds L<AnyEvent> handlers.

=head1 ATTRIBUTES

=head2 active

Currently active sockets.

=head2 pool

Sockets pool.

=head2 timer

L<AnyEvent> C<timer()> handler.

=head2 max

Maximum parallel connections limit (default: 4).

=head2 timeout

Timeout threshold, in seconds (default: 10).

=head1 METHODS

=head2 socket_action(...)

Wrapper around the C<socket_action()> from L<Net::Curl::Multi>.

=head2 add_handle(...)

Overrides the C<add_handle()> from L<Net::Curl::Multi>.
Add one handle and kickstart download.

=for Pod::Coverage BUILD
BUILDARGS

=head1 SEE ALSO

=over 4

=item *

L<AnyEvent>

=item *

L<AnyEvent::Net::Curl::Queued>

=item *

L<Moo>

=item *

L<Net::Curl::Multi>

=back

=head1 AUTHOR

Stanislaw Pusep <stas@sysd.org>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2013 by Stanislaw Pusep.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut