The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
use utf8;
use strict;
use warnings;

package DR::Tarantool::AEConnection;
use AnyEvent;
use AnyEvent::Socket ();
use Carp;
use List::MoreUtils ();
use Scalar::Util ();

# Return the symbolic name (e.g. 'ECONNREFUSED') of the error currently held
# in $!.  When no entry of %! is true -- $! is zero, or holds a code Errno has
# no name for -- $! itself is returned instead.
#
# The lookup walks a fresh key list rather than each(%!): returning out of an
# each() loop leaves the hash iterator parked mid-way, so the following call
# would resume after the matching key, miss it and fall through to $!.
# The keys are sorted so that a code known under several names (EAGAIN and
# EWOULDBLOCK on many systems) always resolves to the same one.
sub _errno() {
    for my $name (sort keys %!) {
        return $name if $!{$name};
    }
    return $!;
}

# Constructor.  Takes a flat list of key/value options and returns a new
# object in the 'init' state; nothing is connected here.
#
# Options handled here:
#   port - required; croaks with 'port is undefined' when missing or false
#   host - defaults to '127.0.0.1'
#   on   - hashref of event handlers; each of the five known events that has
#          no handler gets a no-op one
# Every other option is stored in the object as given.  May be called on a
# class name or on an existing instance (the instance's class is reused).
sub new {
    my $class = shift;
    my %self  = @_;

    $self{state} = 'init';
    $self{host}  = '127.0.0.1' unless $self{host};
    croak 'port is undefined' if !$self{port};

    # Make sure every known event can be invoked unconditionally.
    for my $event (qw(connected connfail disconnect error reconnecting)) {
        $self{on}{$event} ||= sub {  };
    }

    @self{qw(success_connects wbuf)} = (0, '');
    $self{read} = { any => [] };

    return bless \%self, ref($class) || $class;
}


# Install (or reset) the handler for one event.
#
#   $name - event name; must already be a key of the object's handler table,
#           otherwise croaks with "wrong event name: $name"
#   $cb   - code reference; a false value installs a no-op handler
#
# Returns the object itself so calls can be chained.
sub on {
    my $self = shift;
    my ($name, $cb) = @_;

    exists $self->{on}{$name}
        or croak "wrong event name: $name";

    $self->{on}{$name} = $cb ? $cb : sub {  };
    return $self;
}

# Read-only accessors.  Each one returns the object field of the same name.

# Socket handle stored by connect() once the connection is established.
sub fh {
    my ($self) = @_;
    return $self->{fh};
}

# Current state: 'init', 'connecting', 'connected', 'connfail', 'error'
# or 'disconnect'.
sub state {
    my ($self) = @_;
    return $self->{state};
}

# Host given to the constructor.
sub host {
    my ($self) = @_;
    return $self->{host};
}

# Port given to the constructor.
sub port {
    my ($self) = @_;
    return $self->{port};
}

# Message of the last error (cleared when a new connect starts).
sub error {
    my ($self) = @_;
    return $self->{error};
}

# Symbolic name of the last error (cleared when a new connect starts).
sub errno {
    my ($self) = @_;
    return $self->{errno};
}

# True when reconnects should be attempted even if no connect ever succeeded.
sub reconnect_always {
    my ($self) = @_;
    return $self->{reconnect_always};
}

# Delay passed to the reconnect timer; a false value disables reconnecting.
sub reconnect_period {
    my ($self) = @_;
    return $self->{reconnect_period};
}
# Combined getter/setter for the connect timeout.
# Without an argument returns the current value; with an argument stores it
# (undef included) and returns the stored value.
sub timeout {
    my $self = shift;
    $self->{timeout} = shift if @_;
    return $self->{timeout};
}


# Put the connection into the 'error' state and notify the 'error' handler.
#
#   $error - error message, stored as is (see error())
#   $errno - symbolic error name (see errno()); defaults to $error
#
# All watchers and the pending write buffer are dropped BEFORE the handler is
# invoked -- the same order disconnect() and the connect-failure paths use.
# The handler may therefore call connect() or push_write(): whatever it sets
# up is not wiped once it returns.  (With the teardown placed after the
# handler, a connect() started from the handler lost its watchers and the
# object stayed in the 'connecting' state.)
#
# Finally a reconnect is scheduled if the reconnect settings ask for one.
sub set_error {
    my ($self, $error, $errno) = @_;
    $errno ||= $error;

    $self->{state} = 'error';
    $self->{error} = $error;
    $self->{errno} = $errno;

    # Tear down first, notify second.
    $self->{guard} = {};
    $self->{wbuf}  = '';

    $self->{on}{error}($self);

    $self->_check_reconnect;
}

# Schedule a reconnect attempt if one is due and none is pending yet.
#
# Nothing is scheduled when:
#   * the connection is 'connected' or 'connecting';
#   * a reconnect timer is already stored in the guard table;
#   * reconnect_period is false;
#   * no connect ever succeeded and reconnect_always is not set.
#
# Otherwise a one-shot timer is armed for reconnect_period seconds.  When it
# fires, the 'reconnecting' handler is invoked and connect() is called.
sub _check_reconnect {
    # The timer closure must not keep the object alive, hence the weak copy.
    Scalar::Util::weaken(my $self = shift);

    return if $self->state eq 'connected';
    return if $self->state eq 'connecting';
    return if $self->{guard}{rc};

    return unless $self->reconnect_period;
    return unless $self->reconnect_always or $self->{success_connects};

    $self->{guard}{rc} = AE::timer($self->reconnect_period, 0, sub {
        return unless $self;
        delete $self->{guard}{rc};
        $self->{on}{reconnecting}($self);

        # The handler may have released the last reference to the object;
        # $self is weak and is undef in that case.
        return unless $self;
        $self->connect;
    });
}

# Start an asynchronous connection attempt to host():port().
#
# Does nothing (and returns an empty list / undef) when the object is already
# 'connected' or 'connecting'.  Otherwise the state becomes 'connecting',
# error()/errno() are cleared, all previous watchers are dropped and the
# object itself is returned.
#
# Outcome, reported later from the event loop:
#   * success - the handle is stored, the state becomes 'connected', data
#     queued in the write buffer is scheduled for sending and the 'connected'
#     handler runs;
#   * failure - error()/errno() are filled from $!, the state becomes
#     'connfail', the 'connfail' handler runs and a reconnect is considered;
#   * timeout - if timeout() is defined and the attempt is still pending when
#     it expires, it is abandoned the same way with error 'Connection timeout'.
sub connect {
    # The callbacks below live in $self->{guard}; a strong $self inside them
    # would form a reference cycle, so they only see a weak copy.
    Scalar::Util::weaken(my $self = shift);

    return if $self->state eq 'connected' or $self->state eq 'connecting';

    $self->{state} = 'connecting';
    $self->{error} = undef;
    $self->{errno} = undef;
    $self->{guard} = {};

    $self->{guard}{c} = AnyEvent::Socket::tcp_connect(
        $self->host,
        $self->port,
        sub {
            my ($fh) = @_;

            # On failure the reason is in $!.  Take a snapshot before any
            # watcher is destroyed below, so that nothing run during the
            # teardown can overwrite it.
            my ($error, $errno) = $fh ? () : ($!, _errno());

            # The attempt is over: drop the connect guard and timeout timer.
            $self->{guard} = {};

            if ($fh) {
                $self->{fh} = $fh;
                $self->{state} = 'connected';
                $self->{success_connects}++;
                # Flush whatever was queued while there was no connection.
                $self->push_write('') if length $self->{wbuf};
                $self->{on}{connected}($self);
                return;
            }

            $self->{error} = $error;
            $self->{errno} = $errno;
            $self->{state} = 'connfail';
            $self->{on}{connfail}($self);
            # The handler may have released the object ($self is weak).
            return unless $self;
            $self->_check_reconnect;
        },
        sub {
            # Prepare callback: returns nothing, i.e. no timeout is requested
            # from tcp_connect; the timer below handles timeouts instead.
        },
    );

    if (defined $self->timeout) {
        $self->{guard}{t} = AE::timer($self->timeout, 0, sub {
            delete $self->{guard}{t};
            return unless $self->state eq 'connecting';

            $self->{error} = 'Connection timeout';
            # NOTE(review): the name is 'ETIMEOUT', not the POSIX 'ETIMEDOUT';
            # kept verbatim because callers may compare against it.
            $self->{errno} = 'ETIMEOUT';
            $self->{state} = 'connfail';
            # Cancels the pending tcp_connect as well.
            $self->{guard} = {};
            $self->{on}{connfail}($self);
            # Same precaution as in the connect callback: the handler may
            # have released the object, leaving the weak $self undefined.
            return unless $self;
            $self->_check_reconnect;
        });
    }

    return $self;
}

# Drop the connection on purpose.
#
# Does nothing when the object is already in the 'disconnect' state or was
# never used ('init').  Otherwise all watchers (including a pending connect
# or reconnect timer) are cancelled, the object's reference to the socket
# handle is released, the write buffer is discarded, the state becomes
# 'disconnect' with error 'Disconnected' / errno 'SUCCESS', and the
# 'disconnect' handler is invoked.
sub disconnect {
    my ($self) = @_;
    return if $self->state eq 'disconnect' or $self->state eq 'init';

    $self->{guard} = {};

    # Let go of the socket: without this the handle stayed in the object and
    # the TCP connection remained open after a "disconnect".  The handle is
    # only released, not closed explicitly, so a copy held elsewhere stays
    # usable.
    delete $self->{fh};

    $self->{error} = 'Disconnected';
    $self->{errno} = 'SUCCESS';
    $self->{state} = 'disconnect';
    $self->{wbuf} = '';
    $self->{on}{disconnect}($self);
}


# Queue $str for sending.
#
# The data is appended to the write buffer.  If the connection is not
# established it just stays there (connect() flushes the buffer on success).
# Otherwise a write watcher is installed -- unless one is already active --
# that sends the buffer as the socket becomes writable and removes itself
# once the buffer is empty.
#
# A failed write puts the object into the error state via set_error(), except
# for EINTR, EAGAIN and EWOULDBLOCK: those only mean "not now" on a
# non-blocking socket, so the watcher simply waits for the next writability
# event instead of tearing the connection down.
sub push_write {
    # The watcher closure is stored inside the object; keep only a weak copy
    # in it to avoid a reference cycle.
    Scalar::Util::weaken(my $self = shift);
    my ($str) = @_;

    $self->{wbuf} .= $str;

    return unless $self->state eq 'connected';
    return unless length $self->{wbuf};
    return if $self->{guard}{write};

    $self->{guard}{write} = AE::io($self->fh, 1, sub {
        my $written = syswrite $self->fh, $self->{wbuf};
        unless (defined $written) {
            # Transient conditions: retry on the next event.
            return if $!{EINTR} or $!{EAGAIN} or $!{EWOULDBLOCK};
            $self->set_error($!, _errno());
            return;
        }

        # Remove what has been sent; keep the watcher while data remains.
        substr $self->{wbuf}, 0, $written, '';
        return if length $self->{wbuf};
        delete $self->{guard}{write};
    });
}




1;