# The London Perl and Raku Workshop takes place on 26th Oct 2024. If your company depends on Perl, please consider sponsoring and/or attending.
package Helm::Log::Channel::console;
use strict;
use warnings;
use Moose;
use Term::ANSIColor qw(colored color);
use IO::Pipe;
use AnyEvent;
use namespace::autoclean;

# This class builds on the generic Helm log channel.
extends 'Helm::Log::Channel';

# pipes: hash reference stored by parallelize(); maps each server name (plus
# the key "parent") to the IO::Pipe created for it.
has pipes => (
    isa    => 'HashRef',
    is     => 'ro',
    writer => '_pipes',
);

# fh: the handle output() prints to; replaced through the private _fh writer.
has fh => (
    is     => 'ro',
    writer => '_fh',
);

# is_parallel: boolean flag, off by default, switched on by parallelize().
has is_parallel => (
    isa     => 'Bool',
    is      => 'rw',
    default => 0,
);

# Horizontal rule drawn around the server name by start_server().
my $LINE = q{-} x 70;

# Point this channel's output handle at STDERR.
#
# Parameters:
#   $helm - the Helm object for this run; used only to report a fatal error
#           when a handle on STDERR cannot be opened.
#
# The channel gets its own IO::Handle opened on STDERR's file descriptor and
# stores it through the private _fh writer.
sub initialize {
    my ($self, $helm) = @_;

    # default FH is STDERR
    Helm->debug("Initializing output handle to STDERR");
    my $fh = IO::Handle->new_from_fd(fileno(STDERR), 'w');

    # new_from_fd returns undef on failure; stop here with a clear message
    # instead of failing later on the first print through an undefined handle
    $helm->die("Couldn't open console output handle on STDERR: $!") if !$fh;

    # a freshly opened handle is buffered, unlike the real STDERR; flush on
    # every print so log lines keep their order relative to anything written
    # to STDERR directly
    $fh->autoflush(1);

    $self->_fh($fh);
}

# Emit the ANSI "reset" sequence so the terminal is not left in whatever
# color the last message used.
#
# Parameters:
#   $helm - the Helm object for this run (not used here).
sub finalize {
    my ($self, $helm) = @_;

    # make sure the terminal is reset. Colored messages from this channel are
    # written to STDERR (directly, or relayed there by the IO worker), so the
    # reset must go to STDERR as well; a bare print would go to the currently
    # selected handle (normally STDOUT), which may be redirected elsewhere.
    print STDERR color('reset');
}

# Announce that work on $server is beginning.
#
# Delegates to the parent class first, then prints a blue notice: a single
# "Starting task on ..." line in parallel mode, otherwise the server name
# framed above and below by a horizontal rule.
sub start_server {
    my ($self, $server) = @_;
    $self->SUPER::start_server($server);

    my $banner = $self->is_parallel
        ? "Starting task on $server"
        : "$LINE\n$server\n$LINE";

    $self->output($banner, 'blue');
}

# Mark the end of work on $server.
#
# Delegates to the parent class first; outside parallel mode it then prints a
# blank line to the output handle to separate this server's output from the
# next one's.
sub end_server {
    my ($self, $server) = @_;
    $self->SUPER::end_server($server);

    my $sequential = !$self->is_parallel;
    $self->fh->print("\n") if $sequential;
}

# Log a debugging message: tagged with "[DEBUG]" and printed in bright blue.
sub debug {
    my $self = shift;
    my ($msg) = @_;

    return $self->output('[DEBUG] ' . $msg, 'bright_blue');
}

# Log an informational message: printed untagged, in bright green.
sub info {
    my $self    = shift;
    my $message = shift;

    return $self->output($message, 'bright_green');
}

# Log a warning: tagged with "[WARN]" and printed in yellow.
sub warn {
    my ($self, $text) = @_;

    my $tagged = "[WARN] $text";
    return $self->output($tagged, 'yellow');
}

# Log an error: tagged with "[ERROR]" and printed in red.
sub error {
    my $self = shift;
    my ($problem) = @_;

    return $self->output('[ERROR] ' . $problem, 'red');
}

# Format a message and print it to this channel's current output handle.
#
# Parameters:
#   $msg    - the text to print; a newline is always appended.
#   @colors - optional Term::ANSIColor attribute names (for example 'red');
#             when given, the text is wrapped in the matching escape codes.
#
# In parallel mode the message is prefixed with "[<name>] ", where <name> is
# the name of the current server. Returns whatever the handle's print()
# returns.
sub output {
    my ($self, $msg, @colors) = @_;
    my $prefix = $self->is_parallel ? '[' . $self->current_server->name . '] ' : '';
    $msg = "$prefix$msg\n";

    if (@colors) {
        # Color line by line so each line carries its own reset sequence
        # *before* its newline. Coloring the whole string would put the reset
        # after the final newline; a reader that relays this output one line
        # at a time would then hold the reset back until the next message and
        # leave the terminal colored in the meantime.
        local $Term::ANSIColor::EACHLINE = "\n";
        $msg = colored($msg, @colors);
    }

    $self->fh->print($msg);
}

# Switch this channel into parallel mode.
#
# Creates one pipe per server in $helm->servers plus one named "parent",
# stores them in the pipes attribute, and then forks an IO worker process.
# Other processes write their log output into "their" pipe (the parent uses
# the "parent" pipe from here on; worker children pick theirs in forked());
# the IO worker reads every pipe and copies what arrives to STDERR, so the
# output of all processes is multiplexed onto the console by one process.
#
# Parameters:
#   $helm - the Helm object for this run; supplies the server list and is
#           used to abort if the fork fails.
#
# In the IO worker process this method never returns: the worker exits once
# every pipe has reported end-of-file.
sub parallelize {
    my ($self, $helm) = @_;
    $self->is_parallel(1);

    # if we're going to do parallel stuff, then create a pipe for each server now
    # that we can use to communicate with the child processes later
    Helm->debug("Creating a pipe for each server target for multiplexing output");
    my %pipes = map { $_->name => IO::Pipe->new } (@{$helm->servers});

    # and one for the parent so it's handled like everything else
    Helm->debug("Parent process should also communicate over multiplexing pipes");
    my $parent_pipe = IO::Pipe->new();
    $pipes{parent} = $parent_pipe;

    $self->_pipes(\%pipes);

    # fork off an IO worker process
    my $pid = fork();
    $helm->die("Couldn't fork console IO worker process") if !defined $pid;
    if ($pid) {
        # parent here
        Helm->debug("Parent process pipe is a writer");
        $parent_pipe->writer;
        $parent_pipe->autoflush(1);
        $self->_fh($parent_pipe);
    } else {
        # child here
        my %pipe_cleaners;
        my $all_clean = AnyEvent->condvar;
        Helm->debug("Child process pipes are readers");
        foreach my $server (keys %pipes) {
            my $pipe = $pipes{$server};
            $pipe->reader;

            # bytes received on this pipe that do not yet form a complete
            # line; each watcher closure gets its own buffer
            my $pending = '';

            # create an IO watcher for this pipe
            Helm->debug("Setting up reading event for IO worker pipe for $server");
            $pipe_cleaners{$server} = AnyEvent->io(
                fh   => $pipe,
                poll => 'r',
                cb   => sub {
                    # Read with sysread rather than <$pipe>: a buffered
                    # readline may pull several lines into Perl's buffer while
                    # the watcher only fires again when the descriptor itself
                    # becomes readable, which would strand those lines; it
                    # would also block the whole worker on a partial line.
                    my $got = sysread($pipe, $pending, 65536, length $pending);

                    # interrupted before anything was read; the watcher will
                    # fire again while data is still waiting
                    return if !defined $got && $!{EINTR};

                    if ($got) {
                        # relay everything up to the last complete line and
                        # keep any trailing partial line for the next read
                        my $last_newline = rindex($pending, "\n");
                        if ($last_newline >= 0) {
                            print STDERR substr($pending, 0, $last_newline + 1, '');
                        }
                        return;
                    }

                    # end-of-file (or an unrecoverable read error): flush
                    # whatever unterminated text is left, then retire the pipe
                    print STDERR $pending if length $pending;
                    $pending = '';
                    Helm->debug("Pipe for $server has been disconnected");
                    delete $pipe_cleaners{$server};
                    # tell the main program we're done if this is the last pipe
                    $all_clean->send unless %pipe_cleaners;
                },
            );
        }

        Helm->debug("Waiting on child pipes to receive data");
        $all_clean->recv;
        Helm->debug("All child pipes have been read");
        exit(0);
    }
}

# Hook invoked after a fork. In a child process, redirect this channel's
# output into the pipe that was set aside for the child's current server.
#
# Parameters:
#   $type - the string 'child' when running in the forked child; any other
#           value leaves the channel untouched.
sub forked {
    my $self = shift;
    my $type = shift;

    if ($type eq 'child') {
        my $pipe_for = $self->pipes;
        my $name     = $self->current_server->name;
        my $writer   = $pipe_for->{$name};

        Helm->debug("Console output now goes over writer pipe for $name");

        # claim the write end of the pipe and flush on every print
        $writer->writer();
        $writer->autoflush(1);
        $self->_fh($writer);
    }
}

# Ask Moose to make the class immutable now that all attributes and methods
# above have been declared.
__PACKAGE__->meta->make_immutable;

# A Perl module must end by returning a true value.
1;