The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Canella::TaskRunner;
use Moo;
use Coro;
use Coro::Select;
use Canella::Log;
use Guard;

sub execute {
    my ($self, $ctx, %args) = @_;

    my $role = $args{role};
    my $tasks = $args{tasks};

    my $hosts = $role->get_hosts();
    my $concurrency = $ctx->concurrency > 0 ?
        $ctx->concurrency : scalar @$hosts;

    # Register per-role parameters, only if they are not set
    my %has_key = map { ($_ => 1) } $ctx->parameters->keys;
    my %tmp_key;
    foreach my $param_key ($role->parameters->keys) {
        if ($has_key{$param_key}) {
            next;
        }
        debugf("Applying role default parameter %s", $param_key);
        $tmp_key{$param_key} = $ctx->parameters->get($param_key);
        $ctx->parameters->set($param_key, $role->parameters->get($param_key));
    }
    my $block_ctx = guard {
        my $params = $ctx->parameters;
        foreach my $param_key (keys %tmp_key) {
            debugf("Restoring paramter value for %s", $param_key);
            $params->set($param_key, $tmp_key{$param_key});
        }
    };

    my @coros;
    foreach my $host (@$hosts) {
        push @coros, async {
            my ($ctx, $host, $tasks) = @_;
            $ctx->stash(current_host => $host);
            foreach my $task (@$tasks) {
                $ctx->call_task($task);
            }
        } $ctx, $host, $tasks;

        if (@coros >= $concurrency) {
            $_->join for @coros;
            @coros = ();
        }
    }

    $_->join for @coros;
    infof "All done!";
}

1;