The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Object::Remote::ConnectionServer;

use Scalar::Util qw(blessed weaken);
use Module::Runtime qw(use_module);
use Object::Remote;
use Object::Remote::Logging qw( :log :dlog );
use Future;
use IO::Socket::UNIX;
use Moo;

has listen_on => (
  is => 'ro',
  coerce => sub {
    return $_[0] if blessed($_[0]);
    unlink($_[0]);
    IO::Socket::UNIX->new(
      Local => $_[0],
      Listen => 1
    ) or die "Couldn't liten to $_[0]: $!";
  },
  trigger => sub {
    my ($self, $fh) = @_;
    log_debug { "adding connection server to run loop because the trigger has executed" };
    weaken($self);
    Object::Remote->current_loop
                  ->watch_io(
                      handle => $fh,
                      on_read_ready => sub { $self->_listen_ready($fh) }
                    );
  },
);

has connection_args => (
 is => 'ro', default => sub { [] }
);

has connection_callback => (
  is => 'ro', default => sub { sub { shift } }
);

sub BUILD {
  log_debug { "A connection server has been built; calling want_run on run loop" };
  Object::Remote->current_loop->want_run;
}

sub run {
  log_debug { "Connection server is calling run_while_wanted on the run loop" };
  Object::Remote->current_loop->run_while_wanted;
}

sub _listen_ready {
  my ($self, $fh) = @_;
  log_debug { "Got a connection, calling accept on the file handle" };
  my $new = $fh->accept or die "Couldn't accept: $!";
  log_trace { "Setting file handle non-blocking" };
  $new->blocking(0);
  my $f = Future->new;
  log_trace { "Creating a new connection with the remote node" };
  my $c = use_module('Object::Remote::Connection')->new(
    receive_from_fh => $new,
    send_to_fh => $new,
    on_close => $f, # and so will die $c
    @{$self->connection_args}
  )->${\$self->connection_callback};
  $f->on_ready(sub { undef($c) });
  log_trace { "marking the future as done" };
  $c->ready_future->done;
  Dlog_trace { "Sending 'Shere' to socket $_" } $new;
  print $new "Shere\n" or die "Couldn't send to new socket: $!";
  log_debug { "Connection has been fully handled" };
  return $c;
}

sub DEMOLISH {
  my ($self, $gd) = @_;
  log_debug { "A connection server is being destroyed; global destruction: '$gd'" };
  return if $gd;
  log_trace { "Removing the connection server IO watcher from run loop" };
  Object::Remote->current_loop
                ->unwatch_io(
                    handle => $self->listen_on,
                    on_read_ready => 1
                  );
  if ($self->listen_on->can('hostpath')) {
    log_debug { my $p = $self->listen_on->hostpath; "Removing '$p' from the filesystem" };
    unlink($self->listen_on->hostpath);
  }
  log_trace { "calling want_stop on the run loop" };
  Object::Remote->current_loop->want_stop;
}

1;